728x90

개요

회사의 기술스택에서 카프카를 사용하고 있다.
내가 이번에 정리하는 글은 막연하게 ack을 날리는 부분에 있어서 간과했던 로직 때문에 벌어진 이슈를 정리한다.

어떻게 로직이 생겼었는가?

우선 카프카를 사용하기 위해선 스프링 부트에서 Spring for Apache Kafka 를 사용해야 한다.
나는 로직에서 try-catch-finally 를 붙여 사용했는데 동료 개발자분께서 의도하신 것인지는 모르겠으나 catch절을 빼고 로직을 구성했다.

@KafkaListener
public void consumeExample(final ScrapeUpdateDto dto, final Acknowledgement ack) {
    try {
        // do something
    } finally {
        ack.acknowledge();
    }
}

이러한 방식으로 구현을 했었고, try 내부에서는 throw를 하는 부분이 군데군데 적용이 되어있었다.

에러는?

샘플 코드에서 do something에 해당하는 로직에서 문제가 발생하게 되는데, 그것이 바로 재시도 로직은 어디에도 존재하지 않았다.
하물며 Spring측에서 제공하는 @Retryable 이나 Spring Cloud Openfeign에서 설정하는 Retry 를 어디에도 구성하지 않았는데 실제 호출은 원본 호출까지 합쳐 10번을 수행했다.

디버깅 시 당연하다고 생각한 부분

일단 디버깅을 진행했을 때 아 그럼 예외 핸들러가 뭐 처리를 했겠지 라는 생각으로 대충 ErrorHandler를 검색했다.
검색하고 보니까 뭔가 힌트는 얻었다.

그것이 바로 DefaultErrorHandler
근데 이 클래스가 10회가 발생하고 백오프 9번 재시도 한다고 하는데 이 부분을 생성해주는 곳은 어디에도 없었으며, 변수로 명시한 setter 메소드에도 디버깅 포인트를 집어놓아도 null만을 주입하고 있었다.

이것이 바로 그 에러 핸들러의 생성자 부분이다.

아직 잘 모르겠으니 차례대로 디버깅을 수행해보도록 하자!

디버깅

우선 KafkaAnnotationDrivenConfiguration 을 통해 ConcurrentKafkaListenerContainerFactoryConfigurer 를 생성한다.

모든 값이 default 설정이라 그런가 전부 errorHandler 부분이 null을 가지고 있다.
여기서 DefaultErrorHandler는 절대 사용되지 않는다라고 확정지었다.

이후에 ConcurrentKafkaListenerContainerFactory를 커스텀한 설정을 넣지 않는다면 자동생성 빈으로 구성해준다.


죄다 이런 default 클래스들로 구성을 해준다.

이렇게 기본 팩토리 구성을 끝마친다.

팩토리 생성 이후

팩토리가 만들어졌다면 당연히 Listener들을 생성시켜주어야 하는데, 이 부분을 createListenerContainer 메소드로 생성해주고 있다.

아래 로직은 ConcurrentKafkaListenerContainerFactory 가 상속한 AbstractKafkaListenerContainerFactory 에 구성된 로직을 오버라이딩한 메소드이다.

createListenerContainerInstance 로 리스너에 대한 부분을 생성해주게 된다.

여기서 추가적으로 알게된 것은 카프카를 설정하기 위해 yaml파일에 구성하는 concurrency 값을 토대로 리스너 컨테이너를 생성한다.

어 잠깐만! 근데 리스너 컨테이너 생성하면 리스너가 여기서 설정되는거 아냐?

위의 리스너 컨테이너도 마찬가지로 추상 컨테이너를 상속받고 있는데, 이 부분을 살펴봤다.

왜?

당연히 리스너면 컨슈머에 대한 설정 부분이 어느정도 구현이 되어있을 것이라고 추측하고 들어가보았다.

변수에는 롤백 프로세서 변수가 default 설정이 되어있는 것을 볼 수 있다.

아래 코드는 AbstractMessageListenerContainer의 변수로 선언된 일부를 가져왔다.

private AfterRollbackProcessor<? super K, ? super V> afterRollbackProcessor = new DefaultAfterRollbackProcessor<>();

DefaultAfterRollbackProcessor를 따라가보면 정답이 나올 것 같아서 바로 들어갔다~~

아래와 같은 설정 구성이 들어가있었고, DefaultErrorHandler와 같이 최초 1회 수행한 후 로직내부의 별도의 Exception을 처리해주는 부분이 없다면 9회를 재시도하도록 구성이 되어있었다.

디버깅 끝낸 후

이 부분을 끝내고 난 뒤 공식문서를 봤는데,

공식문서에서 deafult error handler 설정 값이 어떻게 들어가있는지 알려주고 있다.

공식문서 바로가기

 

예외를 처리해줄 핸들러를 구현할 때 참고할 점

추가적으로 코드에서 설명해주고 있는 부분은

AbstractKafkaListenerContainerFactory내부에 있는데,

deprecated 처리된 메소드들

  1. 2.2버전 이상
    1. setErrorHandler
    2. setBatchHandler
  2. 2.8버전 이상
    1. setRetryTemplate
    2. setRecoveryCallback

전부 setCommonErrorHandlersetAfterRollbackProcessor 메소드를 사용하도록 추천하고 있다.

결론

결론은 무조건적으로 ack을 날리더라도 dead letter를 바로바로 처리해줄 수 있는 또 하나의 프로듀서, 컨슈머 한쌍을 만들어 두던지, 아니면 db에 따로 오류난 메시지들을 적재하고 배치로 수행해주던지 구성이 필요할 것 같고,

에러를 명확하게 던진 부분을 컨슘하는 곳에서 명확하게 처리해줄 상황이 아니라면,

Custom한 ErrorHandler를 구성하는게 바람직할 것 같다.

728x90

'디버깅' 카테고리의 다른 글

Stream 오류 제거  (2) 2023.04.21
@Transactional 제대로 알고쓰기  (4) 2023.02.27
FeignClient Logging level 디버깅  (0) 2022.12.17
@Async 사용시 에러 해결  (0) 2022.11.04

+ Recent posts