728x90

개요

당연하게 사용하던 Java의 람다 기능에 대해서 의문을 갖지 않고 막 써댔다.

그렇지만, 면접에서의 질문을 받았을 때 당황했다. 뜬금포로 final을 쓰고 안쓰고의 차이를 물어보셨는데 순간 답변을 하지 못했다.

그래서 정리해본다.

 

람다 캡쳐링에 대해서 알아보기 이전에 Effectively final에 대해 먼저 알아보자.

Effectively Final 이란?

해당 단어를 deepL을 통해 해석해보면 사실상 최종 이라고 해석해준다.

final을 선언한 상수와 같이 변경되지 않았다면 그와 같은 수준으로 컴파일러가 해석해준다.

 

effectively final이 되려면 아래의 3가지 조건을 만족해야 한다.

아래 3가지 조건은 공식문서를 통해서 나와있는 정보들이다.

  1. 명시적인 final을 선언하지 않았다.
  2. 재 할당을 하지 않아야 한다.
  3. 접두 또는 후미에 증감연산자를 추가해서 데이터를 바꾸지 않아야한다.

객체라면 참조 주소값만 바뀌지 않는다면 그대로 계속 effectively final 로 유지할 수 있다.

 

정상적인 Effectively Final

정상적인 lambda식

Effectively Final이 제대로 되지않은 경우

비정상적인 lambda식

자바가 친절하게 설명을 해준다.

 

자, 이제 변수값을 내부에서 변경하지 않으면 잠정 final로 보고 람다식에 데이터를 명확하게 넣어줄 수 있다.

Lambda Capturing에 대해 알아볼 시간이다!

Lambda Capturing이란?

외부에서 정의한 변수를 사용할 때 람다식(익명 클래스의 function)에서 복사본을 생성하게 된다.

외부라는 의미는 지역변수나 전역변수(인스턴스)와 클래스 변수들을 전부 아우르는 표현이다.

그럼 Capturing을 적용하지 않는 경우도 있을거 아닌가?

당연히 사용하지 않을 수 있다. 그래서 변수를 넣지 않고 동작할 수 있는데 이때는 외부의 변수를 주입받아 사용하는게 아니라서 캡쳐링이

적용되지 않아서 non-capturing이라고 한다.

Lambda Capturing은 왜 복사본을 만드는가?

일단 지역변수는 메모리 구조상 스택 영역에 할당된다.

스택은 스레드가 실행됐을 때 고유한 영역으로 가지고 있게된다. 그래서 스레드끼리는 공유할 수 없고, 스레드가 종료되면 해당 스택 영역도 사라진다.

여기서 이제 문제가 발생하는 것이다.

 

아래 코드에서 ss 라는 문자열을 복사해서 갖고있지 않는다면 new Thread 부분에선 지역변수로 묶여있는 test()가 스레드보다 더 빨리 수행되고 끝날 가능성이 존재하기 때문에 null을 줄 수도 있을 것이다. 이렇기 때문에 복사본을 만들어 유지하는 것이다.

