본문 바로가기

Reactive stack

[Reactive stack] Spring data R2DBC 커넥션 유지 방법

재 가공한 글을 medium에서도 읽을 수 있습니다.

 

이전 글 https://dlwnsdud205.tistory.com/359 을 읽고오시면 더 이해하기 쉽습니다.

 

[Reactive stack] R2DBC with mysql 삽질기

최근 진행하고있는 프로젝트를 MVC에서 Web flux로 변경함에 따라 DB 통신기술또한 비동기로 변경해야 했습니다. 때문에, 더이상 Spring data jpa 를 사용할 수 없었고 Database와 비동기로 통신할 수 있는

dlwnsdud205.tistory.com

 

webflux와 spring data r2dbc 를 사용하면서 다음과 같은 의문이 들었습니다. 

스프링 문서에는 R2DBC에서도 선언적 트랜잭션을 지원한다고 나와있는데, webflux에서 이게 어떻게 가능한거지?

 

mvc + spring data jpa 환경에서 선언적 트랜잭션을 사용하면, 다음과 같은 방식으로 트랜잭션을 홀딩합니다.

  1. PlatformTransactionManager가 트랜잭션을 시작합니다. 이때, Jpa 를 사용한다면 JpaTransactionManager 가 PlatformTransactionManager의 구현채로 등록될것입니다.
  2. TransactionSynchronizationManager 가 ThreadLocal방식으로 Connection을 유지하고 이후에 같은 스레드에서 요청되는 로직은 ThreadLocal에 홀딩된 커넥션을 가져와서 사용하게 됩니다.

하지만, webflux 환경에서는 위와같이 트랜잭션을 홀딩할 수 없는데요. 이유는 일련의 리액티브 시퀀스가 모두 같은 스레드에서 동작하는걸 webflux에서는 보장하지 못하기 때문입니다. 

곰곰히 생각해봐도 Netty의 Eventloop가 당장 사용가능한 스레드를 작업 처리에 할당할것이기 때문에 스레드는 매번 바뀔 수 밖에 없습니다.

 

그렇다면, ThreadLocal을 사용하지 못하는 상황에서 r2dbc는 어떻게 커넥션을 유지할까요?

 

커넥션 유지 방법

한번 코드를 따라가 보겠습니다.

 

R2dbcTransactionManagerAutoConfiguration

 

우선, Spring boot를 사용할경우, R2dbcTransactionManagerAutoConfiguration에 의해서 자동으로 R2dbcTransactionManager가 Bean으로 등록됩니다. 

ConnectionFactory에는 설정파일에 작성한 url, username, password, connectionpool 등이 들어가겠죠?

 

R2dbcTransactionManager는 아래와 같은 hierarchy를 갖고있습니다.

public interface TransactionManager { }

public interface ReactiveTransactionManager implements TransactionManager ... {}

public class AbstractReactiveTransactionManager implements ReactiveTransactionManager ... {}

public class R2dbcTransactionManager implements AbstractReactiveTransactionManager ... {}

 

이때 TransactionManager와 ReactiveTransactionManager는 명세에 불과하기 때문에, 저희가 원하는 "어떻게 커넥션을 유지할까?" 에 대한 정보를 얻기 위해선 AbstractReactiveTransactionManager와 R2dbcTransactionManager를 봐야합니다.

 

트랜잭션 시작시 호출되는 AbstractReactiveTransactionManager 클래스의 getReactiveTransaction 메소드를 우선 살펴보겠습니다.

 

AbstractReactiveTransactionManager

 

102번 라인 : Transaction propagation 정보를 가져옵니다.

104번 라인 : TransactionSynchronizationManager 에게서 현재 트랜잭션을 가져옵니다.

 

이제 다시 TransactionSynchronizationManager로 들어가 보겠습니다.

 

TransactionSynchronizationManager

 

TransactionSynchronizationManager.forCurrentTransaction 메소드는 TransactionContextManager.currentContext를 호출하고, 새로운 TransactionSynchronizationManager를 만들어서 반환합니다.

 

TransactionContextManager

 

TransactionContextManager.currentContext 메소드를 보니, Mono.deferContextual 메소드를 통해 cold start 방식으로 현재 context를 가져오고 TransactionContext를 함께 생성해주는걸 알 수 있습니다.

이때, 54번에 의해서 TransactionContext가 Reactor context에 포함되어있는지 확인하고, 없다면 TransactionContextHolder가 있는지 확인합니다. 만약, 둘다 없다면 NoTransactionInContextExcepion을 던집니다.

 

여기 까지 봤을때, spring data r2dbc는 여러 스레드에 걸쳐 커넥션을 유지하기위해 project reactor의 context를 사용한다고 유추할 수 있습니다.

 

하지만, 지금까지는 재사용 하거나 이미 있는 트랜잭션을 받아오는 과정이었고 실제로 새로운 트랜잭션이 생성되는쪽은 어떻게 구현되어 있을까요?

 

해답은 Aspect 진입시점인 TransactionAspectSupport.ReactiveTransactionSupport class 에서 찾을 수 있었습니다.

 

TransactionAspectSupport.ReactiveTransactionSupport

 

위 코드는 Mono를 반환하는 로직입니다. 실제 코드를 열어보시면 바로 아랫블록에 Flux에 대한 구현도 있습니다 :) 

궁금하신분은 열어보셔도 좋을거 같아요.

 

다시 코드로 돌아와서, 코드를 천천히 살펴보면, 924번 줄에서 Transaction이 없다면 생성하고, 937, 938번줄에 .contextWrite를 통해서 reactorContext에 커넥션을 등록하는걸 확인하실 수 있습니다.

 

정리

위 문맥을 통해서, spring data r2dbc에서 어떤방식으로 커넥션을 관리하는지 개략적으로나마 이해할 수 있었습니다.

 

스프링 블로그 글중 이 포스팅과 관련된 전문을 캡처하는것으로 글을 마치겠습니다.