본문 바로가기

Reactive stack

[Reactive stack] Reactor pool로 Lettuce Connection 조절하기

프로젝트에서 pool(커넥션 풀 같은..)을 직접 구현해줘야할 일이 생겼습니다.

 

GitHub - rooftop-MSA/Netx: Distributed transaction and Event streaming framework based on saga pattern / Supports redis-stream /

Distributed transaction and Event streaming framework based on saga pattern / Supports redis-stream / Fully async - rooftop-MSA/Netx

github.com

하지만, 프로젝트가 Reactor기반으로 되어 있었기 때문에.. pooling되고있는 리소스를 획득하기 위해서 스레드가 블로킹 되면 안되는 상황이였어요. 즉, apache-common-pool2 과 같은 (예시가 많은) 라이브러리를 사용할 수 없었고, Reactive하게 동작하는 pool이 필요했습니다.

 

R2DBC에서는 이미 pool을 이용해서 DB connection을 관리하고 있으므로, reactor pool 라이브러리가 존재할것이라는 희망을 가진채 찾아보기 시작했고 아래와 같은 라이브러리를 발견할 수 있었습니다.

https://github.com/reactor/reactor-pool

 

 

GitHub - reactor/reactor-pool

Contribute to reactor/reactor-pool development by creating an account on GitHub.

github.com


Lettuce의 blocking 명령은 요청마다 새로운 커넥션을 맺는다.

(이 부분은 redis와 Lettuce에 대한 내용이므로, 관심이 없으신분은 넘기셔도 됩니다.)

 

Lettuce를 사용할때, redis의 blpop, xreadgroup과 같은 응답이 오기까지 대기하는 명령을 사용한다면 lettuce는 매번 새로운 connection을 맺습니다. 

실제로 아래 사진을 보시면, `Connection providers may create a new connection on each invocation or reutn pooled instances` 를 확인하실 수 있습니다.

LettuceConnectionProvider

 

또한, LettuceReactiveListCommands 클래스의 bPop 명령은 connection의 executeDedicated를 호출하고 있는데, 이는 내부적으로 LettuceConnectionProvider의 getConnection()을 호출합니다.

bPop명령은 redis의 blpop 혹은 brpop 명령에 해당합니다. 이 명령은 key 에 해당하는 list에 새로운 원소가 들어올때까지 대기하는 명령입니다.

 

LettuceReactiveListCommands

 

실제로, 매번 커넥션을 맺는지 확인하기 위해서 테스트를 진행해봤습니다. 

아래 사진은 테스트 중 redis에 명령을 날리는 클라이언트의 정보를 출력한것 인데요.

사진을 보시면 blpop 명령을 호출하는 클라이언트 커넥션이 1093개나 되는걸 보실 수 있습니다.

 

redis에 접속한 클라이언트의 수와 각 클라이언트가 실행중인 명령의 수


Blocking 명령을 Connection pool로 조절하기

reactor pool을 사용하기 위해서 아래 의존성을 받아주겠습니다.

implementation "io.projectreactor.addons:reactor-pool:0.1.5"

 

 

reactor-pool 의존성을 받은 후 새로운 pool을 얻기 위해서 PoolBuilder를 사용할 수 있는데요, 예시 코드는 아래와 같습니다.

저는 ReactiveRedisOperation 구현채를 pool안에 넣어주었습니다. (어쨋든 호출은 막히니 상관없음)

ReactiveRedisTemplate pool

 

PoolBuilder의 from 에는 Pool에서 새로운 리소스를 생성할때 사용할 Publisher를 넣어주면 됩니다.

sizeBetween 메소드를 통해서 min pool size, max pool size 를 설정 해줄 수 있으며, maxPendingAcquire.. 을 통해 최대 대기 수 를 조절할수도 있습니다. 

 

reactor pool을 사용하는 코드는 아래와 같습니다.

pool.withPoolable을 이용해서 pool의 리소스를 사용할 수 있으며, withPoolable 블록을 빠져나오면 리소스는 다시 pool에 반환됩니다.

 

pool이 예상대로 동작하는지 확인하기 위해서, pool의 사이즈를 400으로 조절하고 테스트를 진행해보았습니다.

아래 사진은 테스트 결과인데요, 이번에는 Too many open socket... 에러도 안뜨고, blpop 명령을 수행하는 클라이언트의 수도 400 이상으로 올라가지 않는것을 볼 수 있습니다.

 

 

모든 코드는 아래 프로젝트에서 확인하실 수 있습니다.

https://github.com/rooftop-MSA/Netx

 

GitHub - rooftop-MSA/Netx: Distributed transaction and Event streaming framework based on saga pattern / Supports redis-stream /

Distributed transaction and Event streaming framework based on saga pattern / Supports redis-stream / Fully async - rooftop-MSA/Netx

github.com