public void test() {
    String ss = "test";

    new Thread(() -> {
        try {
            Thread.sleep(1000L);
            System.out.println("thread1 ss : " + ss);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }).start();

    System.out.println("ss : " + ss);
}

 

여기서 근데 effectively final이어야 하는 이유는 위에서 봤듯이 멀티 스레드 환경에서 람다식이 동작할 수 있기 때문에

지역변수는 또 스레드마다 공유하지 않기도 한다. 때문에 어떤 복사본이 최신인지를 자바 입장에서는 확인할 방법이 없기 때문에 final변수로만 지역변수를 사용해야 하는것이다.

조용하던 인스턴스 변수나 클래스 변수는?

인스턴스 변수는 메모리 구조상 힙에 할당된다.

우리는 알고있다. 힙은? -> 모든 스레드가 공유할 수 있는 메모리이다.

클래스변수는 static 변수가 이 부분에 포함되는데, 메소드 영역에 할당이 된다.

그래서 값이 바뀌던 말던 그 데이터는 항상 같게끔 유지할 수 있기 때문에 할당할 수 있는 것이다.

정리

람다식 내부에서 지역변수를 사용하는 경우 final이나 effectively final 변수를 사용해야 한다.

-> 이유는 메모리 구조의 stack 영역에 저장되기 때문

final이 아니라면 복사되는 값이 어떤 스레드에서 바꾼것이 가장 최신의 복사본인지 알 수 있는 방도가 전혀 없다.

 

728x90

'Java' 카테고리의 다른 글

참조 유형  (0) 2022.09.12
Checked Exception, Unchecked Exception  (0) 2022.09.07
변성  (0) 2022.08.11
일급 컬렉션  (0) 2022.08.09
728x90

의문점

스크래핑 로직 실행 시간에 대해 의문을 가졌다.

평균 시간이 그럭저럭 다들 비슷한 수준에 머무는데, 이상하게 한 부분만 너무 느렸다. 증권사에 대한 스크래핑 내역이었는데,

특정 증권사만 중복 로그인이 감지되었을 경우 30초를 대기했다가 다시 수행할 수 있게 하는 로직이 들어있었고,

테스트 코드는 그에맞는 정말 30초를 기다리는지에 대한 여부를 테스트 하고 있었다.

🤔 그냥 30초를 기다리는것보단 특정한 법인이 아니라면 30초를 기다리지 않는다 를 테스트하면 되지 않을까?

이렇게 생각했던 이유는 30초를 테스트에서 기다리는 비용이 비싸다고 생각했고, 전체 테스트를 돌릴 때 이 부분때문에 30초를 더 기다려야 한다는 것이다. 그리고 milliseconds 차이값을 비교했기에 간헐적으로 실패하는 경우도 있었다.

 

그래서 더 고치고 싶었는데 일단 테스트를 돌려봤다.

 

디버깅

바로 디버깅 시작했는데 내가 stream 로직을 잘 구성하지 못하는 것인가? 라는 생각도 들었다.

stream 내부에 stream을 또 구성하고 있는 상황이었다.

 

이말은, stream 내부의 stream에선 어떤 값을 반환하던 간에 바깥에서 Stream으로 래핑을 하고 있으니 뭘해도 값이 존재한다는 가정이 먹힌다는 뜻이다.

그래서 위에서 나는 에러점이 보였던 것이다.

해당 부분을 사진을 좀 가져오려고 했는데 워낙에 적나라하게 코드를 다 보여주는 것 같아서 예시 코드를 가져ㅇ와봤다.

 

가장 짤막하게 예시를 들어볼 수 있는게 햄버거가 아닐까 싶었는데

안에 토핑을 생각하다가 간단하게 햄버거 이름, 빵종류, 소스가 무엇인지만 적어봤다.

물론 햄버거 재료중에 패티가 빠지면 섭한데 대충 보자. ㅋㅋㅋ

 

Java 14버전부터 해당 record 클래스가 등장했다.

나는 집에서는 java 17버전을 쓰고있기에 record로 적었고

이미지와 아래의 코드는 같은것이라고 생각하면 되겠다.

그러면서 getter가 기본으로 구현되어있고 getXXX() 방식이 아닌 바로 변수 이름과 같은 메소드를 호출할 수있다.

public final class Hamburger {
    private final String name;
    private final String bread;
    private final List<Sauce> sauces;
}

 

아무튼 이런 햄버거와 안에 들어간 소스를 봐야하는데 소스도 정말 대충 value라고 이름을 지었다.

무튼 이렇게 두개 클래스가 있는데

 

이런 코드를 예시로 만들었다. 로직만 다를뿐이지 이슈가 있던 로직과 같은 구현법이다.

햄버거들에서 소스만 발라내서 매운 소스가 들어간지 여부를 확인하여 있으면 true 없으면 false를 반환하려고 했었던 로직인 것 같다.

문제가 있던 로직

잘보면 뭐 햄버거들중에 소스를 발라내고 그 소스들을 뒤져가면서 null이 아니며 spicy 소스인 데이터를 찾으면 바로 true 반환하려고 했던 것 같다.

 

위에서 설명했던 30초 로직은 해당 메소드에서 true가 반환되면 타게 되어있었다.

public으로 구현했지만 회사의 로직은 private으로 구현되어 있는 메소드였다.

 

각설하고, 이부분에 대해 테스트를 해보니 "spicy" 라는 값을 넣지 않아도 무조건 테스트가 항상 true였다.

그래서 걸러주어야 할 로직들도 모두 참이 되어버리는 것이다.

 

소스에 살사와 머스타드를 넣었음에도 불구하고 spicy 소스가 없는데 참이 나온다.

 

자세히 보면

stream내부에 stream이 있지 않는가?

내부에서 뭘 수행하던간에 아직 stream 연산을 끝맺어주지 않았기 때문에 Stream<> 객체를 반환해줄 것이고 그렇기 때문에 항상 그 내부 연산이 존재하던 않던 상관없이 Stream 객체 자체가 null이 아니고 존재하기 때문에 findFirst() 를 수행해도 항상 참을 반환하는 것이었다.

 

개선하기

그래서 어떻게 이 부분을 개선했는가?

public boolean isExistSpicySauce(List<Hamburger> hamburgers) {

        final Set<Sauce> sauces = hamburgers.stream()
            .map(hamburger -> hamburger.sauces())
            .flatMap(List::stream)
            .collect(Collectors.toSet());

        return sauces
            .stream()
            .filter(sauce -> sauce.value() != null)
            .anyMatch(sauce -> "spicy".equals(sauce.value()));
}

이런식으로 개선했다.

결과는 당연히 spicy가 포함되지 않았기 때문에 false를 반환했고 테스트도 성공한것을 볼 수 있다.

stream으로 엮어주고 Set으로 변환한 이유는 어떤 햄버거든 소스들만 추려 spicy만 있으면 됐기 때문에 이런식으로 구현을 해주었다.

회사 코드에선 한 법인의 여러개의 공동인증서를 가지고 판별하는 문제였기 때문에 이것처럼 구현해보았다.

 

stream내부에서 stream을 또 사용할 때에 주의해서 써야겠다고 이 부분을 수정하면서 느꼈다.

어쩌면 너무 당연한 얘기일 수 있었지만, 이 부분 때문에 모든 스크래핑에서 30초 정도의 중복 로그인 대기 시간이 해소되었고 30초 * n개의 증권사 스크래핑 시간을 특정 기관에서만 중복로그인 30초만 대기할 수 있도록 수정되었다.

무의미한 스크래핑 대기 시간을 해소했다는 얘기이다!!

아무튼 한동안 너무 신규 기능개발건에 대해 초점이 맞춰져서 힘들었었는데, 계속해서 기존 코드 부수고 고치며 성능개선하며 이전 코드들을 돌아볼 수 있는 시간을 생각을 전환하니 어느새 갖고있게 되었다.

해당 생각의 전환은 https://jojoldu.tistory.com/710 여기서 자극을 많이 받게 되었다.

728x90
728x90

회사의 서비스들이 여러개로 쪼개져있다.

그래서 우리는 주로 FeignClient를 사용하는데, 애를 먹었던 로깅레벨에 대해 포스팅한다.

업무중에 삽질을 진행했었으며, 해당 내용으로 자바스럽게 고쳤던 경험을 좀 풀어본다..

 

스프링 프레임워크를 사용하고 있기에 여기서 같이 제공해주는 Spring Cloud의 OpenFeign을 사용하였다.

 

아래는 사용하는 예시이다.

출처 -&nbsp;https://docs.spring.io/spring-cloud-openfeign/docs/current/reference/html/

 

해당 예시처럼 나는 FeignClient를 구현했었다.

물론 @EnableFeignClients 는 별도의 Configuration 클래스 파일에 설정을 해주었었다!

 

여기까지는 일단 기본적인 설정이지만, 아래의 레벨 설명이 진짜다.

 

Feing Logging Level은 총 4단계로 이루어져있다.

  • NONE:  로깅 없음(Default)
  • BASIC:  요청 방법 및 URL, 응답 상태 코드 및 실행 시간 기록
  • HEADERS:  요청 및 응답 헤더와 함께 기본 정보를 기록
  • FULL : 요청과 응답 모두에 대한 헤더, 본문 및 메타데이터를 기록

기본적으로 BASIC, HEADERS에는 정상작동을 했던 우리의 소스였다.

 

일단 소스가 어떤 구성이냐면...

 

회사의 정보를 많이 노출할 수는 없기에 간략하게 설명한다.

 

일명 스크래핑이라고 하는 기술, 흔히 크롤링이라고도 할 수 있을것이다.

해당 작업을 외부 API로 연동하여 응답값을 받아 파싱해주는 작업이 있다.

근데 우리의 규칙상 해당 스크래핑의 원문 데이터라는 아예 크롤링의 전체에 해당하는 xml양식의 데이터도 같이 받아오는 방식을 정책으로 정했기 때문에 우리는 Google Cloud 를 사용하기에 Google Cloud Storage에 원문 데이터를 저장하고, 나머지 데이터를 데이터베이스에 저장하는 구조로 구성이 되어있다.

 

여기서 문제가 발생한다.

 

일단 원문 데이터가 포함이되기에 응답값의 길이가 상당히 길다.

길지않고 정상적인 응답을 받는 스크래핑 로직의 경우에는 이런 디버깅이 해당되지 않는다.

 

근데 글자가 길었고, 내가 핸드폰에서 슬랙으로 알림을 받았던 에러메시지는 다음과 같았다.

에러 로그.... 정보유출되는 것이 너무 많아 다 지워버렸다 ㅋㅋㅋ...

아무튼 이러한 예외가 발생했다.

에러 메시지를 보면 해당 응답값의 타입이 application/json이 아니라는데

디버깅을 찍어보면 정상 json응답값이 파싱이 되고 있었다.

 

아래부분이 좀 더 의심스러웠었는데, buffer Length : 8192 

그러니까... 버퍼길이가 8192 제한인데 길이가 더 커서 담기지 않아서 에러가 발생한다는 것이다.

 

이 부분이 왜 나왔는지를 보니까

 

해당 스크래핑 작업을 하는 로직에서 우리는 로그를 찍는다.

 

스크래핑 작업 전후로 시작했고 완료되었다는 외부의 로직 실행결과를 로그로 담고 있다.

 

그런데 이 담는 로직에서 예외를 이렇게 뱉는다는 것이다.

 

feign 의 기본 클라이언트는 어떤것을 쓰냐면 바로 ApacheHttpClient를 사용한다.

 

아래는 구현체의 내용 일부분이며, 빨간 네모의 asInputStream으로 body를 출력하고 있다.

 

httpResponse에서 받는 해당 entity의 content를 찍어내는 것인데, 이 부분에서 에러가 발생한다.

기본 maxContentLength 가 8192이기 때문에 정상적으로 스트림 반환을 하지 않고 저렇게 예외를 띄우게 되는 것이다!

이부분이 TCP 연결에서의 최대 길이이기 때문에 HttpClient를 사용하면 이런 예외가 발생된다.

 

아래는 ByteArrayBody이다.

해당 부분에서는 ByteArrayInputStream을 사용하기 때문에 TCP length 제한보다 더 쓸수 있는 것 같다.

이것이 바로 ByteArrayInputStream....?

아무든 이 두 asInputStream()을 보는 이유는 아래에서 설명하겠다.

아무튼 이 ByteArrayInputStream은 tcp max content length와 관계없이 스트림을 출력하기에 길이제한이 없다.

 

이 두개의 차이를 가르는 것은 Log 였다!!!

 

그래서 소스를 파고들었다.

 

해당 로그 부분이 BASIC레벨 보다 수준이 낮은 경우 수행되는 로직인데, 

그래서 FULL로직에서만 정상적으로 데이터 파싱이 된다.

 

왜? - response.toBuilder().body(bodyData).build() 를 통해 새로운 ByteArrayInputStream으로 생성되기 때문이다.

 

그래서 이 부분을 재정의를 해주어 BASIC 레벨에서도 길이가 아무리 길더라도 데이터를 파싱할 수 있게 설정해주었다.

 

로직이 정상적으로 수행되며 아래 로그부분에서 객체를 다시 만들어주게 되는걸 보고나서 넘 행복했다는것...😇😇😇

 

해결방안

해당 Feign의 Logger를 상속받아서 log찍는 부분을 override 시켜주었다.

그래서 기존 로직을 또 건드리고 싶지는 않았다. <- 로그는 이대로 찍어주는게 마음 편했으니까?

그래서 분기처리 하는 부분을 제거하고 정말로 필요없는 부분만을 제거해주고 나머지 로직은 유지했다.

 

아무튼 데이터가 길었기 때문에 로그레벨에 따라 값이 파싱이 되고 안되고 났던게 좀 신기했다.

길이가 짧은 응답을 사용했다면 오히려 이런것도 모르고 그냥 설정따라 파싱해주고 안해주고로 넘어갔을 것 같다!!!!

728x90

'디버깅' 카테고리의 다른 글

Stream 오류 제거  (2) 2023.04.21
@Transactional 제대로 알고쓰기  (4) 2023.02.27
@Async 사용시 에러 해결  (0) 2022.11.04
Jenkins 에러  (0) 2022.08.11
728x90

오랜만에 포스팅하는데 회사에서 그동안 앱 2.0 버전을 출시한다고 이래저래 바빴던 나날을 보냈다.

결과적으로는 만족스러운 출시..? 였던것 같다 ㅋㅋㅋ

버그도 많았고, QA 엔지니어께서 고생을 많이 하셨을 수도 있고 내가 구현한 메시지 플랫폼도 테스트하기가 정말 까다로웠다.

각설하고..

해당 에러사항을 구현하는건 Kafka를 이용하지 않아도 되기 때문에 RestAPI로 구현했다. (+ 테스트코드로만)

모든 코드는 깃허브에 있다.

이번엔 무슨 버그였냐?

이런 에러가 쏟아져나왔다. 기존 레거시 푸시는 NHN Toast를 이용한 푸시서비스로 구성되어 있었다.

변경한다고 해도 과도기가 존재하기 때문에 바로 지울수는 없고 이전 앱을 사용하는 사용자들에게는 해당 푸시로 알림은 계속 받아야되기 때문이다.

왜 났던 에러였는지 더듬어봤더니

Async 설정

기본적으로 @Async 를 사용하기 위해서는 Configuration 클래스를 하나 만들어주고 비동기 실행기에 대한 설정을 해주어야 한다.

스크린샷 2022-11-04 오후 11 40 12

이런식으로 에러를 일부러 내기 위해서 큐 사이즈를 5개 기본 스레드 1, 최대 스레드 2, 그리고 비동기로 처리할 큐 사이즈를 5로 구성했다.

문제상황 1

nhn 푸시를 보내기 위해선 Kafka에 Producer를 통해 메세지를 넣어주면, 내가 구현한 메시지 플랫폼에서 해당 메시지를 Consume하여 동작하는 방식으로 구성되어있다.

그런데 나는 비동기로 구현하겠다고만 생각하고 사이즈를 제한해두지는 않았었다.

그래서 Consume은 계속해서 KafkaListener를 통해 무제한으로 하고 있는 와중에 이 컨슘 속도가 너무 빠르니까

위에서 설정한 비동기 처리 큐 사이즈를 200으로 잡아놓아도 예를 들어 1000건이 한방에 producer를 통해 적재된 후 바로 consume을 해버리니까

100개 큐를 훌쩍 넘어버려서도 막는게 아니라 지속적 컨슘이 일어나기에 큐 사이즈를 넘어서는 순간에도 이 메소드 실행을 밀어 넣는 오류였다.

그래서 맨위의 이미지처럼 Exception이 발생하게 되는 것이다.

에러 직접 구현

정확한 코드는 깃허브에서 보면 되지만 여기서는 간략하게 대충 구현하도록 하겠다.

JDK 17버전이기 때문에 record class를 사용함. (record를 사용하면 클래스는 final로 선언되며 equals & hashCode를 재정의하여 갖고있음)

/hello 를 호출하면 printAsync라는 비동기 메소드를 100번호출하게끔 만드는 로직이 있다.

HelloController.java

@RestController
public record HelloController(HelloService helloService) {

    @GetMapping("/hello")
    public ResponseEntity<?> hello() {

        int i = 0;
        while (true) {
            helloService.printAsync();
            i++;

            if (i == 100) {
                break;
            }
        }

        return ResponseEntity.ok(Map.of("data", "success"));
    }
}

HelloService.java

@Slf4j
@Service
public class HelloService {

    @Async("threadPoolTaskExecutor")
    public void printAsync() {
        try {
            Thread.sleep(5000);
            log.info(Thread.currentThread().getName());
            log.info("hello service print!!!");
        } catch (InterruptedException e) {
            log.error("error : {}", e.getMessage());
        } catch (Exception e) {
            log.error(e.getMessage());
        }
    }

}

printAsync()는 비동기로 호출하며 5초를 기다리고 log를 찍게 되는데, 우리가 처음 전역적으로 설정해준 스레드 설정은

요청이 들어오면 maxQueue 사이즈인 5까지만 담을 수 있게 되어있고 작업이 느려서 큐 데이터를 하나 소진하지 못한다면 에러가 발생하는데

그 에러는 이렇다.

이미지로 한방에 찍어지지 않아 글자로 첨부한다.

Caused by: org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@265c1a7c[Running, pool size = 2, active threads = 2, queued tasks = 5, completed tasks = 0]] did not accept task: org.springframework.aop.interceptor.AsyncExecutionInterceptor$$Lambda$967/0x00000008010a7dc0@740a0d5e
    at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.submit(ThreadPoolTaskExecutor.java:391)
    at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.doSubmit(AsyncExecutionAspectSupport.java:292)
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor.invoke(AsyncExecutionInterceptor.java:129)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:708)
    at com.github.lsj8367.application.HelloService$$EnhancerBySpringCGLIB$$7321912.printAsync(<generated>)
    at com.github.lsj8367.presentation.HelloController.hello(HelloController.java:17)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:117)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1070)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:963)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
    ... 87 more
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@29013ef2[Not completed, task = org.springframework.aop.interceptor.AsyncExecutionInterceptor$$Lambda$967/0x00000008010a7dc0@740a0d5e] rejected from java.util.concurrent.ThreadPoolExecutor@265c1a7c[Running, pool size = 2, active threads = 2, queued tasks = 5, completed tasks = 0]
    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365)
    at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:145)
    at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.submit(ThreadPoolTaskExecutor.java:388)
    ... 107 more

