본문 바로가기

Reactive stack

[Reactive Stack] webflux, R2DBC 환경에서 Event 사용하기

최근 webflux + R2DBC 스택으로 되어있는 프로젝트에서 스프링 Event를 사용할일이 있었습니다.
MVC + JPA 환경에서는 어렵지 않게 사용하던 이벤트를 webflux + R2DBC 환경에서 사용하려고 하니까, 많은 부분이 생각대로 동작하지 않더라구요. 
 
reactor는 lazy하게(최종 호출 메소드가 나오기전까지 실행되지 않음) 실행된다는 특징 때문에, Event 호출에서 고려해야 할 부분이 좀 있었습니다. R2DBC와 Webflux에 대한 이해가 있지 않으면 실수 할 수 있는 부분이 있다고 생각이 들어서, 케이스 별로 테스트를 해봤습니다.


시나리오 설명

시나리오는 간단한데, PayRollbackEvent가 발행되면, 저장된 payment를 찾아서 state를 PaymentState.FAILED 로 변경하면 됩니다.

아래 코드는 이번 아티클에서 사용할 테이블 입니다.
 

Payment


아래 코드는 시나리오를 구현한 테스트 입니다. 
테스트 코드에서 PayRollbackEvent를 발행하면, eventually 블록에서 10초(시간은 임의로 넉넉히 설정했습니다.)동안 state가 FAILED로 변경되었는지 확인합니다. 
 

시나리오

 
전체 테스트 코드는 링크에서 확인할 수 있습니다.


Case 1. Publisher를 subscribe()하지 않고, Publisher를 반환하지 않을때

첫번째 케이스는 PayRollbackEvent핸들러 메소드에서 subscribe 하지 않고, Publisher를 반환하지 않는 상황입니다.
구현은 아래코드와 같습니다.
 

PayService.kt

코드를 간략히 설명하자면,
우선 paymentRepository에서 저장된 pay의 id로 payment 엔티티를 찾습니다.
그 후, payment의 상태를 fail() 로 변경한다음, paymentRepository에 update 해줍니다.

Spring data R2DBC는 save메소드 호출 시, isNew 혹은 @Id 와 @Version 필드를 통해 엔티티를 업데이트할지 결정합니다. 자세한 내용은 다음 을 참고해주세요. 

[Reactive stack] R2DBC with mysql 삽질기

최근 진행하고있는 프로젝트를 MVC에서 Web flux로 변경함에 따라 DB 통신기술또한 비동기로 변경해야 했습니다. 때문에, 더이상 Spring data jpa 를 사용할 수 없었고 Database와 비동기로 통신할 수 있는

dlwnsdud205.tistory.com

 
테스트 결과는 어떻게 될까요. 내부 로직이 실행되지 않고, 10초동안 PaymentState가 FAILED로 변경되었는지 확인하다가 끝날까요?
(Spring data R2DBC 에서 선언적 트랜잭션 관리방법에 대한 이해가 없으면 예상하기 쉽지 않을 수 있습니다.)
 

IllegalStateException

 
단순히 실행되지 않을것 이라고 예상한 결과와는 다르게, Transactional을 수행할 수 없다는 에러를 뱉으며 바로 애플리케이션이 종료되는것을 볼 수 있습니다.
Spring data R2DBC의 @Transactional은 커넥션을 쓰레드 로컬이 아닌, reactor context로 관리합니다. 또한 이 context는 메소드 에서 반환되는 Publisher 마지막에 설정됩니다 (.contextWrite 와 같은 방식으로요) 
 
아래 @Transactional 구현 코드를 보시면, 927번 라인에서 invocation.proceedWithInvocation으로 메소드를 호출하고, 마지막 937, 938번 라인에서 .contextWrite로 context에 커넥션을 등록해주는것을 볼 수 있습니다.

TransactionAspectSupport.ReactiveTransactionSupport

 
즉, R2DBC에서 선언적 트랜잭션을 수행하기 위해서는 반드시 Publisher(Mono, Flux) 를 반환해야 합니다.
커넥션 관리에대한 더 자세한 내용이 궁금하신 분은 다음 을 참고해주세요.

[Reactive stack] Spring data R2DBC 커넥션 유지 방법

이전 글 https://dlwnsdud205.tistory.com/359 을 읽고오시면 더 이해하기 쉽습니다. [Reactive stack] R2DBC with mysql 삽질기 최근 진행하고있는 프로젝트를 MVC에서 Web flux로 변경함에 따라 DB 통신기술또한 비동

dlwnsdud205.tistory.com

 


Case 2. Publisher를 반환하지 않고 내부적으로 subscribe()를 할때

두번째 케이스 또한 마찬가지로 Publisher를 반환하지는 않습니다. 하지만, 내부적으로 .subscribe를 호출합니다.
Publisher를 반환하지 않기 때문에, 선언적 트랜잭션을 적용할 수 없습니다. 따라서, @Transactional 어노테이션을 분리해주었고 두번째 케이스의 코드는 아래와 같습니다.
 

PayService

 
54번 라인의 subscribeOn이 추가된 이유는, webflux에서 제공해주는 스레드에서는 사용자가 subscribe, block을 호출할 수 없기 때문입니다. (Spring에서 막아놨어요.) 그래서 54번과 같이 코드를 추가해서 다른 스레드에서 subscribe를 시켜줘야합니다.

@Transactional 이 없기 때문에, 커넥션을 못받고 실패할 것 이라고 생각할 수 있지만 SimpleR2dbcRepository 클래스에 Transaction이 있기때문에 커넥션을 얻어올 수 있습니다. 그럼에도 service에 @Transactional을 붙이는 이유는, 하나의 서비스 메소드에 대해서 ACID를 보장하기 위해서죠.

 
결과는 어떻게 될까요?
 

테스트 결과

 
이번에는 성공하는것을 볼 수 있습니다.
저희가 원하는 결과를 얻을 수 있었지만 만족할만한 해결법은 아닌거 같습니다.
지금 시나리오에서는 "Payment를 fail로 업데이트한다." 와 같이 Transaction 없어도 원자성이 보장되는 상황이지만, 메소드가 커질경우 원자성 보장을 위해 트랜잭션이 필요한 상황이 올 수 있습니다.
 


Case 3. Publisher를 반환할 때

세번째 케이스는 Publisher를 반환할 때 입니다.
이번에는 Publisher를 반환하므로 선언적 트랜잭션도 붙일 수 있을뿐 아니라, subscribeOn 과 같이 스레드를 별도로 관리할 필요도 없습니다.
코드는 아래와 같습니다.

PayService

 
54번 라인의 map은 Unit을 반환하기 위해 있는것이라 신경쓰지 않으셔도 됩니다.
 
이번 케이스의 테스트 결과는 아래와 같습니다.
 

테스트 결과

 
Webflux에서 컨트롤러의 응답을 DispatcherHandler가 자동으로 subscribe() 해주는것과 마찬가지로, EventListener의 구현또한, 자동으로 subscribe를 호출해주기 때문에 저희가 별도로 subscribe를 해주지 않아도 됩니다.
 


이번 포스팅 에서는 Webflux + R2DBC 환경에서 Spring Event를 사용하는 방법을 알아봤습니다.
이 포스팅에서 사용된 모든 코드는 아래 프로젝트에서 확인할 수 있습니다.
https://github.com/rooftop-MSA/pay-server 

GitHub - rooftop-MSA/pay-server: 👾 Payment server

👾 Payment server. Contribute to rooftop-MSA/pay-server development by creating an account on GitHub.

github.com

 
긴 글 읽어주셔서 감사합니다 :)