[Kafka] Producer
Partitioner
์นดํ์นด๋ ๋ฉ์ธ์ง๋ฅผ ๋ณ๋ ฌ๋ก ํจ์จ์ ์ผ๋ก ์ฒ๋ฆฌํ๊ธฐ ์ํด ์ฌ๋ฌ๊ฐ์ ํํฐ์ ์ผ๋ก ๊ตฌ์ฑํ ์ ์๋ค. ๋ฉ์์ง๋ ํํฐ์ ์ผ๋ก ๋ณด๋ด์ง๊ณ ํํฐ์ ๋ด ๋ก๊ทธ ์ธ๊ทธ๋จผํธ์ ์ ์ฅ๋์ด ์นดํ์นด๋ก ์ ์ก๋๋ค. ์ฌ๋ฌ๊ฐ์ ํํฐ์ ์ด ๊ตฌ์ฑ๋์ด ์์ ๊ฒฝ์ฐ ์ด๋ ํํฐ์ ์ผ๋ก ๋ณด๋ด์ค์ผ ํ๋ ์ง ๊ฒฐ์ ํ๋ ์ญํ ์ ํ๋ ๊ฒ์ด ํํฐ์ ๋์ด๋ค.
ํํฐ์ ๋๋ ๋ฉ์์ง์ ํค๋ฅผ ํด์ํ์ฌ ์ด๋ค ํ ํฝ์ ์ด๋ค ํํฐ์ ์ผ๋ก ๋ณด๋ด์ค์ผ ํ ์ง ๋งคํํ๋ ํ ์ด๋ธ์ ๊ด๋ฆฌํ๋ค. ํํฐ์ ์ ๊ฐฏ์๊ฐ ๋ณ๊ฒฝ๋๋ฉด, ํด์ํ ์ด๋ธ๋ ๋ณ๊ฒฝ๋๋ฏ๋ก ํํฐ์ ์ ๊ฐฏ์๋ฅผ ๋๋ฆด ๋๋ ์ ์ํด์ผํ๋ค.
๋ฉ์์ง์ ํค๊ฐ์ ํ์๊ฐ์ด ์๋๋ฏ๋ก, null ์ผ ๊ฒฝ์ฐ ์นดํ์นด๋ ์์ฒด์ ์ผ๋ก ํด๋น ๋ฉ์์ง๋ฅผ ์ด๋ ํํฐ์ ์ผ๋ก ๋ณด๋ผ ์ง ๊ฒฐ์ ํ๋ค. ๋ค์์ ๊ทธ ๋ฐฉ๋ฒ์ด๋ค.
- Round Robin
์ฌ๋ฌ๊ฐ์ ํํฐ์ ์ผ๋ก ๊ตฌ์ฑ๋์ด ์๋ ๊ฒฝ์ฐ, ํํฐ์ ๋๋ ๋ฉ์์ง๋ฅผ ๋๋ค์ผ๋ก ํํฐ์ ์ ๋ฉ์์ง๋ฅผ ์ ์กํ๋ค. ์ ์ก๋ ๋ฉ์์ง๋ I/O interrupt ๋ฅผ ์ค์ด๊ธฐ ์ํด, ํํฐ์ ์ ๋ฒํผ ๋ฉ๋ชจ๋ฆฌ ์์ญ์์ ๋๊ธฐ๋ฅผ ํ๋๋ฐ, ๋ผ์ด๋ ๋ก๋น์ ์ฌ์ฉํ ๊ฒฝ์ฐ, ๋ฒํผ ๋ฉ๋ชจ๋ฆฌ๊ฐ ์ฐจ๊ธฐ ์ ๊น์ง ๋ฉ์์ง๊ฐ ๋๊ธฐํ ์ ์๊ธฐ ๋๋ฌธ์ ํจ์จ์ ์ด์ง ๋ชปํ๋ค. ( ๋ฒํผ ๋ฉ๋ชจ๋ฆฌ ์์ญ ๋ด์์ ์ผ๋งํผ ๊ธฐ๋ค๋ฆฌ๋ ์ง ๊ฒฐ์ ํ ์ ์๋ ์ค์ ๊ฐ(linger.ms)๊ฐ ์๊ธฐ๋ ํ๋ค. )
- Sticky Partitioning
๋ผ์ด๋๋ก๋น์ ๋นํจ์จ์ ์ธ ์ ์ก์ ๋ฐฉ์งํ๊ธฐ ์ํด, ๋ฒํผ ๋ฉ๋ชจ๋ฆฌ ์์ญ ๋ด ๊ฐ์ฅ ๋น ๋ฅด๊ฒ ๋ฐฐ์ถ๋ ์ ์์๋งํ ํํฐ์ ์ ์ฐพ์ ํํฐ์ ๋๋ ํด๋น ํํฐ์ ์ผ๋ก ๋ฉ์์ง๋ฅผ ์ ์กํ๋ค. ๋ฒํผ ๋ฉ๋ชจ๋ฆฌ๊ฐ ๋ ๋นจ๋ฆฌ ์ฑ์์ง ์ ์๊ธฐ ๋๋ฌธ์ ๋ผ์ด๋๋ก๋น๋ณด๋ค ํจ์จ์ ์ผ๋ก ๋ฉ์์ง๋ฅผ ์ฒ๋ฆฌํ ์ ์๋ ์ฅ์ ์ด ์๋ค.
ํ์ง๋ง, ์ด๋ฌํ ๋ฐฉ์์ ๋ฉ์์ง์ ์ ์ก ์์๊ฐ ๋ณด์ฅ์ด ๋์ง ์๊ธฐ ๋๋ฌธ์ ๋ฉ์์ง์ ์์๊ฐ ์ค์ํ์ง ์์ ๊ฒฝ์ฐ์ผ ๊ฒฝ์ฐ sticky partitioning ์ ๋ต์ ์ฌ์ฉํ๋ฉด ์ข๋ค.
Batch
์นดํ์นด๋ ํ ํฝ์ ์ฒ๋ฆฌ๋์ ๋์ด๊ธฐ ์ํด, ๋ฉ์์ง๋ฅผ ํ๋ํ๋ ์ฒ๋ฆฌํ๋ ๊ฒ์ด ์๋ ๋ชจ์๋ค๊ฐ ํ๋ฒ์ ์ฒ๋ฆฌํ๋ค. ์ด๋ ๊ฒ ํ๋ฒ์ ์ฒ๋ฆฌํ๋ ์ด์ ๋ I/O interrupt ๋ฅผ ์ค์ด๊ธฐ ์ํด์๋ค.
๋ฐฐ์น์ ๊ด๋ จ๋ ์ค์ ๊ฐ์ ๋ค์๊ณผ ๊ฐ๋ค.
- buffer.memory ๊ธฐ๋ณธ๊ฐ์ 32MB ์ด๊ณ , buffer.memory > batch.size * #partition ์ด์ด์ผ ํจ์ ์ฃผ์ํ์.
- batch.size ํํฐ์ ๋ด ๋ฉ์์ง๋ค์ด ๋ฐฐ์น ์ ์ก์ ์ํด ๋ฌถ๋ ๋จ์์ด๋ค. ๊ธฐ๋ณธ๊ฐ์ 16 KB ์ด๊ณ , 16KB ๊ฐ ๋๋ฉด์ ํํฐ์ ๋ด ๋ฉ์์ง๋ค์ ํ๋ฒ์ ์ฒ๋ฆฌํ๋ค.
- linger.ms ๋ฉ์์ง๊ฐ ๋ฒํผ ๋ฉ๋ชจ๋ฆฌ์์ ๋๊ธฐํ๋ ์๊ฐ์ด๋ค. ๊ธฐ๋ณธ๊ฐ์ 0ms์ด๋ค.
์นดํ์นด๋ฅผ ์ฌ์ฉํ ๋ ์ฃผ์ํ ์ ์ ์ฒ๋ฆฌ๋์ ๋์ผ ๊ฒ์ธ์ง ์ง์ฐ์๋ ์ ์ก์ ํ ๊ฒ์ธ์ง ๊ฒฐ์ ํด์ผ ํ๋ค. ์ด์์ค์ธ ์ดํ๋ฆฌ์ผ์ด์ ์ ๋ฐ๋ผ ์ ์ค์ ๊ฐ๋ค์ ์กฐ์ ํ๋ฉฐ ์ด๋๊ฒ์ด ์ดํ๋ฆฌ์ผ์ด์ ์ ์ต์ ํ๋ ๊ฐ์ธ์ง๋ฅผ ์ฐพ์์ผ ํ๋ค.
์ง์ฐ์๋ ์ ์ก์ ์ํด์ , batch.size / linger.ms ๊ฐ์ ์ค์ด๋ฉด ๋๊ณ , ์ฒ๋ฆฌ๋์ ๋์ด๊ธฐ ์ํด์ ๋๋ฆฌ๋ฉด ๋๋ค.
์ฒ๋ฆฌ๋์ ๋์ด๊ธฐ ์ํด์, ์นดํ์นด๋ ์ฌ๋ฌ๊ฐ์ง ์์ถํฌ๋งท์ ์ง์ํ๊ธฐ๋ ํ๋ค.
์ ์ก๋ฐฉ์
์ ์ด๋ ํ๋ฒ ์ ์ก
๋ฉ์์ง๋ฅผ ํ๋ก๋์๊ฐ ๋ณด๋ด๊ณ ๊ทธ์ ๋ํ ACK ์ด ์ค์ง ์๋๋ค๋ฉด, ๊ฐ์ ๋ฉ์์ง๋ฅผ ํ๋ฒ ๋ ๋ณด๋ด๋ ๋ฐฉ์์ด๋ค. ์นดํ์นด๋ ๊ธฐ๋ณธ์ ์ผ๋ก ์ ์ด๋ ํ๋ฒ ์ ์ก์ผ๋ก ๋์ํ๊ณ ์ด๋ ๋ฉ์์ง๊ฐ ์ค๋ณต ๊ฐ๋ฅ์ฑ์ด ์๋ค๋ ๊ฒ์ ์๋ฏธํ๋ค.
์ต๋ ํ๋ฒ ์ ์ก
๋ฉ์์ง๋ฅผ ํ๋ก๋์๊ฐ ๋ณด๋ด๊ณ ACK์ ๊ธฐ๋ค๋ฆฌ์ง ์๊ณ ๋ค์ ๋ฉ์์ง๋ฅผ ๋ณด๋ธ๋ค. ๋ฉ์์ง์ ์ ์ค์ด ์์ ์ ์๊ธฐ ๋๋ฌธ์ log ๋ IoT ๊ฐ์ ํ๊ฒฝ์์ ์ฌ์ฉ๋๋ค.
์ค๋ณต์๋ ์ ์ก
ํ๋ก๋์๋ ๋ฉ์์ง๋ฅผ ๋ณด๋ผ ๋ ํค๋์ PID / Sequence Number ๋ฅผ ํฌํจํ์ฌ ๋ธ๋ก์ปค์๊ฒ ์ ๋ฌํ๋ค. ๋ธ๋ก์ปค๋ ์ ๋ ๊ฐ์ ๋น๊ตํ์ฌ ์ฒ๋ฆฌ๋ ๋ฉ์์ง์ธ์ง ์๋์ง๋ฅผ ํ๋จํ๊ณ , ๋ง์ฝ ์ฒ๋ฆฌ๋ ๋ฉ์์ง๋ผ๋ฉด ๋ฉ์์ง๋ฅผ ์ ์ฅํ์ง ์๊ณ ACK ๋ง ์๋ตํ๋ค.
PID / Sequence Number ๋ ๋ธ๋ก์ปค์ ๋ฉ๋ชจ๋ฆฌ ๋ฟ ์๋๋ผ, replication log ์๋ ์์ฑ๋์ด ๋ธ๋ก์ปค๊ฐ ๋ค์ด๋์ด follower ๊ฐ promotion ๋ ๋๋ ์ค๋ณต ์๋ ๋ฉ์์ง ์ ์ก์ด ๊ฐ๋ฅํ๋ค.
์ฌ๊ธฐ์ ์ฃผ์ํด์ผ ํ ์ ์ ์ค๋ณต์๋ ์ ์ก์ ์ ํํ ํ๋ฒ ์ ์ก๊ณผ ๊ตฌ๋ณ๋๋ค๋ ์ ์ด๋ค.
์ค๋ณต์๋ ์ ์ก์ ํ๊ธฐ ์ํด์ ํ๋ก๋์์ ๋ค์๊ณผ ๊ฐ์ ์ค์ ๊ฐ๋ค์ด ํ์ํ๋ค.
- enable.idempotence = true ๊ธฐ๋ณธ๊ฐ์ false ์ด๋ฉฐ, ์ด ๊ฐ์ด true ๋ก ์ค์ ๋๋ฉด, ๋ค์ 3๊ฐ์ง์ ์ต์ ์ ์์ ํด์ผ ํ๋ค.
- max.in.flight.requests.per.connection = 1~5 ๋ธ๋ก์ปค๋ก๋ถํฐ ack ์ด ์ค์ง ์์์ ๋ connection ์์ ๋ณด๋ผ ์ ์๋ ์ต๋ ์์ฒญ ์
- acks = all (-1) ํ๋ก๋์๋ ๋ชจ๋ ๋ฆฌ๋ ๋ธ๋ก์ปค์ ISR ์ ํฌํจ๋ ๋ธ๋ก์ปค๋ค์ด ๋ชจ๋ ๋ฉ์์ง๋ฅผ ๊ธฐ๋กํ๋ค๋ ์์ฒญ์ ๊ธฐ๋ค๋ฆฐ๋ค. ์ด๋ ๊ฒ ๋จ์ผ๋ก์จ ๋ฉ์์ง๋ฅผ ์ค๋ณต์์ด ์ฒ๋ฆฌํ ์ ์๊ฒ ๋๋ค.
- retries > 0 ack ์ ๋ฐ์ง ๋ชปํ ๊ฒฝ์ฐ ๋ช ๋ฒ ์ฌ์๋๋ฅผ ํด์ค์ผ ํ๋์ง ๊ฒฐ์ ํด์ฃผ๋ ๊ฐ์ด๋ค.
์์ ๊ฐ์ด ProducerId
/ lastSequence
/ firstSequence
๊ฐ ๊ธฐ๋ก๋ ๋ชจ์ต์ ๋ณผ ์ ์๋ค.
์ ํํ ํ๋ฒ ์ ์ก
์นดํ์นด๋ ์ ํํ ํ๋ฒ ์ ์ก์ ์ํํ๊ธฐ ์ํด, ๋ณ๋์ ํ๋ก์ธ์ค๊ฐ ์กด์ฌํ๋๋ฐ ์ด๋ฅผ ํธ๋์ญ์ API๋ผ๊ณ ํ๋ค.
์นดํ์นด์์ ํธ๋์ญ์
์ ๊ด๋ฆฌํ๊ธฐ ์ํด ๋ณ๋์ ๋ธ๋ก์ปค์๋ transaction coordinator
๊ฐ ์กด์ฌํ๋ค.
์นดํ์นด ํธ๋์ญ์
์ ์ฌ์ฉํ๊ธฐ ์ํด์ , ์ค๋ณต์๋ ์ ์ก์์ ์ค์ ํ๋ ๊ฐ ์ด์ธ์ transactional_id_config
๊ฐ์ด ์ถ๊ฐ๋์ด์ผ ํ๋ค. ์ด ๋ ํ๋ก๋์๋ง๋ค ์๋ก ๋ค๋ฅธ ๊ฐ์ผ๋ก ํด๋น ์ค์ ๊ฐ์ ๊ตฌ์ฑํด์ผ ํ๋ค.
ํธ๋์ญ์ ์ฝ๋๋ค์ดํฐ ์ฐพ๊ธฐ ํ๋ก๋์๋ ๋ธ๋ก์ปค์๊ฒ
FindCoordinatorRequest
์์ฒญ์ ๋ณด๋ด ์ฝ๋๋ค์ดํฐ๋ฅผ ์ฐพ๊ฒ ๋๋ค. ์ด ๊ณผ์ ์์ ํธ๋์ญ์ ์ฝ๋๋ค์ดํฐ๋ PID(producer id) ์ TID(transaction id) ๋ฅผ ๋งคํํ๊ณ ํธ๋์ญ์ ์ ๊ด๋ฆฌํ๋ค.initTransaction() ํ๋ก๋์๋ ์ ๋ฉ์๋๋ฅผ ํตํด ์ด๊ธฐํ๋ฅผ ํ๋ฉฐ, ์ด ๋ InitPidRequest ์์ฒญ์ ํธ๋์ญ์ ์ฝ๋๋ค์ดํฐ๋ก ๋ณด๋ธ๋ค. ํธ๋์ญ์ ์ฝ๋๋ค์ดํฐ๋ ์ด ๊ณผ์ ์์ PID / TID ๋ฅผ ๋งคํํ๊ณ PID Epoch๋ฅผ ํ ๋จ๊ณ ์ฌ๋ฆฐ๋ค. (epoch๋ฅผ ์ฆ๊ฐ์ํค๋ ์ด์ ๋, ๋ฉ์์ง ์์ ์ฑ์ ๋์ด๊ธฐ ์ํจ์ด๋ค.)
1
peter-transaction-01::TransactionMetadata(transactionalId=peter-transaction-01, producerId=3000, producerEpoch=0, txnTimeoutMs=60000, state=Empty, pendingState=None, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1712735064600)
ํธ๋์ญ์ ์ด๊ธฐํ์ ํด๋นํ๋ ๋ก๊ทธ์ด๋ค. PID ๋ 3000 / state=Empty / topicPartition์ empty set ์ด๋ค.
beginTransaction() ์ด ๊ณผ์ ์์ ํ๋ก๋์๋ ํ ํฝ ํํฐ์ ์ ๋ณด๋ฅผ ํธ๋์ญ์ ์ฝ๋๋ค์ดํฐ์ ์ ๋ฌํ๊ณ ํธ๋์ญ์ ์ฝ๋๋ค์ดํฐ๋ ํธ๋์ญ์ ๋ก๊ทธ์ TID, ํํฐ์ ์ ๋ณด๋ฅผ ๊ธฐ๋กํ๊ณ ํธ๋์ญ์ ์ํ๋ฅผ
ongoing
์ผ๋ก ํ์ํ๋ค. ์ถ๊ฐ๋ก ํธ๋์ญ์ ์ฝ๋๋ค์ดํฐ๋ ํด๋น ํธ๋์ญ์ ์ ๋ํด ํ์ด๋จธ๋ฅผ ์ค์ ํ๊ณ 1๋ถ์ ๋๊ธธ ๊ฒฝ์ฐ ํด๋น ํธ๋์ญ์ ์ ์คํจ๋ก ๊ฐ์ฃผํ๋ค.1
peter-transaction-01::TransactionMetadata(transactionalId=peter-transaction-01, producerId=3000, producerEpoch=0, txnTimeoutMs=60000, state=Ongoing, pendingState=None, topicPartitions=Set(peter-test05-0), txnStartTimestamp=1712735065999, txnLastUpdateTimestamp=1712735065999)
state=ongoing์ผ๋ก ๋ณ๊ฒฝ๋์๊ณ , topicPartitions=Set(peter-test05-0) ์์ ํธ๋์ญ์ ์ด ์์ํ์์ ์ ์ ์๋ค.
๋ฉ์์ง ์ ์ก
ํ๋ก๋์๋ ๋ฉ์์ง๋ฅผ ์ ์กํ๋ฉด์, PID / epoch / sequence number ๋ฅผ ๋ฉ์์ง์ ๋ด์ ์ ์กํ๋ค.
1 2 3 4
| offset: 0 CreateTime: 1712735065962 keysize: -1 valuesize: 52 sequence: 0 headerKeys: [] payload: Apache Kafka is a distributed streaming platform - 0 baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: 3000 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true position: 120 CreateTime: 1712735066161 size: 78 magic: 2 compresscodec: NONE crc: 1514605644 isvalid: true | offset: 1 CreateTime: 1712735066161 keysize: 4 valuesize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0
lastSequence: 0 producerId: 3000 producerEpoch: 0 ๊ฐ์ ํตํด ๋ฉ์์ง์ PID / epoch / sequence number ๊ฐ ๋ด๊ฒจ ์ ์ก๋๋ค๋ ๊ฒ์ ํ์ธํ ์ ์๋ค.
offset 1๋ฒ์ ๋ด๊ธด ๋ก๊ทธ๋ฅผ ๋ณด๋ฉด ๊ธฐ์กด ๋ก๊ทธ์ ํํ๊ฐ ์กฐ๊ธ ๋ค๋ฅธ ๊ฑธ ๋ณผ ์ ์๋๋ฐ, ์ด ๋ฉ์์ง๋
control message
์ด๋ค. ์ปจ์๋จธ์ read_committed option์ด ํ์ฑํ ๋์ด ์๋ค๋ฉด ์ด ๋ฉ์์ง ์์ ์๋ ๋ฉ์์ง๋ค๋ง ์ฝ์ ์ ์๋ค.๋ฉ์์ง ์ ์ก ์๋ฃ
๋ฉ์์ง๋ฅผ ์ ์ก์๋ฃํ๊ฒ ๋๋ฉด, ํ๋ก๋์๋ commitTransaction() ๋๋ abortTransaction() ๋ฉ์๋๋ฅผ ํธ์ถํด์ผ๋ง ํ๋ค. ์ปค๋ฐ์ด ๋์๋ค๋ฉด ํธ๋์ญ์ ์ฝ๋๋ค์ดํฐ๋ ํด๋น ํธ๋์ญ์ ์ ๋ํ ๋ก๊ทธ์ธ prepareCommit ๋๋ prepareAbort๋ฅผ ๋ก๊ทธ์ ๊ธฐ๋กํ๋ค.