처음 봤던 이미지와 같은 예외가 발생하는걸 볼 수 있다.

그러니까 큐 공간이 가득차기 전에 계속해서 비동기를 호출하며 작업큐에 메소드를 태우려고 하다보니 이런 예외가 발생한 것이다.

해결은?

그래서 나는 일단 푸시 발송이 그렇게 오래걸리는 로직이 아니라서 일단은 동기 상태로 바꿔놓았지만,

지속적인 모니터링을 하여 트래픽이 많아지게 되면 그때는 batchListener를 적용하여 비동기로 여러개를 한방에 처리하는 방법을 고려중이다.

-> 이 부분은 카프카의 리밸런싱을 고려해야한다.

아무튼 내가 생각했던 여러 추측중에 하나가 걸리게 되니까 좀 재밌었던 것 같다.

그래도 오늘도 해결했다!

728x90

'디버깅' 카테고리의 다른 글

@Transactional 제대로 알고쓰기  (4) 2023.02.27
FeignClient Logging level 디버깅  (0) 2022.12.17
Jenkins 에러  (0) 2022.08.11
AWS SNS 토큰 에러  (0) 2022.08.10
728x90

저번 포스팅의 마지막 마무리가 바로 이 에러를 찾지 못했던 것이다.

Kafka 에러를 고치게 된 시점

