Contents

[Kafka] Producer

Partitioner

์นดํ”„์นด๋Š” ๋ฉ”์„ธ์ง€๋ฅผ ๋ณ‘๋ ฌ๋กœ ํšจ์œจ์ ์œผ๋กœ ์ฒ˜๋ฆฌํ•˜๊ธฐ ์œ„ํ•ด ์—ฌ๋Ÿฌ๊ฐœ์˜ ํŒŒํ‹ฐ์…˜์œผ๋กœ ๊ตฌ์„ฑํ•  ์ˆ˜ ์žˆ๋‹ค. ๋ฉ”์‹œ์ง€๋Š” ํŒŒํ‹ฐ์…˜์œผ๋กœ ๋ณด๋‚ด์ง€๊ณ  ํŒŒํ‹ฐ์…˜ ๋‚ด ๋กœ๊ทธ ์„ธ๊ทธ๋จผํŠธ์— ์ €์žฅ๋˜์–ด ์นดํ”„์นด๋กœ ์ „์†ก๋œ๋‹ค. ์—ฌ๋Ÿฌ๊ฐœ์˜ ํŒŒํ‹ฐ์…˜์ด ๊ตฌ์„ฑ๋˜์–ด ์žˆ์„ ๊ฒฝ์šฐ ์–ด๋Š ํŒŒํ‹ฐ์…˜์œผ๋กœ ๋ณด๋‚ด์ค˜์•ผ ํ•˜๋Š” ์ง€ ๊ฒฐ์ •ํ•˜๋Š” ์—ญํ• ์„ ํ•˜๋Š” ๊ฒƒ์ด ํŒŒํ‹ฐ์…”๋„ˆ์ด๋‹ค.

ํŒŒํ‹ฐ์…”๋„ˆ๋Š” ๋ฉ”์‹œ์ง€์˜ ํ‚ค๋ฅผ ํ•ด์‹œํ•˜์—ฌ ์–ด๋–ค ํ† ํ”ฝ์˜ ์–ด๋–ค ํŒŒํ‹ฐ์…˜์œผ๋กœ ๋ณด๋‚ด์ค˜์•ผ ํ•  ์ง€ ๋งคํ•‘ํ•˜๋Š” ํ…Œ์ด๋ธ”์„ ๊ด€๋ฆฌํ•œ๋‹ค. ํŒŒํ‹ฐ์…˜์˜ ๊ฐฏ์ˆ˜๊ฐ€ ๋ณ€๊ฒฝ๋˜๋ฉด, ํ•ด์‹œํ…Œ์ด๋ธ”๋„ ๋ณ€๊ฒฝ๋˜๋ฏ€๋กœ ํŒŒํ‹ฐ์…˜์˜ ๊ฐฏ์ˆ˜๋ฅผ ๋Š˜๋ฆด ๋•Œ๋Š” ์œ ์˜ํ•ด์•ผํ•œ๋‹ค.

๋ฉ”์‹œ์ง€์˜ ํ‚ค๊ฐ’์€ ํ•„์ˆ˜๊ฐ’์ด ์•„๋‹ˆ๋ฏ€๋กœ, null ์ผ ๊ฒฝ์šฐ ์นดํ”„์นด๋Š” ์ž์ฒด์ ์œผ๋กœ ํ•ด๋‹น ๋ฉ”์‹œ์ง€๋ฅผ ์–ด๋Š ํŒŒํ‹ฐ์…˜์œผ๋กœ ๋ณด๋‚ผ ์ง€ ๊ฒฐ์ •ํ•œ๋‹ค. ๋‹ค์Œ์€ ๊ทธ ๋ฐฉ๋ฒ•์ด๋‹ค.

  1. Round Robin

