728x90

예제는 깃허브에 있다.

트랜잭션을 공부했다고 생각하고 업무에 임했던 나였는데, 도저히 풀리지않는 느낌으로 예외를 받았던게 있다.
이전에 한번 포스팅했던 Stream Closed 에러였는데 이게 또 한번 나를 붙잡았다.
처음 개발환경에서 OpenFeign을 사용하면서 또 logging level에서 IOException이 나는줄알고 이부분으로 삽질을 했다.

근데 그게 아니어서 이 포스팅을 작성했다.

  • HelloController.java
@RestController
@RequiredArgsConstructor
public class TestController {

    private final TransactionParentService transactionParentService;

    @GetMapping("/transaction/test")
    public ResponseEntity<?> test() {
        transactionParentService.size();
        return ResponseEntity.ok().build();
    }
}
  • TransactionParentService.java
@Service
@RequiredArgsConstructor
public class TransactionParentService {

    private final MemberService memberService;
    private final TransactionErrorService transactionErrorService;

    @Transactional
    public int size() {
        transactionErrorService.throwExceptionLog();
        return memberService.allSize();
    }
}
  • TransactionErrorService.java
@Slf4j
@Service
@RequiredArgsConstructor
public class TransactionErrorService {

    private final MemberService memberService;

    @Transactional
    public void throwExceptionLog() {
        try {
            memberService.saveAndException();
        } catch (RuntimeException e) {
            log.error("error : {}", e.getMessage());
        }
        System.out.println("끝");
    }
}
  • MemberService.java
@Service
@Transactional
@RequiredArgsConstructor
public class MemberService {

    private final MemberRepository memberRepository;

    public void save() {
        memberRepository.save(new Member(1L, "홍길동"));
    }

    @Transactional
    public void saveAndException() {
        memberRepository.save(new Member(null, "홍길동"));
        throw new RuntimeException();
    }

    public int allSize() {
        return memberRepository.findAll().size();
    }

}

TransactionParentService -> TransactionErrorService -> MemberService 순서로 서비스는 동작하게 되고,
기본적으로 Service에는 메소드마다 @Transactional 어노테이션이 붙어있다.

아래는 application.yaml에

logging:
  level:
    org.springframework.transaction: trace

이와 같은 로그설정을 해주고 찍어본 결과이다.

TRACE 66119 --- [nio-8080-exec-1] o.s.t.i.TransactionInterceptor           : Getting transaction for [com.github.lsj8367.service.TransactionParentService.size]
TRACE 66119 --- [nio-8080-exec-1] o.s.t.i.TransactionInterceptor           : Getting transaction for [com.github.lsj8367.service.TransactionErrorService.throwExceptionLog]
TRACE 66119 --- [nio-8080-exec-1] o.s.t.i.TransactionInterceptor           : Getting transaction for [com.github.lsj8367.service.MemberService.saveAndException]
TRACE 66119 --- [nio-8080-exec-1] o.s.t.i.TransactionInterceptor           : Getting transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]

TRACE 66119 --- [nio-8080-exec-1] o.s.t.i.TransactionInterceptor           : Completing transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
TRACE 66119 --- [nio-8080-exec-1] o.s.t.i.TransactionInterceptor           : Completing transaction for [com.github.lsj8367.service.MemberService.saveAndException] after exception: java.lang.RuntimeException

TRACE 66119 --- [nio-8080-exec-1] o.s.t.i.TransactionInterceptor           : Completing transaction for [com.github.lsj8367.service.TransactionErrorService.throwExceptionLog]
TRACE 66119 --- [nio-8080-exec-1] o.s.t.i.TransactionInterceptor           : Getting transaction for [com.github.lsj8367.service.MemberService.allSize]
TRACE 66119 --- [nio-8080-exec-1] o.s.t.i.TransactionInterceptor           : Getting transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.findAll]

TRACE 66119 --- [nio-8080-exec-1] o.s.t.i.TransactionInterceptor           : Completing transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.findAll]
TRACE 66119 --- [nio-8080-exec-1] o.s.t.i.TransactionInterceptor           : Completing transaction for [com.github.lsj8367.service.MemberService.allSize]
TRACE 66119 --- [nio-8080-exec-1] o.s.t.i.TransactionInterceptor           : Completing transaction for [com.github.lsj8367.service.TransactionParentService.size]

여기서 보면 정상적으로 트랜잭션을 처음엔 쭉 획득한다.
그리고나서 memberService.save()를 통해 SimpleJpaRepository까지 하기위한 transaction을 얻게된다.

에러가 나는 부분