처음에 특정 Consumer만 consume을 못한다고 했었는데, 이는 당연 잘못된 것이었다!!!

그냥 로그를 좀 더 세세하게 찍고 검토를 더 열심히 했어야 했다!

우선 대략적으로 코드를 보면 아래와 같다.

@KafkaListener(topics = "topic", properties = {
        "spring.kafka.consumer.properties.spring.json.type.mapping=com.github.lsj8367.MessageReq"
    })
public void consumeSomething(final MessageReq req, Acknowledgment acknowledgment) {
    pushService.sendPush(req);
    acknowledgment.acknowledge();
}

일단 회사의 카프카 푸시 로직에서는 자동커밋을 사용하지 않는다.

spring kafka 에서는 enable.auto.commit=true 이라는 설정을 통해 자동으로 일정 시간이 지나면 커밋하게 만들 수 있다.

그 시간은 auto.commit.interval.ms라는 옵션을 통해 설정을 진행할 수 있다.

중복이 생기는 현상

max.poll.records로 15개를 가져온다고 가정한다. (왜냐면 내가 15개씩 가져와서 스레드 15개로 한번에 할당하고 있으니까 15개로 했다)

이제 한번의 컨슘을 통해 이 record들을 15개를 가져왔다.

잘 작업하다가 한 7개 작업을 했다(실질적인 푸시까지 완료) 아직 커밋은 일어나지 않았다. 커밋시간은 조금 더 뒤에 일어나는 상황