์—ฌ๋Ÿฌ๊ฐœ์˜ ํŒŒํ‹ฐ์…˜์œผ๋กœ ๊ตฌ์„ฑ๋˜์–ด ์žˆ๋Š” ๊ฒฝ์šฐ, ํŒŒํ‹ฐ์…”๋„ˆ๋Š” ๋ฉ”์‹œ์ง€๋ฅผ ๋žœ๋ค์œผ๋กœ ํŒŒํ‹ฐ์…˜์— ๋ฉ”์‹œ์ง€๋ฅผ ์ „์†กํ•œ๋‹ค. ์ „์†ก๋œ ๋ฉ”์‹œ์ง€๋Š” I/O interrupt ๋ฅผ ์ค„์ด๊ธฐ ์œ„ํ•ด, ํŒŒํ‹ฐ์…˜์˜ ๋ฒ„ํผ ๋ฉ”๋ชจ๋ฆฌ ์˜์—ญ์—์„œ ๋Œ€๊ธฐ๋ฅผ ํ•˜๋Š”๋ฐ, ๋ผ์šด๋“œ ๋กœ๋นˆ์„ ์‚ฌ์šฉํ•  ๊ฒฝ์šฐ, ๋ฒ„ํผ ๋ฉ”๋ชจ๋ฆฌ๊ฐ€ ์ฐจ๊ธฐ ์ „๊นŒ์ง€ ๋ฉ”์‹œ์ง€๊ฐ€ ๋Œ€๊ธฐํ•  ์ˆ˜ ์žˆ๊ธฐ ๋•Œ๋ฌธ์— ํšจ์œจ์ ์ด์ง€ ๋ชปํ•˜๋‹ค. ( ๋ฒ„ํผ ๋ฉ”๋ชจ๋ฆฌ ์˜์—ญ ๋‚ด์—์„œ ์–ผ๋งŒํผ ๊ธฐ๋‹ค๋ฆฌ๋Š” ์ง€ ๊ฒฐ์ •ํ•  ์ˆ˜ ์žˆ๋Š” ์„ค์ •๊ฐ’(linger.ms)๊ฐ€ ์žˆ๊ธฐ๋Š” ํ•˜๋‹ค. )

  1. Sticky Partitioning

๋ผ์šด๋“œ๋กœ๋นˆ์˜ ๋น„ํšจ์œจ์ ์ธ ์ „์†ก์„ ๋ฐฉ์ง€ํ•˜๊ธฐ ์œ„ํ•ด, ๋ฒ„ํผ ๋ฉ”๋ชจ๋ฆฌ ์˜์—ญ ๋‚ด ๊ฐ€์žฅ ๋น ๋ฅด๊ฒŒ ๋ฐฐ์ถœ๋  ์ˆ˜ ์žˆ์„๋งŒํ•œ ํŒŒํ‹ฐ์…˜์„ ์ฐพ์•„ ํŒŒํ‹ฐ์…”๋„ˆ๋Š” ํ•ด๋‹น ํŒŒํ‹ฐ์…˜์œผ๋กœ ๋ฉ”์‹œ์ง€๋ฅผ ์ „์†กํ•œ๋‹ค. ๋ฒ„ํผ ๋ฉ”๋ชจ๋ฆฌ๊ฐ€ ๋” ๋นจ๋ฆฌ ์ฑ„์›Œ์งˆ ์ˆ˜ ์žˆ๊ธฐ ๋•Œ๋ฌธ์— ๋ผ์šด๋“œ๋กœ๋นˆ๋ณด๋‹ค ํšจ์œจ์ ์œผ๋กœ ๋ฉ”์‹œ์ง€๋ฅผ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋Š” ์žฅ์ ์ด ์žˆ๋‹ค.

ํ•˜์ง€๋งŒ, ์ด๋Ÿฌํ•œ ๋ฐฉ์‹์€ ๋ฉ”์‹œ์ง€์˜ ์ „์†ก ์ˆœ์„œ๊ฐ€ ๋ณด์žฅ์ด ๋˜์ง€ ์•Š๊ธฐ ๋•Œ๋ฌธ์— ๋ฉ”์‹œ์ง€์˜ ์ˆœ์„œ๊ฐ€ ์ค‘์š”ํ•˜์ง€ ์•Š์€ ๊ฒฝ์šฐ์ผ ๊ฒฝ์šฐ sticky partitioning ์ „๋žต์„ ์‚ฌ์šฉํ•˜๋ฉด ์ข‹๋‹ค.

Batch