우리의 코드에서는 save 이후 throw new RuntimeException()이 있다.
여기서 부터 after Exception : java.lang ...으로 보이는 rollback 마킹이 존재한다.

org.springframework.transaction.UnexpectedRollbackException: Transaction silently rolled back because it has been marked as rollback-only
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:752) ~[spring-tx-5.3.22.jar:5.3.22]
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711) ~[spring-tx-5.3.22.jar:5.3.22]
    at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:654) ~[spring-tx-5.3.22.jar:5.3.22]
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:407) ~[spring-tx-5.3.22.jar:5.3.22]
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119) ~[spring-tx-5.3.22.jar:5.3.22]
    ...생략
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

이게 도대체 어떻게 동작하는것인가?
추상클래스인 TransactionAspectSupport.invokeWithinTransaction 메소드를 사용하는 TransactionInterceptor의 invoke를 호출해서 안쪽에 추상클래스의 메소드를 사용한다.

스크린샷 2023-02-27 오후 10 37 23

해당 @Transactional이 붙은 메소드에서 던져진 exception을 Throwable 객체로 catch절에서 받고있다.

스크린샷 2023-02-27 오후 10 40 03

  • DefaultTransactionAttribute

스크린샷 2023-02-27 오후 10 43 31

  • rollbackOn 메소드

스크린샷 2023-02-27 오후 11 04 58

이 스프링 기본 구성인 DefaultTransactionAttribute구현에서 rollbackOn 메소드를 사용하게 되는데
이 rollbackOn에서 그토록 얘기했던 말들이 나온다.

RuntimeException이 여기서 채택되어 instanceof로 체크하고 있다!!!

그래서 rollback 마크를 하나를 진행해두고 나머지 트랜잭션 어노테이션에 대해서도 commit을 할거냐 rollback을 할거냐에 대한 작업을 이 해당 aop를 통해 동작한다.

TransactionManager

각 트랜잭션 매니저들은 AbstractPlatformTransactionManager 추상클래스를 상속하여 사용한다.

스크린샷 2023-02-27 오후 10 53 18

그래서 트랜잭션 매니저가 전부 commit을 수행하는데,
rollback 전용 마크가 하나라도 붙게되면 UnexpectedRollbackException 이 에러가 나타나게 된다.
그래서 보이게되는 예외 메시지가 Transaction silently rolled back because it has been marked as rollback-only로 나오게 된다.

그럼 Stream Closed 는?

ㅋㅋㅋㅋ.. 그러게 말이다. 이거 왜뜬건지 도저히 이해가 되지 않는데, dev환경에선 이 로그만 보였지 위의 로그가 안보였었다.
근데 로컬에서 이 부분을 수정하니 말끔히 해결됐다.
아무튼 원초적인 문제는 이 무지성 붙이기 @Transactional이었다.
막판에 이게 떠올라서 지웠더니 해결되서 꿀잠각이다.

아무튼 @Transactional에 대한 디버깅 정리를 해본다.

728x90

'디버깅' 카테고리의 다른 글

Kafka가 내 로직을 9번이나 재시도를 했다  (0) 2023.11.02
Stream 오류 제거  (2) 2023.04.21
FeignClient Logging level 디버깅  (0) 2022.12.17
@Async 사용시 에러 해결  (0) 2022.11.04
728x90

회사의 서비스들이 여러개로 쪼개져있다.

그래서 우리는 주로 FeignClient를 사용하는데, 애를 먹었던 로깅레벨에 대해 포스팅한다.

업무중에 삽질을 진행했었으며, 해당 내용으로 자바스럽게 고쳤던 경험을 좀 풀어본다..

 

스프링 프레임워크를 사용하고 있기에 여기서 같이 제공해주는 Spring Cloud의 OpenFeign을 사용하였다.

 

아래는 사용하는 예시이다.

출처 -&nbsp;https://docs.spring.io/spring-cloud-openfeign/docs/current/reference/html/

 

해당 예시처럼 나는 FeignClient를 구현했었다.

물론 @EnableFeignClients 는 별도의 Configuration 클래스 파일에 설정을 해주었었다!

 

여기까지는 일단 기본적인 설정이지만, 아래의 레벨 설명이 진짜다.

 

Feing Logging Level은 총 4단계로 이루어져있다.

  • NONE:  로깅 없음(Default)
  • BASIC:  요청 방법 및 URL, 응답 상태 코드 및 실행 시간 기록
  • HEADERS:  요청 및 응답 헤더와 함께 기본 정보를 기록
  • FULL : 요청과 응답 모두에 대한 헤더, 본문 및 메타데이터를 기록