여기서

배치 서버에서 파티션 1개로 하니까 너무 느리게 적재되는 것 같아요 병렬처리 하게 파티션을 n개로 바꿀 필요성이 있습니다

라고 하면서 파티션을 늘리는 안이 통과되어 바로 파티션을 늘린다. (참고 - 파티션은 늘릴 수만 있고 줄일 수는 없다)


리밸런싱 시작....

이러면 8개가 아직 작업이 남았지만 컨슘이 멈추며 리밸런싱을 시작한다.

어? 커밋된게 없네 해서 이전 15개를 다시 읽어 재처리를 진행하게 된다.

지금 남은 이 8개는 중복이 아니지만, 앞서 작업했던 7개의 메시지는 다시 작업을 수행하게 된다.

이런식으로 중복이 발생한다고 한다.

그럼 넌 뭐가 문제였는데?

나는 애초에 auto commit이 아닌 수동커밋으로 작업을 진행했다.

푸시지만 중복으로 발생되면 안되는 그런 푸시 정보이기에 수동으로 작업을 진행했다.

일단 Spring kafka의 기본값이 수동 커밋인 enable.auto.commit=false 이다.

그리고 ackMode를 설정해줄 수 있는데

ackMode는 뭐냐면 KafkaListener의 offset에 대한 commit을 어떻게 줄것인가를 설정하는 모드이다.