์นดํ”„์นด๋Š” ํ† ํ”ฝ์˜ ์ฒ˜๋ฆฌ๋Ÿ‰์„ ๋†’์ด๊ธฐ ์œ„ํ•ด, ๋ฉ”์‹œ์ง€๋ฅผ ํ•˜๋‚˜ํ•˜๋‚˜ ์ฒ˜๋ฆฌํ•˜๋Š” ๊ฒƒ์ด ์•„๋‹Œ ๋ชจ์•˜๋‹ค๊ฐ€ ํ•œ๋ฒˆ์— ์ฒ˜๋ฆฌํ•œ๋‹ค. ์ด๋ ‡๊ฒŒ ํ•œ๋ฒˆ์— ์ฒ˜๋ฆฌํ•˜๋Š” ์ด์œ ๋Š” I/O interrupt ๋ฅผ ์ค„์ด๊ธฐ ์œ„ํ•ด์„œ๋‹ค.

๋ฐฐ์น˜์™€ ๊ด€๋ จ๋œ ์„ค์ •๊ฐ’์€ ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค.

  1. buffer.memory ๊ธฐ๋ณธ๊ฐ’์€ 32MB ์ด๊ณ , buffer.memory > batch.size * #partition ์ด์–ด์•ผ ํ•จ์„ ์ฃผ์˜ํ•˜์ž.
  2. batch.size ํŒŒํ‹ฐ์…˜๋‚ด ๋ฉ”์‹œ์ง€๋“ค์ด ๋ฐฐ์น˜ ์ „์†ก์„ ์œ„ํ•ด ๋ฌถ๋Š” ๋‹จ์œ„์ด๋‹ค. ๊ธฐ๋ณธ๊ฐ’์€ 16 KB ์ด๊ณ , 16KB ๊ฐ€ ๋˜๋ฉด์€ ํŒŒํ‹ฐ์…˜๋‚ด ๋ฉ”์‹œ์ง€๋“ค์„ ํ•œ๋ฒˆ์— ์ฒ˜๋ฆฌํ•œ๋‹ค.
  3. linger.ms ๋ฉ”์‹œ์ง€๊ฐ€ ๋ฒ„ํผ ๋ฉ”๋ชจ๋ฆฌ์—์„œ ๋Œ€๊ธฐํ•˜๋Š” ์‹œ๊ฐ„์ด๋‹ค. ๊ธฐ๋ณธ๊ฐ’์€ 0ms์ด๋‹ค.

์นดํ”„์นด๋ฅผ ์‚ฌ์šฉํ•  ๋•Œ ์ฃผ์˜ํ•  ์ ์€ ์ฒ˜๋ฆฌ๋Ÿ‰์„ ๋†’์ผ ๊ฒƒ์ธ์ง€ ์ง€์—ฐ์—†๋Š” ์ „์†ก์„ ํ•  ๊ฒƒ์ธ์ง€ ๊ฒฐ์ •ํ•ด์•ผ ํ•œ๋‹ค. ์šด์˜์ค‘์ธ ์–ดํ”Œ๋ฆฌ์ผ€์ด์…˜์— ๋”ฐ๋ผ ์œ„ ์„ค์ •๊ฐ’๋“ค์„ ์กฐ์ ˆํ•˜๋ฉฐ ์–ด๋Š๊ฒƒ์ด ์–ดํ”Œ๋ฆฌ์ผ€์ด์…˜์— ์ตœ์ ํ™”๋œ ๊ฐ’์ธ์ง€๋ฅผ ์ฐพ์•„์•ผ ํ•œ๋‹ค.

์ง€์—ฐ์—†๋Š” ์ „์†ก์„ ์œ„ํ•ด์„ , batch.size / linger.ms ๊ฐ’์„ ์ค„์ด๋ฉด ๋˜๊ณ , ์ฒ˜๋ฆฌ๋Ÿ‰์„ ๋†’์ด๊ธฐ ์œ„ํ•ด์„  ๋Š˜๋ฆฌ๋ฉด ๋œ๋‹ค.

์ฒ˜๋ฆฌ๋Ÿ‰์„ ๋†’์ด๊ธฐ ์œ„ํ•ด์„œ, ์นดํ”„์นด๋Š” ์—ฌ๋Ÿฌ๊ฐ€์ง€ ์••์ถ•ํฌ๋งท์„ ์ง€์›ํ•˜๊ธฐ๋„ ํ•œ๋‹ค.

์ „์†ก๋ฐฉ์‹

์ ์–ด๋„ ํ•œ๋ฒˆ ์ „์†ก

