@Async 사용시 에러 해결
오랜만에 포스팅하는데 회사에서 그동안 앱 2.0 버전을 출시한다고 이래저래 바빴던 나날을 보냈다.
결과적으로는 만족스러운 출시..? 였던것 같다 ㅋㅋㅋ
버그도 많았고, QA 엔지니어께서 고생을 많이 하셨을 수도 있고 내가 구현한 메시지 플랫폼도 테스트하기가 정말 까다로웠다.
각설하고..
해당 에러사항을 구현하는건 Kafka를 이용하지 않아도 되기 때문에 RestAPI로 구현했다. (+ 테스트코드로만)
모든 코드는 깃허브에 있다.
이번엔 무슨 버그였냐?
이런 에러가 쏟아져나왔다. 기존 레거시 푸시는 NHN Toast를 이용한 푸시서비스로 구성되어 있었다.
변경한다고 해도 과도기가 존재하기 때문에 바로 지울수는 없고 이전 앱을 사용하는 사용자들에게는 해당 푸시로 알림은 계속 받아야되기 때문이다.
왜 났던 에러였는지 더듬어봤더니
Async 설정
기본적으로 @Async
를 사용하기 위해서는 Configuration 클래스를 하나 만들어주고 비동기 실행기에 대한 설정을 해주어야 한다.
이런식으로 에러를 일부러 내기 위해서 큐 사이즈를 5개 기본 스레드 1, 최대 스레드 2, 그리고 비동기로 처리할 큐 사이즈를 5로 구성했다.
문제상황 1
nhn 푸시를 보내기 위해선 Kafka에 Producer를 통해 메세지를 넣어주면, 내가 구현한 메시지 플랫폼에서 해당 메시지를 Consume하여 동작하는 방식으로 구성되어있다.
그런데 나는 비동기로 구현하겠다고만 생각하고 사이즈를 제한해두지는 않았었다.
그래서 Consume은 계속해서 KafkaListener를 통해 무제한으로 하고 있는 와중에 이 컨슘 속도가 너무 빠르니까
위에서 설정한 비동기 처리 큐 사이즈를 200으로 잡아놓아도 예를 들어 1000건이 한방에 producer를 통해 적재된 후 바로 consume을 해버리니까
100개 큐를 훌쩍 넘어버려서도 막는게 아니라 지속적 컨슘이 일어나기에 큐 사이즈를 넘어서는 순간에도 이 메소드 실행을 밀어 넣는 오류였다.
그래서 맨위의 이미지처럼 Exception이 발생하게 되는 것이다.
에러 직접 구현
정확한 코드는 깃허브에서 보면 되지만 여기서는 간략하게 대충 구현하도록 하겠다.
JDK 17버전이기 때문에 record class를 사용함. (record를 사용하면 클래스는 final로 선언되며 equals & hashCode를 재정의하여 갖고있음)
/hello 를 호출하면 printAsync라는 비동기 메소드를 100번호출하게끔 만드는 로직이 있다.
HelloController.java
@RestController
public record HelloController(HelloService helloService) {
@GetMapping("/hello")
public ResponseEntity<?> hello() {
int i = 0;
while (true) {
helloService.printAsync();
i++;
if (i == 100) {
break;
}
}
return ResponseEntity.ok(Map.of("data", "success"));
}
}
HelloService.java
@Slf4j
@Service
public class HelloService {
@Async("threadPoolTaskExecutor")
public void printAsync() {
try {
Thread.sleep(5000);
log.info(Thread.currentThread().getName());
log.info("hello service print!!!");
} catch (InterruptedException e) {
log.error("error : {}", e.getMessage());
} catch (Exception e) {
log.error(e.getMessage());
}
}
}
printAsync()는 비동기로 호출하며 5초를 기다리고 log를 찍게 되는데, 우리가 처음 전역적으로 설정해준 스레드 설정은
요청이 들어오면 maxQueue 사이즈인 5까지만 담을 수 있게 되어있고 작업이 느려서 큐 데이터를 하나 소진하지 못한다면 에러가 발생하는데
그 에러는 이렇다.
이미지로 한방에 찍어지지 않아 글자로 첨부한다.
Caused by: org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@265c1a7c[Running, pool size = 2, active threads = 2, queued tasks = 5, completed tasks = 0]] did not accept task: org.springframework.aop.interceptor.AsyncExecutionInterceptor$$Lambda$967/0x00000008010a7dc0@740a0d5e
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.submit(ThreadPoolTaskExecutor.java:391)
at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.doSubmit(AsyncExecutionAspectSupport.java:292)
at org.springframework.aop.interceptor.AsyncExecutionInterceptor.invoke(AsyncExecutionInterceptor.java:129)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:708)
at com.github.lsj8367.application.HelloService$$EnhancerBySpringCGLIB$$7321912.printAsync(<generated>)
at com.github.lsj8367.presentation.HelloController.hello(HelloController.java:17)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:117)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1070)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:963)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
... 87 more
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@29013ef2[Not completed, task = org.springframework.aop.interceptor.AsyncExecutionInterceptor$$Lambda$967/0x00000008010a7dc0@740a0d5e] rejected from java.util.concurrent.ThreadPoolExecutor@265c1a7c[Running, pool size = 2, active threads = 2, queued tasks = 5, completed tasks = 0]
at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065)
at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365)
at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:145)
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.submit(ThreadPoolTaskExecutor.java:388)
... 107 more
처음 봤던 이미지와 같은 예외가 발생하는걸 볼 수 있다.
그러니까 큐 공간이 가득차기 전에 계속해서 비동기를 호출하며 작업큐에 메소드를 태우려고 하다보니 이런 예외가 발생한 것이다.
해결은?
그래서 나는 일단 푸시 발송이 그렇게 오래걸리는 로직이 아니라서 일단은 동기 상태로 바꿔놓았지만,
지속적인 모니터링을 하여 트래픽이 많아지게 되면 그때는 batchListener를 적용하여 비동기로 여러개를 한방에 처리하는 방법을 고려중이다.
-> 이 부분은 카프카의 리밸런싱을 고려해야한다.
아무튼 내가 생각했던 여러 추측중에 하나가 걸리게 되니까 좀 재밌었던 것 같다.
그래도 오늘도 해결했다!