Spring의 기본 AckMode는 BATCH이며, 나는 manual_immidiate로 작업했다.

둘의 차이는 이미지로도 적어놨지만...

  • BATCH
    • poll() 메서드로 호출된 레코드가 모두 처리된 이후 커밋
    • 스프링 카프카 컨슈머의 AckMode 기본값
  • MANUAL_IMMEDIATE
    • Acknowledgment가 승인되면 즉시 commit

이 두가지의 차이가 있다.

에러 현상은 아무런 오류도, 로그도 찍어지지 않는데, 카프카 Producer 메세지를 발행하면 LAG로 적재되었다.

특정 토픽은 정상적으로 컨슘이 되는게 확인이 된게 너무 억울했었다.

그래서 100개를 연속으로 발행해도 LAG로 메시지가 쌓여서 처리를 못하는 현상이 발생해서 2주를 머리쥐어짰다. 😅😅😅

다른 토픽으로 받아봐도 결과가 계속 같아서 너무 짜증이 더 났었다.

카프카도 새로 설치해달라고 데브옵스분께 요청했고 그래서 이래저래 너무 힘들었다. (운이 좋았던건 dev환경의 k8s설정이 좀 이상하게 되어 다시 구성해야 되는 상황에 요청드렸던 운이 좋았다😱)

