[kafka] 4.Kafka 모니터링

사용 환경

Kafka 모니터링 하기

Kafka는 모니터링을 위한 JMX 인터페이스를 제공하기 때문에 MBean 값을 모니터링 할 수 있다.

JMX

아직 미시도

Kafka Offset Monitor tool

왜인지 모르겠는데 깃허브에서 내려가서 따라하기가 힘들었다.

kafka 2.1 이상에서는 정상동작 하지 않았으나 고맙게도 2.1 이상부터 사용할 수 있게 빌드를 해서 jar파일을 공유해주신 분이 있어 찾아서 사용했다.

https://deep-dive-dev.tistory.com/51

java -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --offsetStorage kafka --zk localhost:2181 --port 9000 --refresh 10.seconds --retain 2.days

–offsetStorage kafka : kafka 0.8 이전에서는 partition의 offset을 zookeeper가 관리하였으나, 이 후에는 broker에 기록됨. 아마 툴 자체가 구버전 용이다 보니 zookeeper가 기본 값인 듯 하다.

이후 토픽과 컨슈머 그룹을 만들어 준다.

kafka-monitoring

문제1. Topic의 Consumer list를 못 가져옴

org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers
        at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
        at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:413)
        at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:409)
        at kafka.utils.ZkUtils$.getChildren(ZkUtils.scala:462)
        at com.quantifind.utils.ZkUtilsWrapper.getChildren(ZkUtilsWrapper.scala:62)
        at com.quantifind.kafka.core.ZKOffsetGetter.getGroups(ZKOffsetGetter.scala:62)
        at com.quantifind.kafka.offsetapp.OffsetGetterWeb$$anonfun$getGroups$1.apply(OffsetGetterWeb.scala:100)
        at com.quantifind.kafka.offsetapp.OffsetGetterWeb$$anonfun$getGroups$1.apply(OffsetGetterWeb.scala:100)
        at com.quantifind.kafka.offsetapp.OffsetGetterWeb$.withOG(OffsetGetterWeb.scala:89)
        at com.quantifind.kafka.offsetapp.OffsetGetterWeb$.getGroups(OffsetGetterWeb.scala:99)
        at com.quantifind.kafka.offsetapp.OffsetGetterWeb$.reportOffsets(OffsetGetterWeb.scala:69)
        at com.quantifind.kafka.offsetapp.OffsetGetterWeb$$anonfun$schedule$1.apply$mcV$sp(OffsetGetterWeb.scala:80)
        at com.quantifind.kafka.offsetapp.OffsetGetterWeb$$anon$2.run(OffsetGetterWeb.scala:49)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
        at org.apache.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1472)
        at org.apache.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1500)
        at org.I0Itec.zkclient.ZkConnection.getChildren(ZkConnection.java:99)
        at org.I0Itec.zkclient.ZkClient$2.call(ZkClient.java:416)
        at org.I0Itec.zkclient.ZkClient$2.call(ZkClient.java:413)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
        ... 18 more

원인

Kafka consumer가 zookeeper 접속에 실패하면 발생하는 에러

Kafka는 consumer group에 변화가 생기면 partition을 다시 분배하기 위해 rebalance가 일어납니다. 그런데 heartbeat를 zookeeper로 보낼때 실패하면 해당하는 broker가 죽은 것으로 인지하여 발생하기도 합니다.

이때 heartbeat의 timeout 시간을 조정하는 consumer의 설정값이 바로 zookeeper.session.timeout.ms 인데 초기값이 6000 (6초)로 부하가 큰 경우 timeout이 여러번 발생하며 consumer가 본래의 작업을 하지 못하고 rebalance가 계속해서 일어날 수 있습니다.

아래와 같이 consumer의 설정값을 수정해주면 문제가 해결될 수 있습니다:

테스트 과정

  1. kafka, zookeeper 실행

    $ docker-compose up

  2. kafka offset monitor 실행

    $ java -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb –offsetStorage kafka –zk localhost:2181 –port 9000 –refresh 10.seconds –retain 2.days

  3. 컨테이너 접속 후 kafka topic 생성

    $ docker exec -it kafka-container /bin/bash

    $ kafka-topics.sh –create -bootstrap-server localhost:9092 –replication-factor 1 –partitions 1 –topic “test”

  4. consumer group 생성

    $ kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –from-beginning –group my-group

    만들고 exit

  5. publish를 하고 consume하지 않으면 모니터링 결과에 빨간색 그래프가 치솟는다.

    $ kafka-console-producer.sh –broker-list localhost:9092 –topic “test”

  6. group에 맞춰 consumer를 작동시키면 빨간색 lag이 줄어든다

    $ kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –from-beginning –group my-group

개선

알아보니 ELK 스택을 이용해 Kafka의 Consumer Lag을 시각화 할 수 있다고 한다.

다음은 이걸 시도해 봐야겠다.