๋ฉ”์‹œ์ง€๋ฅผ ํ”„๋กœ๋“€์„œ๊ฐ€ ๋ณด๋‚ด๊ณ  ๊ทธ์— ๋Œ€ํ•œ ACK ์ด ์˜ค์ง€ ์•Š๋Š”๋‹ค๋ฉด, ๊ฐ™์€ ๋ฉ”์‹œ์ง€๋ฅผ ํ•œ๋ฒˆ ๋” ๋ณด๋‚ด๋Š” ๋ฐฉ์‹์ด๋‹ค. ์นดํ”„์นด๋Š” ๊ธฐ๋ณธ์ ์œผ๋กœ ์ ์–ด๋„ ํ•œ๋ฒˆ ์ „์†ก์œผ๋กœ ๋™์ž‘ํ•˜๊ณ  ์ด๋Š” ๋ฉ”์‹œ์ง€๊ฐ€ ์ค‘๋ณต ๊ฐ€๋Šฅ์„ฑ์ด ์žˆ๋‹ค๋Š” ๊ฒƒ์„ ์˜๋ฏธํ•œ๋‹ค.

์ตœ๋Œ€ ํ•œ๋ฒˆ ์ „์†ก

๋ฉ”์‹œ์ง€๋ฅผ ํ”„๋กœ๋“€์„œ๊ฐ€ ๋ณด๋‚ด๊ณ  ACK์„ ๊ธฐ๋‹ค๋ฆฌ์ง€ ์•Š๊ณ  ๋‹ค์Œ ๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋‚ธ๋‹ค. ๋ฉ”์‹œ์ง€์˜ ์œ ์‹ค์ด ์žˆ์„ ์ˆ˜ ์žˆ๊ธฐ ๋•Œ๋ฌธ์— log ๋‚˜ IoT ๊ฐ™์€ ํ™˜๊ฒฝ์—์„œ ์‚ฌ์šฉ๋œ๋‹ค.

์ค‘๋ณต์—†๋Š” ์ „์†ก

ํ”„๋กœ๋“€์„œ๋Š” ๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋‚ผ ๋•Œ ํ—ค๋”์— PID / Sequence Number ๋ฅผ ํฌํ•จํ•˜์—ฌ ๋ธŒ๋กœ์ปค์—๊ฒŒ ์ „๋‹ฌํ•œ๋‹ค. ๋ธŒ๋กœ์ปค๋Š” ์œ„ ๋‘ ๊ฐ’์„ ๋น„๊ตํ•˜์—ฌ ์ฒ˜๋ฆฌ๋œ ๋ฉ”์‹œ์ง€์ธ์ง€ ์•„๋‹Œ์ง€๋ฅผ ํŒ๋‹จํ•˜๊ณ , ๋งŒ์•ฝ ์ฒ˜๋ฆฌ๋œ ๋ฉ”์‹œ์ง€๋ผ๋ฉด ๋ฉ”์‹œ์ง€๋ฅผ ์ €์žฅํ•˜์ง€ ์•Š๊ณ  ACK ๋งŒ ์‘๋‹ตํ•œ๋‹ค.

PID / Sequence Number ๋Š” ๋ธŒ๋กœ์ปค์˜ ๋ฉ”๋ชจ๋ฆฌ ๋ฟ ์•„๋‹ˆ๋ผ, replication log ์—๋„ ์ž‘์„ฑ๋˜์–ด ๋ธŒ๋กœ์ปค๊ฐ€ ๋‹ค์šด๋˜์–ด follower ๊ฐ€ promotion ๋  ๋•Œ๋„ ์ค‘๋ณต ์—†๋Š” ๋ฉ”์‹œ์ง€ ์ „์†ก์ด ๊ฐ€๋Šฅํ•˜๋‹ค.

์—ฌ๊ธฐ์„œ ์ฃผ์˜ํ•ด์•ผ ํ•  ์ ์€ ์ค‘๋ณต์—†๋Š” ์ „์†ก์€ ์ •ํ™•ํžˆ ํ•œ๋ฒˆ ์ „์†ก๊ณผ ๊ตฌ๋ณ„๋œ๋‹ค๋Š” ์ ์ด๋‹ค.