에러난 지점은...

@Slf4j
@Service
public class PushService {

    public void sendPush(final MessageReq req, Acknowledgment acknowledgment) {
        try {
            //푸시로직이 작성되어있다...
            log.info("push Success : {}", req);
        } catch (SendFailureException e) {
            log.error("push failure : {}", e.getMessage());
            //DLT 처리
        }
        acknowledgment.acknowledge();  // 에러지점
    }
}

이런식으로 예시로 푸시로직이 있다고 가정하고 이렇게 작성하면 당연히 푸시를 보내고 ack를 통해 offset을 전진시켜주었다.

이게 초반엔 되게 잘 작동했는데 새로운 예외케이스만 모르고 이대로 문제가 없는줄 알았다.

이 부분에서 문제가 되었던건 SendFailureException이 발생하지 않고 NPE나 다른 예외가 뜨게 되면 이게 메세지 처리가 안된다.

그래서 예외가 났으니 offset을 전진 안시켰기 때문에 다시 메시지에 적재 되는것이다.

일정 시간이 지나고 다시 또 똑같은 메시지를 컨슘하는 이 부분이 무한 반복하는 컨슘이 시작되는 것이다.

심지어 로그도 안찍는 현상이다.

지나가며 보던 블로그의 글들을 보며 commit은 항상 필수적으로 해줘야 한다는게 있었는데 난 당연히 commit을 무조건 하는것으로 생각한 나의 실수였다.

그러면 LAG에 고정으로 쌓여있냐? 그건 아니라는 소리다. 수치로는 n개가 적재된거로 보이겠지만 0으로 갔다가 다시 쌓였다가 할것 같은 나의 생각이다.

해결 방법

정말 간단하다. 위에서 봤을때 문제점이 무엇이었는가?

바로 offset을 무조건 전진시켜주면 끝난다!!