기본적으로 BASIC, HEADERS에는 정상작동을 했던 우리의 소스였다.

 

일단 소스가 어떤 구성이냐면...

 

회사의 정보를 많이 노출할 수는 없기에 간략하게 설명한다.

 

일명 스크래핑이라고 하는 기술, 흔히 크롤링이라고도 할 수 있을것이다.

해당 작업을 외부 API로 연동하여 응답값을 받아 파싱해주는 작업이 있다.

근데 우리의 규칙상 해당 스크래핑의 원문 데이터라는 아예 크롤링의 전체에 해당하는 xml양식의 데이터도 같이 받아오는 방식을 정책으로 정했기 때문에 우리는 Google Cloud 를 사용하기에 Google Cloud Storage에 원문 데이터를 저장하고, 나머지 데이터를 데이터베이스에 저장하는 구조로 구성이 되어있다.

 

여기서 문제가 발생한다.

 

일단 원문 데이터가 포함이되기에 응답값의 길이가 상당히 길다.

길지않고 정상적인 응답을 받는 스크래핑 로직의 경우에는 이런 디버깅이 해당되지 않는다.

 

근데 글자가 길었고, 내가 핸드폰에서 슬랙으로 알림을 받았던 에러메시지는 다음과 같았다.

에러 로그.... 정보유출되는 것이 너무 많아 다 지워버렸다 ㅋㅋㅋ...

아무튼 이러한 예외가 발생했다.

에러 메시지를 보면 해당 응답값의 타입이 application/json이 아니라는데

디버깅을 찍어보면 정상 json응답값이 파싱이 되고 있었다.

 

아래부분이 좀 더 의심스러웠었는데, buffer Length : 8192 

그러니까... 버퍼길이가 8192 제한인데 길이가 더 커서 담기지 않아서 에러가 발생한다는 것이다.

 

이 부분이 왜 나왔는지를 보니까

 

해당 스크래핑 작업을 하는 로직에서 우리는 로그를 찍는다.

 

스크래핑 작업 전후로 시작했고 완료되었다는 외부의 로직 실행결과를 로그로 담고 있다.

 

그런데 이 담는 로직에서 예외를 이렇게 뱉는다는 것이다.

 

feign 의 기본 클라이언트는 어떤것을 쓰냐면 바로 ApacheHttpClient를 사용한다.

 

아래는 구현체의 내용 일부분이며, 빨간 네모의 asInputStream으로 body를 출력하고 있다.

 

httpResponse에서 받는 해당 entity의 content를 찍어내는 것인데, 이 부분에서 에러가 발생한다.

기본 maxContentLength 가 8192이기 때문에 정상적으로 스트림 반환을 하지 않고 저렇게 예외를 띄우게 되는 것이다!

이부분이 TCP 연결에서의 최대 길이이기 때문에 HttpClient를 사용하면 이런 예외가 발생된다.

 

아래는 ByteArrayBody이다.

해당 부분에서는 ByteArrayInputStream을 사용하기 때문에 TCP length 제한보다 더 쓸수 있는 것 같다.

이것이 바로 ByteArrayInputStream....?

아무든 이 두 asInputStream()을 보는 이유는 아래에서 설명하겠다.

아무튼 이 ByteArrayInputStream은 tcp max content length와 관계없이 스트림을 출력하기에 길이제한이 없다.

 

이 두개의 차이를 가르는 것은 Log 였다!!!

 

그래서 소스를 파고들었다.

 

해당 로그 부분이 BASIC레벨 보다 수준이 낮은 경우 수행되는 로직인데, 

그래서 FULL로직에서만 정상적으로 데이터 파싱이 된다.

 

왜? - response.toBuilder().body(bodyData).build() 를 통해 새로운 ByteArrayInputStream으로 생성되기 때문이다.

 

그래서 이 부분을 재정의를 해주어 BASIC 레벨에서도 길이가 아무리 길더라도 데이터를 파싱할 수 있게 설정해주었다.

 

로직이 정상적으로 수행되며 아래 로그부분에서 객체를 다시 만들어주게 되는걸 보고나서 넘 행복했다는것...😇😇😇

 

해결방안

해당 Feign의 Logger를 상속받아서 log찍는 부분을 override 시켜주었다.

그래서 기존 로직을 또 건드리고 싶지는 않았다. <- 로그는 이대로 찍어주는게 마음 편했으니까?

그래서 분기처리 하는 부분을 제거하고 정말로 필요없는 부분만을 제거해주고 나머지 로직은 유지했다.

 

