kafkaPoint
Kafka ์ฌ์ฉ ์ ์ฃผ์์
Reference
๊ฐ์ : [ํจ์คํธ์บ ํผ์ค] The RED : ๋น์ฆ๋์ค ์ฑ๊ณต์ ์ํ Java/Spring ๊ธฐ๋ฐ ์๋น์ค ๊ฐ๋ฐ๊ณผ MSA ๊ตฌ์ถ by ์ดํฌ์ฐฝ
Rebalancing
kafka์ ํต์ ํ๋ consumer group์ค ํน์ ์ธ์คํด์ค๊ฐ ๋ค์ด๋์์ ๋, consumer group ์ consumer๊ฐ ์ถ๊ฐ ๋๋ ์ญ์ ๋๋ฉด consumer๊ฐ ์์ ํ๊ณ ์๋ partition ์ ์ฌํ ๋น (reassign) ํ๋ ์์
์ด ๋ฐ์ํ๋๋ฐ ์ด๋ฅผ rebalancing ์ด๋ผ๊ณ ํ๋ค
์ ์ํฉ์์ Instance3๊ฐ ๋ค์ด๋์์ ๋ ์ด๋ฅผ ๋น ๋ฅด๊ฒ rebalancingํด์ผ ํ๋ค.
kafka 1.x ์์๋ rebalancing ์ด ๋ฐ์ํ๋ฉด ์ ์ฒด consumer ๊ฐ ๋ฉ์์ง์ ๋ํ polling ์ ์๊ฐ ์ค๋จํ๋ค(Stop the World). kafka ์ consumer ๋ก ์ฐธ์ฌํ๋ ์ ํ๋ฆฌ์ผ์ด์ ์ ๋ฐฐํฌ๊ฐ ์์ ๋๋ง๋ค kafka ์ ๋ชจ๋ consumer ๊ฐ ์ผ์์ ์ผ๋ก ๋์ํ์ง ์๋ ์ํฉ์ด ๋ฐ์ํ ์ ์๋ค.
๋คํํ, kafka 2.3 ๋ฒ์ ์ดํ๋ก๋ Incremental Cooperative Rebalancing ๋ผ๋ ๋์์ธ์ด ์ ์ฉ๋์ด ์ด๋ฐ ์ด์๊ฐ ํด๊ฒฐ๋ ์ํ์ด๋ค. (Stop the Worldํ์ง ์๊ณ ๋์ํ๋ ์ธ์คํด์ค๋ง ์ฐพ์ ์ฌํ ๋น)
Design and Implementation of Incremental Cooperative Rebalancing ์ฐธ๊ณ
์์ ๋ณด์ฅ
topic์ ์์ ๋ณด์ฅ์ ํด์ฃผ์ง ์์ง๋ง, topic๋ด์ partition์์๋ ์์ ๋ณด์ฅ ๊ธฐ๋ฅ์ ์ง์ํด์ค๋ค.
์ฃผ๋ฌธ์๋ฃ
๊ฒฐ์ ์๋ฃ
๊ฒฐ์ ์ทจ์
์ ์ธ๊ฐ์ง ์์๊ฐ ์ง์ผ์ ธ์ผํ๋ ๋ก์ง์์ ์ฃผ๋ฌธ์๋ฃ->๊ฒฐ์ ์๋ฃ->๊ฒฐ์ ์ทจ์๊ฐ ๊ฐ๋ณ partition์ ๋ถ๋ฐฐ๋๋ฉด ์์ ๋ณด์ฅ์ ํ ์ ์๋ค. ์๋ฅผ ๋ค๋ฉด partition1์ 1๋ฒ์ธ ์ฃผ๋ฌธ์๋ฃ๊ฐ ํ ๋น๋๊ณ , partition2์ 2๋ฒ์ธ ๊ฒฐ์ ์๋ฃ๊ฐ, partition3์ 3๋ฒ์ธ ๊ฒฐ์ ์ทจ์๊ฐ ํ ๋น๋์๋ค๋ฉด ์ด๋ค ๋ฉ์์ง๊ฐ ๋จผ์ ์ฒ๋ฆฌ๋ ์ง ์ ์ ์๋ค. ๊ทธ๋ ๊ธฐ์, ํ partition์ ์ฃผ๋ฌธ์๋ฃ, ๊ฒฐ์ ์๋ฃ, ๊ฒฐ์ ์ทจ์๊ฐ ์์๋๋ก ํ ๋น๋์ด์ผ ํ๋ค.
๋ฐ๋ผ์ kafka์์๋ ๋ฉ์์ง๋ฅผ sendํ ๋์ key๊ฐ์ ๊ฐ์ด ๋ณด๋ด, kafka์์ ์ด key๋ฅผ hashํจ์๋ฅผ ์ฌ์ฉํด ํน์ partitionํ ๊ฐ์๋ง ๋ถ๋ฐฐํ ์ ์๋๋ก ํ๋ค(ํํฐ์ ๋). ์ฆ ์์๊ฐ ์ง์ผ์ ธ์ผ ํ๋ ๋ก์ง์ ๋ํด์๋ ๊ฐ์ key๊ฐ์ ๋ถ์ฌํด ํ partition๋ด์์๋ง ์ฒ๋ฆฌํ ์ ์๊ฒ ํ๋ ๊ฒ์ด๋ค. ์์ ๊ฐ์ ๊ฒฝ์ฐ ์ฃผ๋ฌธ์๋ฃ, ๊ฒฐ์ ์๋ฃ, ๊ฒฐ์ ์ทจ์์ ๋ฉ์์ง๋ฅผ ๋ณด๋ผ ๋ ํ ๊ฐ์ partition์์ ์ฒ๋ฆฌํ ์ ์๋๋ก (๊ฐ์ ์ฃผ๋ฌธ์ ๋ํด์) key๊ฐ์ ๋ชจ๋ ๋๊ฐ์ด ๋ณด๋ด๋ฉด ๋๋ค.
์ค๋ณต ๋ฉ์์ง ์์
kafka์์ consume ์ดํ offset commit ๋ฑ์ ๋ชปํ๋ ๋ฑ์ ์ด์๊ฐ ๋ฐ์ํ๋ฉด, ์ด๋ฏธ consumer์๊ฒ ์ ๋ฌํ ๋ฉ์์ง๋ฅผ ์ค๋ณต์ผ๋ก ์ ๋ฌํ ์๋ ์๊ฒ ๋๋ค
kafka์์๋ consumer๊ฐ ์ด๋๊น์ง ๋ฉ์์ง๋ฅผ ์์ ํ๋์ง๋ฅผ offset์ผ๋ก ๊ด๋ฆฌํ๋ค. ๋ฐ๋ผ์ ๋ค์ ์์์ ๋ฉ์์ง๋ ๋ช ๋ฒ์งธ ์ธ๋ฑ์ค๋ฅผ ๋ณด๋ด์ผ ํ๋์ง๋ฅผ kafka๊ฐ ์ ์ ์๋ ๊ฒ์ด๋ค.
๊ทธ๋ฐ๋ฐ kafka์์ comsumer์๊ฒ ๋ฉ์์ง ์ ๋ฌ ํ consumer๊ฐ ๋ฉ์์ง๋ฅผ ์์ ํ ์ ์ฒ๋ฆฌํ์ง๋ง, kafka์์ ํต์ ์ฅ์ ๊ฐ ๋ฐ์ํด์, kafka์๊ฒ ์๋ ค์ค์ผํ๋ offset commit์ ํ์ง ๋ชปํ ๊ฒฝ์ฐ ์ค๋ณต ๋ฉ์์ง ์์ ์ด ๋ฐ์ํ ๊ฐ๋ฅ์ฑ์ด ์๋ค. ์ฆ, consumer์ ์ฅ์์๋ ์ฒ๋ฆฌํ๋ ๋ฉ์์ง๋ฅผ offset commitํ์ง ๋ชปํจ์ผ๋ก์จ kafka์์ offset์ฐจ์ด๋ก ์ธํด ์์ ํ๋ ๋ฉ์์ง๋ฅผ ์ค๋ณต์ผ๋ก ๋ ์์ ํ๋ ๊ฒ์ด๋ค.
๋ ๊ฐ์ง ํด๊ฒฐ๋ฐฉ๋ฒ์ด ์๋ค.
throw exception์ผ๋ก ํธ๋์ญ์ ๋กค๋ฐฑ
์ฑ๊ณต(http status 200) ์๋ต
๋ก์ง์ ๋ฉฑ๋ฑ์ฑ(์ฐ์ฐ์ ์ฌ๋ฌ ๋ฒ ์ ์ฉํ๋๋ผ๋ ๊ฒฐ๊ณผ๊ฐ ๋ฌ๋ผ์ง์ง ์๋ ์ฑ์ง์ ์๋ฏธ) ๊ตฌํ
1๋ฒ ๋ฐฉ๋ฒ์ ๋ณ๋์ ํ ์ด๋ธ์ ๋๊ฑฐ๋ ์ ์ปฌ๋ผ์ ์ถ๊ฐํ๋ฉด ํด๊ฒฐ๊ฐ๋ฅํ๋ค. ํ๋์ ํธ๋์ญ์ ์ unique index ๊ฐ ๊ฑธ๋ ค ์๋ msg_id ๋ฅผ insert ํ๋ ๋ก์ง๊ณผ (ex: PROCESSED_MESSAGE ํ ์ด๋ธ) ์ค์ ๋น์ฆ๋์ค๋ฅผ ์ฒ๋ฆฌํ๋ CUD ๋ก์ง์ ๋ฌถ์ผ๋ฉด ๋๋ค. unique์ด๊ธฐ ๋๋ฌธ์ ์ค๋ณต insert๊ฐ ๋์ง ์์ exception์ด ๋ฐ์ํด ๋กค๋ฐฑ์ด ๊ฐ๋ฅํ๊ฒ ๋๋ค.
2๋ฒ ๋ฐฉ๋ฒ์ ์ค๋ณต ๋ฉ์์ง๋ ์ด์ฐ ๋์๋ ์ด์ ์ ํ ๋ฒ ์ฒ๋ฆฌ๋์๋ ๋ก์ง์ด๋, exception์ด ๋ถํ์ํ๋ค๊ณ ์๊ฐ๋ ๋ ์ฌ์ฉํ ์ ์๊ฒ ๋ค.
๋๋ ์ค๋ณต ๋ฉ์์ง๋ฅผ ์ฒ๋ฆฌํ ๋ ๋ฉ์์ง๋ฅผ ์ฒ๋ฆฌํ๋ consumer ๋ก์ง์ ๋ฉฑ๋ฑํ๊ฒ ๊ตฌํํ๋ฉด ์ค๋ณต ๋ฉ์์ง๊ฐ ์ ๋ฌ๋๋๋ผ๋ ์ด์๊ฐ ๋ฐ์ํ์ง ์๋๋ค.
Last updated