일전에 카프카 에러 포스팅 에서
Configuration 설정을 자바 클래스에서 해주었다. 그것도 클래스별로!!
에러 포스팅과 더불어
This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
이런 문구도 출력해줬었다.
근데 저 에러 포스팅을 보면서 좀 더 공통화할 수 없을까에서 찾아보다가 ErrorHandlingDeserializer 관련 검색을 해보다가 문서에서 찾게 되었던게 있는데,
기존의 설정을 이미지로 한번 가져와봤다.
그런데 이 방법을 Listener가 늘어나면 늘어날 수록 고수할 수가 없다는 생각이 들었다.
세부적인 사항 외에는 조금 다 yaml로 공통화를 할 수 있지 않을까?? 나는 SpringBoot를 사용하는데!
그래서 문서를 찾아본 결과
yaml 설정
spring:
kafka:
consumer:
group-id: test
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring:
json:
use:
type:
headers: false
trusted:
packages: '*'
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
listener:
ack-mode: manual_immediate
retry:
topic:
attempts: 2
일단 헤더로서 값을 받지 않겠다 라는것을 선언해주고
문서처럼spring.kafka.consumer.properties.spring.json.type.mapping=메시지를 수신할 dto 풀경로
로 전역에서 여러개를 설정해주어도 되지만, 이 부분은 리스너마다 다를것이라고 생각해서 @KafkaListener
마다 설정해주기로 했다.
@KafkaListener(properties={
"max.poll.records=15", // 한 작업에 15개의 레코드를 컨슘한다.
"spring.kafka.consumer.properties.spring.json.type.mapping=com.github.lsj8367.MessageReq"
})
public void consumeMessage(final MessageReq request, Acknowledgement ack) {
// 메세지 처리
}
이렇게 완성하여 ConsumerConfiguration
을 제거하게 되었다.
나는 Spring을 쓰는 것이 아니라 이 설정을 한번 더 추상화하여 간편하게 쓰게 해주는 Boot를 쓰게해주는데
너무 Spring처럼 쓰는게 아닌가 싶었는데, 원하는대로 잘 바꾸어 준 것 같다.
나머지로 더 전역적으로 쓸 수 있는것은 공통으로 충분히 더 빼주고 특이한 케이스만이 별도의 Configuration 클래스로 빠져서 설정해주어야 하지 않을까 싶다.
아직 해결하지 못한 이슈가 있는데, 특정 Topic만 특이하게 한건을 consume하고 나서부터는 consume을 더이상 하지 않는 오류가 있다.
이 부분을 얼른 해결하고 싶다.
'Spring > Kafka' 카테고리의 다른 글
Kafka Offset Commit의 중요성 (0) | 2022.10.06 |
---|---|
Spring Kafka Deserializer Class Not Found Exception (2) | 2022.09.16 |