์ค‘๋ณต์—†๋Š” ์ „์†ก์„ ํ•˜๊ธฐ ์œ„ํ•ด์„  ํ”„๋กœ๋“€์„œ์— ๋‹ค์Œ๊ณผ ๊ฐ™์€ ์„ค์ •๊ฐ’๋“ค์ด ํ•„์š”ํ•˜๋‹ค.

  • enable.idempotence = true ๊ธฐ๋ณธ๊ฐ’์€ false ์ด๋ฉฐ, ์ด ๊ฐ’์ด true ๋กœ ์„ค์ •๋˜๋ฉด, ๋‹ค์Œ 3๊ฐ€์ง€์˜ ์˜ต์…˜์„ ์ˆ˜์ •ํ•ด์•ผ ํ•œ๋‹ค.
  • max.in.flight.requests.per.connection = 1~5 ๋ธŒ๋กœ์ปค๋กœ๋ถ€ํ„ฐ ack ์ด ์˜ค์ง€ ์•Š์•˜์„ ๋•Œ connection ์—์„œ ๋ณด๋‚ผ ์ˆ˜ ์žˆ๋Š” ์ตœ๋Œ€ ์š”์ฒญ ์ˆ˜
  • acks = all (-1) ํ”„๋กœ๋“€์„œ๋Š” ๋ชจ๋“  ๋ฆฌ๋” ๋ธŒ๋กœ์ปค์™€ ISR ์— ํฌํ•จ๋œ ๋ธŒ๋กœ์ปค๋“ค์ด ๋ชจ๋‘ ๋ฉ”์‹œ์ง€๋ฅผ ๊ธฐ๋กํ–ˆ๋‹ค๋Š” ์š”์ฒญ์„ ๊ธฐ๋‹ค๋ฆฐ๋‹ค. ์ด๋ ‡๊ฒŒ ๋จ์œผ๋กœ์จ ๋ฉ”์‹œ์ง€๋ฅผ ์ค‘๋ณต์—†์ด ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๊ฒŒ ๋œ๋‹ค.
  • retries > 0 ack ์„ ๋ฐ›์ง€ ๋ชปํ•œ ๊ฒฝ์šฐ ๋ช‡ ๋ฒˆ ์žฌ์‹œ๋„๋ฅผ ํ•ด์ค˜์•ผ ํ•˜๋Š”์ง€ ๊ฒฐ์ •ํ•ด์ฃผ๋Š” ๊ฐ’์ด๋‹ค.

์œ„์™€ ๊ฐ™์ด ProducerId / lastSequence / firstSequence ๊ฐ€ ๊ธฐ๋ก๋œ ๋ชจ์Šต์„ ๋ณผ ์ˆ˜ ์žˆ๋‹ค.

์ •ํ™•ํžˆ ํ•œ๋ฒˆ ์ „์†ก

์นดํ”„์นด๋Š” ์ •ํ™•ํžˆ ํ•œ๋ฒˆ ์ „์†ก์„ ์ˆ˜ํ–‰ํ•˜๊ธฐ ์œ„ํ•ด, ๋ณ„๋„์˜ ํ”„๋กœ์„ธ์Šค๊ฐ€ ์กด์žฌํ•˜๋Š”๋ฐ ์ด๋ฅผ ํŠธ๋žœ์žญ์…˜ API๋ผ๊ณ  ํ•œ๋‹ค.

์นดํ”„์นด์—์„œ ํŠธ๋žœ์žญ์…˜์„ ๊ด€๋ฆฌํ•˜๊ธฐ ์œ„ํ•ด ๋ณ„๋„์˜ ๋ธŒ๋กœ์ปค์—๋Š” transaction coordinator ๊ฐ€ ์กด์žฌํ•œ๋‹ค.