아무튼 데이터가 길었기 때문에 로그레벨에 따라 값이 파싱이 되고 안되고 났던게 좀 신기했다.

길이가 짧은 응답을 사용했다면 오히려 이런것도 모르고 그냥 설정따라 파싱해주고 안해주고로 넘어갔을 것 같다!!!!

728x90

'디버깅' 카테고리의 다른 글

Stream 오류 제거  (2) 2023.04.21
@Transactional 제대로 알고쓰기  (4) 2023.02.27
@Async 사용시 에러 해결  (0) 2022.11.04
Jenkins 에러  (0) 2022.08.11
728x90

오랜만에 포스팅하는데 회사에서 그동안 앱 2.0 버전을 출시한다고 이래저래 바빴던 나날을 보냈다.

결과적으로는 만족스러운 출시..? 였던것 같다 ㅋㅋㅋ

버그도 많았고, QA 엔지니어께서 고생을 많이 하셨을 수도 있고 내가 구현한 메시지 플랫폼도 테스트하기가 정말 까다로웠다.

각설하고..

해당 에러사항을 구현하는건 Kafka를 이용하지 않아도 되기 때문에 RestAPI로 구현했다. (+ 테스트코드로만)

모든 코드는 깃허브에 있다.

이번엔 무슨 버그였냐?

이런 에러가 쏟아져나왔다. 기존 레거시 푸시는 NHN Toast를 이용한 푸시서비스로 구성되어 있었다.

변경한다고 해도 과도기가 존재하기 때문에 바로 지울수는 없고 이전 앱을 사용하는 사용자들에게는 해당 푸시로 알림은 계속 받아야되기 때문이다.

왜 났던 에러였는지 더듬어봤더니

Async 설정

기본적으로 @Async 를 사용하기 위해서는 Configuration 클래스를 하나 만들어주고 비동기 실행기에 대한 설정을 해주어야 한다.

스크린샷 2022-11-04 오후 11 40 12

이런식으로 에러를 일부러 내기 위해서 큐 사이즈를 5개 기본 스레드 1, 최대 스레드 2, 그리고 비동기로 처리할 큐 사이즈를 5로 구성했다.

문제상황 1

nhn 푸시를 보내기 위해선 Kafka에 Producer를 통해 메세지를 넣어주면, 내가 구현한 메시지 플랫폼에서 해당 메시지를 Consume하여 동작하는 방식으로 구성되어있다.

그런데 나는 비동기로 구현하겠다고만 생각하고 사이즈를 제한해두지는 않았었다.

그래서 Consume은 계속해서 KafkaListener를 통해 무제한으로 하고 있는 와중에 이 컨슘 속도가 너무 빠르니까

위에서 설정한 비동기 처리 큐 사이즈를 200으로 잡아놓아도 예를 들어 1000건이 한방에 producer를 통해 적재된 후 바로 consume을 해버리니까

100개 큐를 훌쩍 넘어버려서도 막는게 아니라 지속적 컨슘이 일어나기에 큐 사이즈를 넘어서는 순간에도 이 메소드 실행을 밀어 넣는 오류였다.

그래서 맨위의 이미지처럼 Exception이 발생하게 되는 것이다.

에러 직접 구현

정확한 코드는 깃허브에서 보면 되지만 여기서는 간략하게 대충 구현하도록 하겠다.

JDK 17버전이기 때문에 record class를 사용함. (record를 사용하면 클래스는 final로 선언되며 equals & hashCode를 재정의하여 갖고있음)

/hello 를 호출하면 printAsync라는 비동기 메소드를 100번호출하게끔 만드는 로직이 있다.

HelloController.java

@RestController
public record HelloController(HelloService helloService) {

    @GetMapping("/hello")
    public ResponseEntity<?> hello() {

        int i = 0;
        while (true) {
            helloService.printAsync();
            i++;

            if (i == 100) {
                break;
            }
        }

        return ResponseEntity.ok(Map.of("data", "success"));
    }
}

HelloService.java

@Slf4j
@Service
public class HelloService {

    @Async("threadPoolTaskExecutor")
    public void printAsync() {
        try {
            Thread.sleep(5000);
            log.info(Thread.currentThread().getName());
            log.info("hello service print!!!");
        } catch (InterruptedException e) {
            log.error("error : {}", e.getMessage());
        } catch (Exception e) {
            log.error(e.getMessage());
        }
    }

}

printAsync()는 비동기로 호출하며 5초를 기다리고 log를 찍게 되는데, 우리가 처음 전역적으로 설정해준 스레드 설정은