@Slf4j
@Service
public class PushService {

    public void sendPush(final MessageReq req, Acknowledgment acknowledgment) {
        try {
            //푸시로직이 작성되어있다...
            log.info("push Success : {}", req);
        } catch (SendFailureException e) {
            log.error("push failure : {}", e.getMessage());
            //DLT 처리
        } catch (Exception e) {
            log.error("알 수 없는 오류 발생 : {}", e.getMessage());
        }finally {
            acknowledgment.acknowledge();  // 에러지점
        }
    }
}

이렇게 무조건 commit 처리하게 해주고 저런 예외가 터졌을 경우에는 DLT (Dead Letter Topic)에 적재해주고 후속 처리를 진행하면 되겠다.

정리

무턱대고 구현할 때 좀 더 꼼꼼하게 작성하고 좀 더 테스트를 빡세게 하고 테스트 코드도 좀 더 케이스를 많이 짜려고 노력을 좀 해야겠다.

그래도 이걸 2주동안 붙잡고 있던게 결국 이런 오류들은 어려운 오류가 아닌데에서 나오는 기간이다.

아무튼 기초를 좀 더 탄탄히 하고 항상 꼼꼼하게 하자 🔥🔥🔥🔥🔥

728x90

'Spring > Kafka' 카테고리의 다른 글

Spring Kafka 좀 더 공통 설정하기  (0) 2022.10.06
Spring Kafka Deserializer Class Not Found Exception  (2) 2022.09.16
728x90

일전에 카프카 에러 포스팅 에서

Configuration 설정을 자바 클래스에서 해주었다. 그것도 클래스별로!!

에러 포스팅과 더불어

This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer

이런 문구도 출력해줬었다.

근데 저 에러 포스팅을 보면서 좀 더 공통화할 수 없을까에서 찾아보다가 ErrorHandlingDeserializer 관련 검색을 해보다가 문서에서 찾게 되었던게 있는데,

기존의 설정을 이미지로 한번 가져와봤다.

그런데 이 방법을 Listener가 늘어나면 늘어날 수록 고수할 수가 없다는 생각이 들었다.

세부적인 사항 외에는 조금 다 yaml로 공통화를 할 수 있지 않을까?? 나는 SpringBoot를 사용하는데!

그래서 문서를 찾아본 결과

yaml 설정

spring:
  kafka:
    consumer:
      group-id: test
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            use:
              type:
                headers: false
            trusted:
              packages: '*'
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

    listener:
      ack-mode: manual_immediate

    retry:
      topic:
        attempts: 2

일단 헤더로서 값을 받지 않겠다 라는것을 선언해주고

문서처럼
spring.kafka.consumer.properties.spring.json.type.mapping=메시지를 수신할 dto 풀경로

로 전역에서 여러개를 설정해주어도 되지만, 이 부분은 리스너마다 다를것이라고 생각해서 @KafkaListener마다 설정해주기로 했다.

@KafkaListener(properties={
    "max.poll.records=15", // 한 작업에 15개의 레코드를 컨슘한다.
    "spring.kafka.consumer.properties.spring.json.type.mapping=com.github.lsj8367.MessageReq"
})
public void consumeMessage(final MessageReq request, Acknowledgement ack) {
    // 메세지 처리
}

이렇게 완성하여 ConsumerConfiguration을 제거하게 되었다.

나는 Spring을 쓰는 것이 아니라 이 설정을 한번 더 추상화하여 간편하게 쓰게 해주는 Boot를 쓰게해주는데

너무 Spring처럼 쓰는게 아닌가 싶었는데, 원하는대로 잘 바꾸어 준 것 같다.

나머지로 더 전역적으로 쓸 수 있는것은 공통으로 충분히 더 빼주고 특이한 케이스만이 별도의 Configuration 클래스로 빠져서 설정해주어야 하지 않을까 싶다.

아직 해결하지 못한 이슈가 있는데, 특정 Topic만 특이하게 한건을 consume하고 나서부터는 consume을 더이상 하지 않는 오류가 있다.

이 부분을 얼른 해결하고 싶다.

728x90

'Spring > Kafka' 카테고리의 다른 글

Kafka Offset Commit의 중요성  (0) 2022.10.06
Spring Kafka Deserializer Class Not Found Exception  (2) 2022.09.16

+ Recent posts