์นดํ”„์นด ํŠธ๋žœ์žญ์…˜์„ ์‚ฌ์šฉํ•˜๊ธฐ ์œ„ํ•ด์„ , ์ค‘๋ณต์—†๋Š” ์ „์†ก์—์„œ ์„ค์ •ํ–ˆ๋˜ ๊ฐ’ ์ด์™ธ์— transactional_id_config ๊ฐ’์ด ์ถ”๊ฐ€๋˜์–ด์•ผ ํ•œ๋‹ค. ์ด ๋•Œ ํ”„๋กœ๋“€์„œ๋งˆ๋‹ค ์„œ๋กœ ๋‹ค๋ฅธ ๊ฐ’์œผ๋กœ ํ•ด๋‹น ์„ค์ •๊ฐ’์„ ๊ตฌ์„ฑํ•ด์•ผ ํ•œ๋‹ค.

  1. ํŠธ๋žœ์žญ์…˜ ์ฝ”๋””๋„ค์ดํ„ฐ ์ฐพ๊ธฐ ํ”„๋กœ๋“€์„œ๋Š” ๋ธŒ๋กœ์ปค์—๊ฒŒ FindCoordinatorRequest ์š”์ฒญ์„ ๋ณด๋‚ด ์ฝ”๋””๋„ค์ดํ„ฐ๋ฅผ ์ฐพ๊ฒŒ ๋œ๋‹ค. ์ด ๊ณผ์ •์—์„œ ํŠธ๋žœ์žญ์…˜ ์ฝ”๋””๋„ค์ดํ„ฐ๋Š” PID(producer id) ์™€ TID(transaction id) ๋ฅผ ๋งคํ•‘ํ•˜๊ณ  ํŠธ๋žœ์žญ์…˜์„ ๊ด€๋ฆฌํ•œ๋‹ค.

  2. initTransaction() ํ”„๋กœ๋“€์„œ๋Š” ์œ„ ๋ฉ”์„œ๋“œ๋ฅผ ํ†ตํ•ด ์ดˆ๊ธฐํ™”๋ฅผ ํ•˜๋ฉฐ, ์ด ๋•Œ InitPidRequest ์š”์ฒญ์„ ํŠธ๋žœ์žญ์…˜ ์ฝ”๋””๋„ค์ดํ„ฐ๋กœ ๋ณด๋‚ธ๋‹ค. ํŠธ๋žœ์žญ์…˜ ์ฝ”๋””๋„ค์ดํ„ฐ๋Š” ์ด ๊ณผ์ •์—์„œ PID / TID ๋ฅผ ๋งคํ•‘ํ•˜๊ณ  PID Epoch๋ฅผ ํ•œ ๋‹จ๊ณ„ ์˜ฌ๋ฆฐ๋‹ค. (epoch๋ฅผ ์ฆ๊ฐ€์‹œํ‚ค๋Š” ์ด์œ ๋Š”, ๋ฉ”์‹œ์ง€ ์•ˆ์ •์„ฑ์„ ๋†’์ด๊ธฐ ์œ„ํ•จ์ด๋‹ค.)

    1
    
    peter-transaction-01::TransactionMetadata(transactionalId=peter-transaction-01, producerId=3000, producerEpoch=0, txnTimeoutMs=60000, state=Empty, pendingState=None, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1712735064600)
    

    ํŠธ๋žœ์žญ์…˜ ์ดˆ๊ธฐํ™”์— ํ•ด๋‹นํ•˜๋Š” ๋กœ๊ทธ์ด๋‹ค. PID ๋Š” 3000 / state=Empty / topicPartition์€ empty set ์ด๋‹ค.

  3. beginTransaction() ์ด ๊ณผ์ •์—์„œ ํ”„๋กœ๋“€์„œ๋Š” ํ† ํ”ฝ ํŒŒํ‹ฐ์…˜ ์ •๋ณด๋ฅผ ํŠธ๋žœ์žญ์…˜ ์ฝ”๋””๋„ค์ดํ„ฐ์— ์ „๋‹ฌํ•˜๊ณ  ํŠธ๋žœ์žญ์…˜ ์ฝ”๋””๋„ค์ดํ„ฐ๋Š” ํŠธ๋žœ์žญ์…˜ ๋กœ๊ทธ์— TID, ํŒŒํ‹ฐ์…˜ ์ •๋ณด๋ฅผ ๊ธฐ๋กํ•˜๊ณ  ํŠธ๋žœ์žญ์…˜ ์ƒํƒœ๋ฅผ ongoing์œผ๋กœ ํ‘œ์‹œํ•œ๋‹ค. ์ถ”๊ฐ€๋กœ ํŠธ๋žœ์žญ์…˜ ์ฝ”๋””๋„ค์ดํ„ฐ๋Š” ํ•ด๋‹น ํŠธ๋žœ์žญ์…˜์— ๋Œ€ํ•ด ํƒ€์ด๋จธ๋ฅผ ์„ค์ •ํ•˜๊ณ  1๋ถ„์„ ๋„˜๊ธธ ๊ฒฝ์šฐ ํ•ด๋‹น ํŠธ๋žœ์žญ์…˜์„ ์‹คํŒจ๋กœ ๊ฐ„์ฃผํ•œ๋‹ค.

    1
    
    peter-transaction-01::TransactionMetadata(transactionalId=peter-transaction-01, producerId=3000, producerEpoch=0, txnTimeoutMs=60000, state=Ongoing, pendingState=None, topicPartitions=Set(peter-test05-0), txnStartTimestamp=1712735065999, txnLastUpdateTimestamp=1712735065999)
    

    state=ongoing์œผ๋กœ ๋ณ€๊ฒฝ๋˜์—ˆ๊ณ , topicPartitions=Set(peter-test05-0) ์—์„œ ํŠธ๋žœ์žญ์…˜์ด ์‹œ์ž‘ํ–ˆ์Œ์„ ์•Œ ์ˆ˜ ์žˆ๋‹ค.

  4. ๋ฉ”์‹œ์ง€ ์ „์†ก

    ํ”„๋กœ๋“€์„œ๋Š” ๋ฉ”์‹œ์ง€๋ฅผ ์ „์†กํ•˜๋ฉด์„œ, PID / epoch / sequence number ๋ฅผ ๋ฉ”์‹œ์ง€์— ๋‹ด์•„ ์ „์†กํ•œ๋‹ค.

    1
    2
    3
    4
    
    | offset: 0 CreateTime: 1712735065962 keysize: -1 valuesize: 52 sequence: 0 headerKeys: [] payload: Apache Kafka is a distributed streaming platform - 0
    baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: 3000 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true position: 120 CreateTime: 1712735066161 size: 78 magic: 2 compresscodec: NONE crc: 1514605644 isvalid: true
    
    | offset: 1 CreateTime: 1712735066161 keysize: 4 valuesize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0
    

    lastSequence: 0 producerId: 3000 producerEpoch: 0 ๊ฐ’์„ ํ†ตํ•ด ๋ฉ”์‹œ์ง€์— PID / epoch / sequence number ๊ฐ€ ๋‹ด๊ฒจ ์ „์†ก๋œ๋‹ค๋Š” ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

    offset 1๋ฒˆ์— ๋‹ด๊ธด ๋กœ๊ทธ๋ฅผ ๋ณด๋ฉด ๊ธฐ์กด ๋กœ๊ทธ์™€ ํ˜•ํƒœ๊ฐ€ ์กฐ๊ธˆ ๋‹ค๋ฅธ ๊ฑธ ๋ณผ ์ˆ˜ ์žˆ๋Š”๋ฐ, ์ด ๋ฉ”์‹œ์ง€๋Š” control message ์ด๋‹ค. ์ปจ์Šˆ๋จธ์— read_committed option์ด ํ™œ์„ฑํ™” ๋˜์–ด ์žˆ๋‹ค๋ฉด ์ด ๋ฉ”์‹œ์ง€ ์•ž์— ์žˆ๋Š” ๋ฉ”์‹œ์ง€๋“ค๋งŒ ์ฝ์„ ์ˆ˜ ์žˆ๋‹ค.

  5. ๋ฉ”์‹œ์ง€ ์ „์†ก ์™„๋ฃŒ

๋ฉ”์‹œ์ง€๋ฅผ ์ „์†ก์™„๋ฃŒํ•˜๊ฒŒ ๋˜๋ฉด, ํ”„๋กœ๋“€์„œ๋Š” commitTransaction() ๋˜๋Š” abortTransaction() ๋ฉ”์„œ๋“œ๋ฅผ ํ˜ธ์ถœํ•ด์•ผ๋งŒ ํ•œ๋‹ค. ์ปค๋ฐ‹์ด ๋˜์—ˆ๋‹ค๋ฉด ํŠธ๋žœ์žญ์…˜ ์ฝ”๋””๋„ค์ดํ„ฐ๋Š” ํ•ด๋‹น ํŠธ๋žœ์žญ์…˜์— ๋Œ€ํ•œ ๋กœ๊ทธ์ธ prepareCommit ๋˜๋Š” prepareAbort๋ฅผ ๋กœ๊ทธ์— ๊ธฐ๋กํ•œ๋‹ค.