요청이 들어오면 maxQueue 사이즈인 5까지만 담을 수 있게 되어있고 작업이 느려서 큐 데이터를 하나 소진하지 못한다면 에러가 발생하는데

그 에러는 이렇다.

이미지로 한방에 찍어지지 않아 글자로 첨부한다.

Caused by: org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@265c1a7c[Running, pool size = 2, active threads = 2, queued tasks = 5, completed tasks = 0]] did not accept task: org.springframework.aop.interceptor.AsyncExecutionInterceptor$$Lambda$967/0x00000008010a7dc0@740a0d5e
    at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.submit(ThreadPoolTaskExecutor.java:391)
    at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.doSubmit(AsyncExecutionAspectSupport.java:292)
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor.invoke(AsyncExecutionInterceptor.java:129)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:708)
    at com.github.lsj8367.application.HelloService$$EnhancerBySpringCGLIB$$7321912.printAsync(<generated>)
    at com.github.lsj8367.presentation.HelloController.hello(HelloController.java:17)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:117)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1070)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:963)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
    ... 87 more
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@29013ef2[Not completed, task = org.springframework.aop.interceptor.AsyncExecutionInterceptor$$Lambda$967/0x00000008010a7dc0@740a0d5e] rejected from java.util.concurrent.ThreadPoolExecutor@265c1a7c[Running, pool size = 2, active threads = 2, queued tasks = 5, completed tasks = 0]
    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365)
    at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:145)
    at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.submit(ThreadPoolTaskExecutor.java:388)
    ... 107 more

처음 봤던 이미지와 같은 예외가 발생하는걸 볼 수 있다.

그러니까 큐 공간이 가득차기 전에 계속해서 비동기를 호출하며 작업큐에 메소드를 태우려고 하다보니 이런 예외가 발생한 것이다.

해결은?

그래서 나는 일단 푸시 발송이 그렇게 오래걸리는 로직이 아니라서 일단은 동기 상태로 바꿔놓았지만,

지속적인 모니터링을 하여 트래픽이 많아지게 되면 그때는 batchListener를 적용하여 비동기로 여러개를 한방에 처리하는 방법을 고려중이다.

-> 이 부분은 카프카의 리밸런싱을 고려해야한다.

아무튼 내가 생각했던 여러 추측중에 하나가 걸리게 되니까 좀 재밌었던 것 같다.

그래도 오늘도 해결했다!

728x90

'디버깅' 카테고리의 다른 글

@Transactional 제대로 알고쓰기  (4) 2023.02.27
FeignClient Logging level 디버깅  (0) 2022.12.17
Jenkins 에러  (0) 2022.08.11
AWS SNS 토큰 에러  (0) 2022.08.10
728x90

현재 회사에서 spring-kafka 를 이용해서 특정 서비스들의 푸시 메세지 이벤트를 받아서

 

전송해주는 서버를 구현하고 있다. (오늘 쿠버네티스에 배포까지 했다!! 모르는게 너무많은...)

 

로컬에서 내 맘대로 메세지 토픽을 발행해서 쏴도 잘 맞게 역직렬화를 수행을 해주길래 그냥 그런가보다.

 

하고 잘 넘어갔던 찰나에!!!

Class Not Found Exception

왜? 이 에러가 났을까?

일단 나는 구독하는쪽 그러니까 Kafka에서는 Consumer 쪽 만을 구현해주었다.

 

내 지식이 부족했던 탓인지는 모르겠지만, 어쨌든 같은 JSON 형태라고 생각해서 클래스가 무엇이던 간에

 

JSON형식만 같다면 Consume해도 괜찮을 것이라고 처음 생각했었다.

 

그래서 발행 모델인 Producer쪽에서는 예를 들면 MessageReq로 Producer가 보내고

 

받은 부분인 Consumer에서는 PushReq라고 받는다고 하고 데이터는 둘다 똑같은 형식으로 매칭이 되어있다고 가정한다.

 

이랬을 때 Consumer 서버를 키면???!!!

 

바로 Class Not Found Exception이 떠버린다.

Caused by: org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.github.lsj8367.message.PushReq]; nested exception is java.lang.ClassNotFoundException: com.github.lsj8367.message.MessageReq
  at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:138)
  at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:99)
  at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:342)
  at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1030)
  at org.apache.kafka.clients.consumer.internals.Fetcher.access$3300(Fetcher.java:110)
  at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1250)
  at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:1099)
  at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:545)
  at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:506)
  at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1269)
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1200)
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176)
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:741)
  at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:698)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.lang.Thread.run(Thread.java:748)

예제코드는 깃허브에 있지만, producer와 consumer는 각기 다른 서버로 만들어서 진행시켜주어야 한다!! 안그러면 에러 안나고 받지도 않는다.

왜? Consumer쪽엔 직렬화한 MessageReq클래스가 없기 때문이다.

그래서 보내는 쪽에서 미리 받는 Consumer쪽의 객체 형식을 Header에 바인딩 해주거나,

받는 입장에서 Header가 아닌 Method로 받게 설정해줄 수 있다.

해결책

그래서 해결책이 무엇이냐!

위에서 잠깐 말했지만 producer에서 header를 설정해주거나 consumer에서 설정을 해주는 방법이 있다고 했다.

물론 서로 정의가 잘 되어있고, 동시에 구현했다면 나는 전자의 방법을 택했을 것이다.

그렇지만, 각자 개발하는 속도가 있었고, 또 다른 작업들을 계속해서 진행해야 하고 이미 돌아가고 있던 배치서버에서 그걸 바꿔서

다시 배포하기엔 쉽지 않았었다. (사실 말씀드리기도 좀 그랬다.)

디버깅

그래서

맨끝부분에 false를 넣어주었는데, 이는 들어가보면...

생성자를 오버로딩하기에 쭉 들어오니 이 생성자가 나온다.

initialize에서 해당 boolean값을 사용하고 있기 때문에

boolean값에 의해서 어떤 타입으로 결정하는지가 나온다.

그 설명은

위에서 앞서 설명했던 것과 똑같은 설명이 자바독으로 쓰여져있다.

공부해야될거 참 많다...

정리

그래도 내가 혼자 힘으로 이 메세지 서버를 만들면서 정말 재밌게 개발했다.

 

물론 오늘 배포하는데 쿠버네티스 지식이 없어서 애를 먹었다. ~~ㅋㅋㅋㅋㅋㅋ~~

 

그래도 백엔드 개발자분들이 도와주셔서 꿀 플러그인도 전수받고 기본 개념을 정리해가는 것 같다.

 

뭐하나 건들면 바로 공부해야되는게 너무 어렵지만 재밌어서 좋다.

 

조만간 카프카 기본개념과 쿠버네티스도 정리해야겠다 🔥🔥🔥🔥🔥

728x90

'Spring > Kafka' 카테고리의 다른 글

Kafka Offset Commit의 중요성  (0) 2022.10.06
Spring Kafka 좀 더 공통 설정하기  (0) 2022.10.06
728x90

ModelAttribute와 RequestBody의 커맨드 객체 파싱이 다른것을 확인했다.
한번 알아보자! ModelAttribute 동작과정은 덤이다.

ModelAttribute

이 포스팅을 하는 이유는 인자가 많을 경우에 post방식으로 조회를 하는 식으로 구성을 했었는데,

코드리뷰중에 이런말이 나왔었다.

get방식으로 다른 객체로 묶어서 한번에 받아보는건 어떤가요?

변수가 많아지면 많아질 수록 수정점이 늘어날것 같아요! 라고 받았다.

그래서 무의식적으로 평소에 하던방식처럼 post로 수정하여 커밋하고 수정했었다.

근데 post로 안바꾸고 get에서 @ModelAttribute 사용하면 객체로 파싱이 된다는것을 듣고 내가 부족했구나 싶었다.

이 글은 그 부분에서 나와 집에와서 따로 정리하여 포스팅한다.

코드는 깃허브에 있다.

컨트롤러와 dto 그리고 컨트롤러 테스트 코드를 간략하게 작성했다.

Debugging

그리고 디버깅을 돌리면 이런 순서로 진행이 된다.

무조건 DisPatcherServlet이 모든 작업을 분산하여 처리 위임을 진행해주는데,

여기서의 핵심은 이 아랫부분이다.

RequestMappingHandlerAdapter

RequestMappingHandlerAdapter로 시작

핸들러 메소드를 처리할 수 있는 ArgumentResolver 들과
returnValueHandler들을 넣어준다.

이렇게 넣어준다!

ServletInvocableHandlerMethod

다음은

ServletInvocableHandlerMethod가 returnValue를 할 수 있는 애로 채택되는데

InvocableHandlerMethod

InvocableHandlerMethod를 통해 HandlerMethodArgumentResolver를 찾는다.

쭉 반복해서 알맞는 것을 탐색중....

완료되면? 여기서 찾게되는 인스턴스가 바로 ModelAttributeMethodProcessor

ModelAttributeMethodProcessor가 처리해주게끔 반환해준다.

ModelAttributeMethodProcessor

ModelAttribute어노테이션을 읽어들이고

ServletModelAttributeMethodProcessor가
어노테이션이 붙은 변수의 타입이 url 변수와 일치하는 속성이 있는지

이렇게 찾는다.

찾고 없으면 null을 반환하여 객체 생성부분으로 간주하고

같은 객체내에 오버로딩된 메소드에서 리플렉션을 이용해서
해당 ModelAttribute 어노테이션이 붙은 객체의 타입을 가져온다.

BeanUtils 클래스를 이용하여
이때 이 타입의 생성자를 찾는데
제일 먼저 기본생성자를 찾고 있다면 기본생성자를 반환하고 아니라면 구현된 생성자를 전부 가져오는데
이때 길이가 1개이면 해당 생성자를 생성해준다.

이후에 ModelAttributeMethodProcessor에서 파라미터와 매칭되는 것을 넣어준다.

커맨드 객체의 차이?

개인적으로 생각했을 때 @ModelAttribute@RequestBody를 읽는 커맨드 객체의 차이점은

RequestBody는 Jackson 라이브러리를 통해서 읽어와 ObjectMapper가 사용되서 조금 생성부분에서 차이가 나지않나 생각한다.

@RequestBody

전에 RequestBody 동작이 어떻게 되는지를 보며 작성한 메세지 컨버터 정리

중간 부분에 보면 해당 컨버팅 작업을 MappingJackson2HttpMessageConverter가 수행해주는데

read() 메소드에서 ObjectMapper를 사용하여 매핑해준다.

그래서 @RequestBody는 getter메소드와 기본생성자가 있어도 ObjectMapper가 해주기에 setter를 쓰거나 해주지 않아도 주입된다.

@ModelAttribute

ModelAttribute는 이와는 다르게 커스텀하게 어노테이션을 만들어서 우리가 MethodArgumentResolver를 구현해서 파라미터에 값을 넣어주듯,

어노테이션 + 리플렉션을 이용해서 값을 넣어주는 것이다. 물론 ServletRequest에서 읽어와서 값들을 가지고 있는건 둘다 공통이지만 말이다.

그래서 기본생성자와 getter만 있으면 값을 넣어줄 수가 없기에

기본생성자에 setter를 전부 생성해주거나 전체 필드를 할당할 수 있는 생성자를 만들어 주어야 한다!!

728x90

'Spring' 카테고리의 다른 글

Slack Slash Commands(슬랙 슬래시 커맨드) 사용하기  (1) 2023.07.06
분산 락  (0) 2023.04.15
@Valid, @Validated 차이  (0) 2022.08.10
AOP  (0) 2022.08.10
728x90

업무에서 Spring Batch로 세미나를 진행하고, 앱 푸시 기능을 배치로 전환하는 작업을 진행했다.
여기에 저장하면서 글로만 보던 것들을 직접 경험해보면서 겪었던 일들을 기록하려고한다.

첫번째 에러

우리 푸시 배치 서버의 구조는 스프링 스케줄러 서버에서
푸시 서버의 api를 호출해서 해당 job들을 돌려주는 방식으로 구성이 되어있다.

물론 이 부분을 새롭게 개편해야 하는것은 맞다ㅋㅋㅋ

그래서 특정 시간이 되면 해당 job api로 호출을 하는데
여기서 대략 총 데이터가 100,000건 정도 되는데 전부 동기 + 블록킹처리로 진행했다.
그래서 스케줄러가 api를 쏘고 요청값이 최대 오래걸려도 limit을 30분을 잡았었다.
그런데 100,000건의 데이터를 여러 로그를 쌓고, 푸시를 하는데까지 1시간이 넘게 걸렸었다.
그래서 1시간이 지나도 받지 못하는 상황에 에러가 나서 해당 스케줄러가 돌다가 실패가 되었다. (근데 뒤에서의 푸시 서버는 계속 돌고있었다)

그러니 다시 정리해보면 1시간이 넘는 시간동안 작업을 하고 있던것이다.

-> 이부분에서 나도 대기중인 데이터만 뽑는 쿼리를 작성해야했는데 실수로 보낸 데이터까지 조회하게끔 만들었다.

Mono 객체를 block()을 사용해서 푸시를 진행했기 때문에, 블록킹 방식으로 동작하여

제어권도 아예 넘겨버려서 끝이나야 다음 작업을 수행하는 형태로 진행되서 굉장히 느렸었는데,

이방식을 subscribe() 방식으로 바꿔서 논블록킹으로 푸시 발송 명령만주고 다음 작업을 진행하게끔 해서

많이 속도를 줄일 수 있었다.
-> 이부분은 조만간 다른 포스팅에서 자세하게 다룰 예정이다.

찾은 부분

SEND, WAIT 두 상태의 데이터를 모두 가져오고 나서 SEND로 업데이트를 치고 있던 것이다.

이부분이 일단 성능 저하의 첫번째 원인이라 생각했고,
그리고 스케줄러는 다른 스케줄링들도 가지고 있으니, 푸시 서버에서 요청을 바로 돌려주고 해당 Job은 뒷편에서 실행해주는 것이 맞다고 생각했다.
그래서 이 두부분을 고쳐보았다.

  • 쿼리는 WAIT상태만 추출하여 갖고 있는다.
  • 푸시서버는 기본적으로 제공해주는 JobLauncher를 배제한다.
@RestController
@RequiredArgsConstructor
public class Demo {
    private final BasicBatchConfigurer basicBatchConfigurer;

    @PostMapping("/demo/push")
    public String appPushJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = (SimpleJobLauncher) basicBatchConfigurer.getJobLauncher();
        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());

        // jobLauncher 실행 로직....
    }
}

이렇게해서 요청을 바로 수행후 return값을 먼저 돌려주었다.
이렇게 해서 스케줄러의 동기처리로 늦어졌던 것에 대해서 일단락 짓게 되었고,
1시간 이내로 처리가 되게 되었다.

두번째 에러

두번째 에러는 페이징 처리에 대한 부분이었다.
참고 블로그 링크
예를 들어서, 총 10페이지로 구성되어있고, 1페이지당 10개의 데이터가 있다고 가정한다.
총 100개의 데이터가 업데이트가 되어야 한다.
근데 커밋이 한번 일어나게 되면,

1페이지 10 -> 2페이지 10 -> 3페이지 10 -> ... 이 될줄 알았었다.

애초에 도입하기 이전부터 해당 참고 블로그를 보면서 이런 에러가 있구나 하면서 개발을 진행했다.
인덱스 기준으로
1 ~ 10 번까지의 데이터가 작업 완료되면 해당 데이터들은 Update가 진행이 될 것이다.
그래서 11 ~ 20을 원하던 다음 데이터는 21 ~ 30을 조회하게 되는것.
이것은 배치의 문제가 아니라 그냥 페이징 쿼리 자체의 문제라고 한다.

그래서 유저들에게 푸시를 보낼때 50%의 유저만 푸시를 받았을 것이다.😇

이렇게 해서는 안됐다.
방법은 우선 2가지가 있었다.
그렇지만 나는 2번째 방법을 사용했다.

그렇다면 왜?

우리 회사의 배치 서버는 JPA로 구성하기로 했었고, 그래서 JPA를 사용한 배치 동작을 구현해서
Cursor대신 JpaPaging을 사용했다.

커서 사용

커서(Cursor)란??

쿼리문에 의해서 반환되는 결과값들을 저장하는 메모리공간
Fetch => 커서에서 원하는 결과값을 추출하는 것
커서는 한번 커넥션을 맺은 후 커서만을 다음으로 가기 때문에 조회하고 Update되어도 갱신되는 일이 없이 적용 가능하다.

@Bean
@StepScope
public JpaPagingItemReader<Pay> payPagingReader() {

    private final int chunkSize = 1000;

    JpaPagingItemReader<PushAlarm> reader = new JpaPagingItemReader<PushAlarm>() {
        @Override
        public int getPage() {
            return 0;
        }
    };

    reader.setQueryString("SELECT p FROM PushAlarm p WHERE p.sendStatus = :sendStatus");
    reader.setParameterValues(Map.of("sendStatus", "WAIT"));
    reader.setPageSize(chunkSize);
    reader.setEntityManagerFactory(entityManagerFactory);
    reader.setName("payPagingReader");

    return reader;
}

이런식으로 page를 0으로 고정시켜줘서 update가 일어나도 다시 0페이지만 계속 조회하는 것이다.
이렇게 해서 문제를 해결했다.

마무리

배치 세미나를 진행하면서 공식문서를 읽고 참고 블로그까지 더해서 학습해서
적용해본 결과 그래도 역시 글로 보는것보다 맞으면서 배우는게 좀 더 빠르게 습득이 가능하다는걸 느낀다.
지금은 데이터가 작아서 내가 일을 제대로 처리했을지 모르겠다.
그래서 강의를 하나 더 들으면서 좀더 뿌리를 깊게 내려야겠다...

728x90

+ Recent posts