코드 리뷰
이전에는 굳이 Redis를 사용할 필요가 없다고 생각해서 MySQL을 사용하여 기능을 구현했다. 해당 기능은 이체 트랜잭션을 분리하여 발생한 데이터 정합성 문제를 해결하기 위해 이체 발생 로그를 저장하는 기능이다.
하지만 코드 리뷰에서 데이터가 오래 유지되지 않고 금방 사라져도 되는 데이터라는 점에서 Redis와 같은 메모리 기반의 DB 사용하면 좋겠다는 리뷰를 받았다.
이전에도 Redis를 사용하면 된다고 생각했지만 delete 연산 때문에 사용하지 않은 것인데 뭔가 다른 방법이 필요했다. 현재 producer, consumer 구조를 사용하고 있기에 Redis를 해당 구조에서 사용할 방법을 찾아보았다.
Redis Stream
찾아보니 Redis가 지원하는 많은 데이터 형식 중 Stream이라는 것이 있었다. 간단하게 살펴보니 큐에 하나씩 넣고 빼는 작업을 Redis가 담당하는 것으로 보였다. 그러니까 중간에서 어떤 데이터를 넘겨주는 역할을 하는 것이었다.
우선 중요한 것은 시간 복잡도이니 데이터를 추가하고 제거하는 비용을 알아보았다. 추가하는 것은 O(1)이고, 단일 항목에 접근하는 것은 O(n)이었다. 이때 n은 ID의 길이라는 설명이 있었다. 그럼 무한대가 아니라는 소리인데 조금 더 자세히 보니 Stream ID는 일반적으로 짧고 고정된 길이라서 일정한 시간 안으로 조회할 수 있다는 것이었다. 어떻게 이게 가능하지?라는 생각에 찾아보니 내부적으로 Radix Tree로 구성되기 때문이라고 한다.
Radix Tree는 Binary Search Tree나 Red Black Tree처럼 왼쪽 노드는 키값이 작고 오른쪽 노드는 키값이 큰 순으로 노드가 배치되는 원리를 사용한다. 다만 트리의 균형을 맞추는 방식에서 차이가 있는데, Red Black Tree는 회전하는 방식을 사용하지만 Radix Tree는 키값의 이진 값에 따라서 0이면 왼쪽, 1이면 오른쪽으로 노드를 배치하여 균형을 맞춘다. 어차피 컴퓨터는 데이터를 이진수로 표현하니까 키값을 이렇게 이진수 값에 따라 배치하면 효율적이라는 것이다.
100% 이해하지는 못했지만 Trie라는 알고리즘과 유사해 보였다. 어쨌든 정해진 크기의 ID가 있고 위와 같은 Radix Tree를 통해 효율적으로 관리하기 때문에 n이 무한대가 아니라는 것이다. 결국 항상 일정 시간 안에는 모든 값에 접근이 되는 것이다.
사용할 명령어들은 아래와 같다.
- XADD
- 데이터를 추가하는 명령어이다.
- 새로운 entry를 추가할 때 O(1)이 걸린다.
- XADD [Stream Key] [Stream ID] [key 1] [value 1] [key 2] [value 2] … 와 같이 사용한다.
- XRANGE
- 특정 범위를 읽어오는 명령어이다.
- O(n)이 걸린다. n이 상수이면(항상 처음 10개의 값을 원하는 경우) O(1)로 봐도 무방하다.
- 범위 구간 외에 특정 ID에 해당하는 값도 읽어올 수 있다. 이는 XREAD라는 데이터를 읽어오는 명령어는 내가 지정한 ID보다 큰 값을 읽어오기 떄문이다. XRANGE에서 두 ID 구간을 동일하게 설정하면 해당 ID에 해당하는 값 하나만 읽을 수 있다.
- XRANGE [Stream Key] [Stream ID 1] [Stream ID 2]
- 이때 ID 자리에 각각 - , + 를 넣으면 처음부터 끝까지 읽어온다.
- XCLAIM
- consumer group에 있는 메세지의 consumer name을 변경하는 명령어이다.
- 이 명령어가 필요한 이유는 처리 되지 않은 메세지의 consumer name을 변경하여 다른 consumer가 처리할 수 있도록 하는 것이다.
- 나의 경우 이렇게 처리되지 않은 메세지를 별도로 관리하기 위해 사용했다. 처리되지 않은 것은 뭔가 문제가 발생한 것으로 보고 롤백을 했다.
- O(log n)의 시간 복잡도를 가지며 이때 n은 처리 되지 않은 메세지의 수이다. 전체 데이터의 수가 아니다.
- 즉 서버가 감당하지 못할 요청이 발생하거나 서버에 문제가 발생하는 것이 아니라면 n의 수는 크지 않을 것이다.
- XCLAIM [Stream Key] [Consumer Group] [Consumer Name] [pending 된 시간] [ID]
- pending 된 시간은 메세지를 누군가 읽은 후 지난 시간을 넣는다. 즉, 처리하려고 읽었지만 그 이후에 또 읽기까지의 시간을 의미하며 해당 시간이 지났으면 XCLAIM 명령을 실행한다.
- ID는 해당 메세지의 ID를 입력하면 된다.
- XGROUP CREATE
- Group을 만드는 명령어이다. Stream 데이터는 이 그룹 기준으로 메세지를 관리하고 특정 그룹에 대해 그 메세지를 처리한다.
- O(1)의 시간 복잡도를 가진다.
- XGROUP CREATE [Stream Key] [Stream Group] <id | $> MKSTREAM
- XGROUP CREATE 명령은 Stream Key에 해당하는 스트림이 존재하는 것을 가정한다. 만약 존재하지 않으면 오류가 발생한다.
- 하지만 스트림이 존재하지 않는 경우 MKSTREAM 명령어를 사용하여 생성할 수 있는데, 이때 <id | $> 에 0을 입력하면 스트림을 0의 길이로 자동으로 생성할 수 있다. 그럼 Stream Key, Stream Group에 입력한 값으로 스트림과 스트림 그룹을 생성할 수 있다.
- 처음 그룹을 생성할 때 사용하면 된다.
- XREADGROUP
- consumer group에 있는 데이터를 읽는 명령어이다.
- O(n)의 시간 복잡도를 가지지만, 앞에서 순서대로 일정 개수만 가져온다면 O(1)의 시간복잡도로 봐도 무방하다.
- XREADGROUP GROUP [Stream Group] [Stream Consumer] STREAMS [Stream Key] [ID]
- 마지막에 ID의 이상의 값을 읽어온다.
- ID에 특수한 값으로 “>”를 입력하면 아직 consumer group에 할당되지 않은 Stream Key에 해당하는 데이터를 읽어올 수 있다.
- 이렇게 처음으로 읽은 데이터는 ack처리 전까지 pending된 상태에 놓이게 된다.
- 이때 명령어에 입력한 consumer group으로 할당된다.
- 이렇게 읽어지기 전에 ID에 ID 값을 넣어서 읽으려고 시도하면 읽어지지 않는다. 이는 consumer group이 할당되지 않았기 때문이다.
- [Stream Consumer] 다음에 count 2 를 입력하면 2개만 읽어오는 것도 가능하다.
여기까지가 대략 내가 사용한 명령어이다. 이정도만 사용하니 내가 구상한 방법을 구현할 수 있었다.
구상한 방법
우선 MySQL을 사용하던 이체 작업을 Redis로 변경해야 했다. 이때 Redis도 비동기로 처리를 지원하는 클래스들이 있어서 이를 이용했다. 비동기에 대해 완전히 이해하지는 못했지만 Future라는 것을 활용하는 것으로 보였다. 아무튼 MySQL에 기록하고 ThreadPoolExecutor에 넘겨줬던 입금 Task 대신 Redis에 메세지를 날리는 방식으로 바꾸었다.
이후 Redis에 저장된 메세지를 백그라운드에서 지속적으로 읽고, 이 정보를 바탕으로 기존에 사용하던 ThreadPoolExecutor에 입금 Task를 추가해 주는 방식을 구상했다.
사실 생각은 쉽지만 항상 구현이 어려운 것 같다. 무엇보다 이렇게 사용한 사람들이 없어서 꽤 오랜 시간이 걸렸다. 하지만 Redis, Lettuce, Spring 공식 문서와 운이 좋게도 나와 비슷한 방법을 고민하고 구상한 1~2개의 블로그를 참고하여 어떻게 구현은 했다.
구현 코드
RedisOperator
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.RedisStreamCommands;
import org.springframework.data.redis.connection.stream.ByteRecord;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.PendingMessage;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.stereotype.Component;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.output.StatusOutput;
import io.lettuce.core.protocol.CommandArgs;
import io.lettuce.core.protocol.CommandKeyword;
import io.lettuce.core.protocol.CommandType;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Component
@RequiredArgsConstructor
public class RedisOperator {
private final RedisTemplate<String, Object> redisTemplate;
public void ackStream(String streamKey, String consumerGroup, String id) {
redisTemplate.opsForStream().acknowledge(streamKey, consumerGroup, id);
}
/**
* 이체한 정보를 기록하기 위해 redis stream에 메시지를 추가하는 메소드
*
* @param streamKey redis stream 이름(send-stream)
* @param sendPk 이체하는 사람의 계좌 pk
* @param depositPk 입금받는 사람의 계좌 pk
* @param money 이체할 금액
*/
public void addStream(String streamKey, long sendPk, long depositPk, long money) {
RedisAsyncCommands commands = (RedisAsyncCommands)redisTemplate.getConnectionFactory()
.getConnection()
.getNativeConnection();
CommandArgs<String, String> commandArgs = new CommandArgs<>(StringCodec.UTF8).addKey(streamKey)
.add("*") // id 자동 생성
.add("send-pk").add(sendPk)
.add("deposit-pk").add(depositPk)
.add("money").add(money);
commands.dispatch(CommandType.XADD, new StatusOutput(StringCodec.UTF8), commandArgs);
}
public PendingMessages findPendingMessages(String streamKey, String consumerGroupName, String consumerName) {
return redisTemplate.opsForStream()
.pending(streamKey, Consumer.from(consumerGroupName, consumerName), Range.unbounded(), 100L);
}
/**
*
* id로 메세지 정보를 얻는 메소드
* 0번 인덱스에 send-pk, 1번 인덱스에 deposit-pk, 2번 인덱스에 money를 넣은 Long 타입 배열을 리턴한다.
*/
public Long[] findMessageById(String streamKey, String id) {
RedisStreamCommands command = redisTemplate.getConnectionFactory()
.getConnection();
// deserialize가 안돼서 직접 타입을 맞춰줌
// command.range()를 하면 왜인지 모르겠지만 Map형태로 데이터가 나오지 않아서 생기는 문제 같음.
try {
List<ByteRecord> byteRecords = command.xRange(streamKey.getBytes("UTF-8"), Range.closed(id, id));
ByteRecord entries = byteRecords.get(0);
Long[] values = new Long[3];
int index = 0;
for (Map.Entry<byte[], byte[]> entry : entries) {
String key = new String(entry.getKey(), StandardCharsets.UTF_8);
String value = new String(entry.getValue(), StandardCharsets.UTF_8);
values[index++] = Long.valueOf(value);
}
return values;
} catch (Exception e) {
// 뭔가 로그 파일로 남기거나 메일로 에러가 발생했다고 알려줘야 할 것 같음
log.error("[{}] streamKey: {} | id: {} | message: {}", e.getClass().getSimpleName(), streamKey, id,
e.getMessage());
}
return null;
}
/**
*
* pending된 메세지의 consumer name을 바꾸는 메소드
* 기존 consumer에서 pending된 메세지를 처리하지 않도록 한다.
*/
public void claimMessage(PendingMessage pendingMessage, String streamKey, String consumerName) {
RedisAsyncCommands commands = (RedisAsyncCommands)redisTemplate.getConnectionFactory()
.getConnection()
.getNativeConnection();
CommandArgs<String, String> commandArgs = new CommandArgs<>(StringCodec.UTF8).addKey(streamKey)
.add(pendingMessage.getGroupName())
.add(consumerName)
.add("3000") // 3초 이상 pending된 메세지만 처리한다
.add(pendingMessage.getIdAsString());
commands.dispatch(CommandType.XCLAIM, new StatusOutput(StringCodec.UTF8), commandArgs);
}
public void createStreamConsumerGroup(String streamKey, String consumerGroupName) {
// Stream이 존재하지 않는 경우 생성
if (!redisTemplate.hasKey(streamKey)) {
RedisAsyncCommands commands = (RedisAsyncCommands)redisTemplate.getConnectionFactory()
.getConnection()
.getNativeConnection();
// 사용할 명령어 생성
CommandArgs<String, String> commandArgs = new CommandArgs<>(StringCodec.UTF8).add(CommandKeyword.CREATE)
.add(streamKey) // key
.add(consumerGroupName) // group
.add("0") // <id | $> 0을 사용하면 처음부터 전체 스트림을 가져오도록 한다.
.add("MKSTREAM"); // 스트림이 존재하지 않는 경우 스트림을 0의 길이로 자동 생성
commands.dispatch(CommandType.XGROUP, new StatusOutput(StringCodec.UTF8), commandArgs);
} else {
if (!isStreamConsumerGroupExist(streamKey, consumerGroupName)) {
redisTemplate.opsForStream().createGroup(streamKey, ReadOffset.from("0"), consumerGroupName);
}
}
}
/**
*
* 해당 스트림의 consumer group이 있는지 확인하는 메소드
*/
public boolean isStreamConsumerGroupExist(String streamKey, String consumerGroupName) {
Iterator<StreamInfo.XInfoGroup> iterator = redisTemplate.opsForStream().groups(streamKey).stream().iterator();
while (iterator.hasNext()) {
StreamInfo.XInfoGroup xInfoGroup = iterator.next();
if (xInfoGroup.groupName().equals(consumerGroupName)) {
return true;
}
}
return false;
}
public StreamMessageListenerContainer createStreamMessageListenerContainer() {
return StreamMessageListenerContainer.create(redisTemplate.getConnectionFactory(),
StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
.hashKeySerializer(new StringRedisSerializer())
.hashValueSerializer(new StringRedisSerializer())
.pollTimeout(Duration.ofMillis(20))
.build());
}
}
코드가 꽤 복잡한데 redisTemplate.opsForStream()을 사용한 것이 보이고, 커넥션 자체를 얻어서 dispatch() 메소드에 직접 작성한 명령어를 넣어서 실행하는 방법도 보인다.
정확한 이유는 잘 모르겠지만 redisTemplateopsForStream()으로 실행하니 직렬화와 역직렬화 문제, 원하는대로 명령어가 나가지 않은? 문제도 발생했다. 사실 이거 자체를 고치지 못하여 CommandArgs를 통해 명령어를 직접 만들어서 사용하기도 했다.
- addStream
- XADD 명령어를 통해 스트림에 데이터를 추가하는 메소드
- CommandArgs에 Stream Key를 설정하고 차례대로 ID 자동 생성을 위해 [ * ]을 넣고 나머진 key, value로 나열한 것을 볼 수 있다.
- 그럼 XADD [Stream Key] * key 1, value 1… 과 같은 명령어가 되는 것이다.
- 이때 RedisAsyncCommands를 사용하는데 커넥션을 얻은 것을 비동기로 처리할 수 있어서 사용했다. 굳이 메세지 추가하는데 이 응답을 기다릴 필요가 없다고 생각했기 때문이다.
- 하지만 이는 나중에 문제가 생길 것으로 판단하여 수정하게 된다.
- findMessageById
- 스트림 데이터의 ID로 데이터를 읽어오는 메소드로 XRANGE를 사용한다.
- 이때 직렬화, 역직렬화 문제가 발생하여 전부 수제로 타입을 변환했다.
- 이상하게 range() 메서드가 마음대로 나오지 않았다. 그래서 커넥션을 통해 xrange() 메소드를 사용했다.
public void createStreamConsumerGroup(String streamKey, String consumerGroupName) {
// Stream이 존재하지 않는 경우 생성
if (!redisTemplate.hasKey(streamKey)) {
RedisAsyncCommands commands = (RedisAsyncCommands)redisTemplate.getConnectionFactory()
.getConnection()
.getNativeConnection();
// 사용할 명령어 생성
CommandArgs<String, String> commandArgs = new CommandArgs<>(StringCodec.UTF8).add(CommandKeyword.CREATE)
.add(streamKey) // key
.add(consumerGroupName) // group
.add("0") // <id | $> 0을 사용하면 처음부터 전체 스트림을 가져오도록 한다.
.add("MKSTREAM"); // 스트림이 존재하지 않는 경우 스트림을 0의 길이로 자동 생성
commands.dispatch(CommandType.XGROUP, new StatusOutput(StringCodec.UTF8), commandArgs);
} else {
if (!isStreamConsumerGroupExist(streamKey, consumerGroupName)) {
redisTemplate.opsForStream().createGroup(streamKey, ReadOffset.from("0"), consumerGroupName);
}
}
}
/**
*
* 해당 스트림의 consumer group이 있는지 확인하는 메소드
*/
public boolean isStreamConsumerGroupExist(String streamKey, String consumerGroupName) {
Iterator<StreamInfo.XInfoGroup> iterator = redisTemplate.opsForStream().groups(streamKey).stream().iterator();
while (iterator.hasNext()) {
StreamInfo.XInfoGroup xInfoGroup = iterator.next();
if (xInfoGroup.groupName().equals(consumerGroupName)) {
return true;
}
}
return false;
}
public StreamMessageListenerContainer createStreamMessageListenerContainer() {
return StreamMessageListenerContainer.create(redisTemplate.getConnectionFactory(),
StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
.hashKeySerializer(new StringRedisSerializer())
.hashValueSerializer(new StringRedisSerializer())
.pollTimeout(Duration.ofMillis(20))
.build());
}
위의 메소드들은 스트림과 스트림 그룹을 생성하는 메소드로 서버 실행 시 존재 유무를 확인하여 생성한다.
streamMessageListenerContainer는 Redis에 메세지가 있는지 확인하고 이를 읽어오기 위한 메소드이다. redisTemplate의 커넥션을 사용하고 stream에 저장된 key, value를 직렬화하는 설정과 pollTimeout을 설정한다. 이때 설정한 시간 단위마다 polling을 하는 것이다. 이때 중요한 점은 XREADGROUP에서 말했던 “>”라는 특수한 ID 값을 사용하여 읽는다는 점이다. 아래에 보면 해당 컨테이너를 사용한다.
RedisStreamConsumer
import java.time.Duration;
import java.util.Map;
import java.util.Set;
import org.c4marathon.assignment.bankaccount.message.util.RedisOperator;
import org.c4marathon.assignment.bankaccount.service.DepositHandlerService;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import org.springframework.stereotype.Component;
import lombok.RequiredArgsConstructor;
@Component
@RequiredArgsConstructor
public class RedisStreamConsumer implements StreamListener<String, MapRecord<String, Object, Object>>, InitializingBean,
DisposableBean {
private StreamMessageListenerContainer<String, MapRecord<String, Object, Object>> listenerContainer;
private Subscription subscription;
@Value("${redis-stream.stream-key}")
private String streamKey;
@Value("${redis-stream.consumer-group-name}")
private String consumerGroupName;
@Value("${redis-stream.consumer-name}")
private String consumerName;
private final RedisOperator redisOperator;
private final DepositHandlerService depositHandlerService;
@Override
public void destroy() throws Exception {
if (subscription != null) {
subscription.cancel();
}
if (listenerContainer != null) {
listenerContainer.stop();
}
}
@Override
public void afterPropertiesSet() throws Exception {
// Consumer Group 초기화
redisOperator.createStreamConsumerGroup(streamKey, consumerGroupName);
// StreamMessageListenerContainer 설정
listenerContainer = redisOperator.createStreamMessageListenerContainer();
subscription = listenerContainer.receive(
Consumer.from(consumerGroupName, consumerName),
StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
this
);
subscription.await(Duration.ofSeconds(2));
listenerContainer.start();
}
/**
*
* Redis Stream 메세지를 처리하는 메소드
*/
@Override
public void onMessage(MapRecord<String, Object, Object> message) {
Set<Map.Entry<Object, Object>> entries = message.getValue().entrySet();
Long[] depositData = new Long[3];
int index = 0;
// depositData에 send-pk, deposit-pk, money를 차례대로 담는다.
for (Map.Entry<Object, Object> entry : entries) {
String value = (String)entry.getValue();
depositData[index++] = Long.valueOf(value);
}
// deposit-pk에 money를 추가하는 task 추가
depositHandlerService.doDeposit(depositData[1], depositData[2], message.getId().toString());
}
}
afterPropertiesSet()에서 모든 설정을 했는데, 내가 사용할 Stream Key, Consumer Group을 생성한다. 이후 createStreamMessageListenerContainer를 통해 StreamMessageListenerContainer를 생성하고 이에 대한 설정으로 consumer group, consumer name을 설정한다. 이후 이를 실행하게 되면 내가 설정한 consumer group과 consumer name으로 XRREADGROUP 명령어를 “>” 특수 ID로 실행하게 된다.
StreamListener<String, MapRecord<String, Object, Object>>를 implements 하면 onMessage() 메소드를 구현해야 한다. message는 MapRecord<String,Object,Object> 형태이며 이 메소드가 실행됐다는 것은 해당 Redis Stream 메세지가 내가 설정한 consumer group에 pending된 상태로 존재한다는 것이다.
MainAccountService
@Transactional(isolation = Isolation.READ_COMMITTED)
public void sendToOtherAccount(long senderPk, long depositPk, long money) {
// 1. 나의 계좌에서 이체할 금액을 빼준다.
MainAccount myAccount = mainAccountRepository.findByPkForUpdate(senderPk)
.orElseThrow(AccountErrorCode.ACCOUNT_NOT_FOUND::accountException);
autoMoneyChange(myAccount, money);
mainAccountRepository.save(myAccount);
// 2. 입금 로직을 위한 이체 메세지를 Redis에 넘겨주고 트랜잭션을 종료한다.
redisOperator.addStream(streamKey, senderPk, depositPk, money);
}
기존 MySQL 작업을 모두 제거하고 addStream()만 호출하고 즉시 리턴한다.
DepositHandlerService
@Transactional(propagation = Propagation.REQUIRES_NEW, isolation = Isolation.READ_COMMITTED)
@Async("depositExecutor")
public void doDeposit(long accountPk, long money, String streamId) {
mainAccountRepository.deposit(accountPk, money);
redisOperator.ackStream(streamKey, consumerGroup, streamId);
}
입금을 처리하는 메소드 또한 DB를 확인할 필요 없이 바로 update 쿼리를 보내고 해당 메세지를 처리했다고 ack를 한다.
위 방법의 문제점
문제점을 보기 전에 위의 코드에선 없지만 Pending 메세지를 처리하는 스케줄러 기능을 만들었다.
@Scheduled(fixedRate = 5000)
public void consumePendingMessage() {
PendingMessages pendingMessages = redisOperator.findPendingMessages(streamKey, consumerGroupName, consumerName);
for (PendingMessage pendingMessage : pendingMessages) {
// 기존 consumer가 처리하지 못하도록 consumer name을 claim-consumer로 변경한다.
redisOperator.claimMessage(pendingMessage, streamKey, claimConsumerName);
// streamKey와 id에 해당하는 sned-pk, deposit-pk, money를 차례대로 담은 Long 배열을 조회한다.
Long[] data = redisOperator.findMessageById(streamKey,
pendingMessage.getIdAsString());
// 처리되지 않은 이체 로그는 롤백을 시켜준다.
if (data != null) {
// 롤백을 하므로 send-pk에 money를 추가해준다.
depositHandlerService.doDeposit(data[0], data[2], pendingMessage.getIdAsString());
}
}
}
pending 된 메세지를 무작정 읽어서 doDeposit에 작업을 넣는 간단한 스케줄러이다.
사실 여기까지 구상한 대로 잘 구현했다고 생각했다. 하지만 비동기로 뭔가 처리하는 것 자체가 처음이라 문제가 없는지 더 고민해 보았고, 비동기라서 발생하는 문제들이 보였다. 우선 DeposithandlerService의 mainAccountRepository.deposit() 메소드는 MySQL에 update 쿼리를 날리는데, 이게 성공하고 redisOperator.ackStream()이 실패할 수 있다. 이는 ackStream()이 비동기로 동작하고, 성공 여부를 보지 않고 doDeposit() 메소드가 종료되기 때문이다.
그렇다면 ack가 정상 처리되지 않은 경우 입금은 정상 처리 되었지만, ack가 되지 않아 메세지가 그대로 남을 수 있다. 이후에 이 메세지가 또다시 처리된다면 중복하여 입금되는 문제가 발생할 수 있다. 즉, 나는 1000원을 이체했는데 상대방은 2000원 이상이 입금될 수 있다는 말이다. 즉 ack를 확인하고 쿼리를 처리해야 정합성에 문제가 없을 것이다.
이외에도 XADD에서 문제가 생길 경우, 이체하는 사람의 돈은 차감됐을 것인데, 이를 롤백하는 과정이 필요할 것이다.
코드 수정
RedisOperator
public Long ackStream(String streamKey, String consumerGroup, String id) {
return redisTemplate.opsForStream().acknowledge(streamKey, consumerGroup, id);
}
/**
* 이체한 정보를 기록하기 위해 redis stream에 메시지를 추가하는 메소드
*
* @param streamKey redis stream 이름(send-stream)
* @param sendPk 이체하는 사람의 계좌 pk
* @param depositPk 입금받는 사람의 계좌 pk
* @param money 이체할 금액
*/
public void addStream(String streamKey, long sendPk, long depositPk, long money) {
RedisConnection connection = getRedisConnection();
if (connection == null) {
return;
}
RedisAsyncCommands commands = (RedisAsyncCommands)connection.getNativeConnection();
CommandArgs<String, String> commandArgs = new CommandArgs<>(StringCodec.UTF8).addKey(streamKey)
.add("*") // id 자동 생성
.add("send-pk").add(sendPk)
.add("deposit-pk").add(depositPk)
.add("money").add(money);
RedisFuture dispatch = commands.dispatch(CommandType.XADD, new StatusOutput<>(StringCodec.UTF8), commandArgs);
dispatch.handle((result, exception) -> {
// 예외가 발생하면 이체 롤백 요청을 보낸다.
if (exception != null) {
sendRollbackHandlerService.rollbackDeposit(sendPk, depositPk, money);
}
return result;
});
}
ackStream()의 경우 리턴 값을 추가해 주었다. ack가 성공하면 ack가 된 데이터 수를 리턴하게 되는데 ID를 기반으로 ack를 하기 때문에 1을 리턴한다. 실패하면 0을 리턴한다. 이를 통해 예외 처리를 할 것이다.
addStream()은 dispatch()의 결과로 RedisFuture를 받는 것을 볼 수 있는데, handle() 메소드로 이후에 요청에 대한 응답을 핸들링할 수 있다. result는 성공한 결과를, exception은 예외가 발생하면 예외에 대한 정보를 리턴한다. 그래서 예외가 발생한 경우 rollback을 수행하도록 했다.
진짜 비동기로 동작하는지 궁금하면 이후에 println()을 통해 아무거나 출력해 보면 잘 동작하는 것을 알 수 있을 것이다.
package org.c4marathon.assignment.bankaccount.service;
import org.c4marathon.assignment.bankaccount.exception.async.AccountAsyncErrorCode;
import org.c4marathon.assignment.bankaccount.repository.MainAccountRepository;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import lombok.RequiredArgsConstructor;
@Service
@RequiredArgsConstructor
public class SendRollbackHandlerService {
private final MainAccountRepository mainAccountRepository;
@Async("rollbackExecutor")
@Transactional
public void rollbackDeposit(long sendPk, long depositPk, long money) {
int updateResult = mainAccountRepository.deposit(sendPk, money);
// 상대 계좌에 업데이트가 되지 않은 경우 롤백 실패 예외가 발생한다.
if (updateResult == 0) {
throw AccountAsyncErrorCode.SEND_ROLLBACK_FAILED.accountAsyncException();
}
}
}
기존 입금을 처리하는 스레드 풀과 거의 동일하게 동작한다. 다만 예외 처리가 다른데, 롤백의 경우 단순히 예외를 발생시키고 다시 원복 하는 쿼리가 없다. 하지만 아래 코드를 보면
@Service
@RequiredArgsConstructor
public class DepositHandlerService {
private final MainAccountRepository mainAccountRepository;
@Async("depositExecutor")
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void doDeposit(long sendPk, long depositPk, long money) {
int updateResult = mainAccountRepository.deposit(depositPk, money);
// 상대 계좌에 업데이트가 되지 않은 경우 롤백해야 한다.
if (updateResult == 0) {
mainAccountRepository.deposit(sendPk, money);
}
}
}
문제가 생기는 즉시 원래 상태로 되돌리는 것을 볼 수 있다. 이는 다음 코드를 보면 알게 될 텐데 ack가 성공하면 다음 작업을 진행하도록 했기 때문이다. updateResult가 0인 경우는 상대 계좌가 없는 경우밖에 없을 것이고 이체를 이미 진행 중인 계좌라면 그 계좌는 당연히 존재할 것이다.
RedisStreamConsumer
@Override
public void onMessage(MapRecord<String, Object, Object> message) {
// 테스트에서 ack 처리 되지 않도록 처리
if (isTest) {
return;
}
Set<Map.Entry<Object, Object>> entries = message.getValue().entrySet();
Long[] accountData = new Long[3];
int index = 0;
// depositData에 send-pk, deposit-pk, money를 차례대로 담는다.
for (Map.Entry<Object, Object> entry : entries) {
String value = (String)entry.getValue();
accountData[index++] = Long.valueOf(value);
}
// deposit-pk에 money를 추가하는 task 추가
Long ackResult = redisOperator.ackStream(streamKey, consumerGroupName, message.getId().toString());
if (ackResult != 0) {
depositHandlerService.doDeposit(accountData[0], accountData[1], accountData[2]);
}
}
불편한 코드가 있는데 isTest라는 필드이다. 사실 넣기 싫었지만, 테스트 코드를 작성하다 보니 불가피하게 환경 변수로 설정하여 사용하게 되었다. 저게 아니면 읽어오는 메세지에서 ack가 실패하는 경우를 만들 방법이 떠오르지 않았다.
ackStream이 성공하면 doDeposit()을 호출하도록 했다.
그럼 실패하면 어떻게 하냐?라고 생각할 수 있다. 하지만 이렇게 한 번 읽은 메세지는 pending 된다고 했다. 그러니 pending 메세지를 처리하는 스케줄러가 이를 알아서 처리해 줄 것이다.
PendingMessageScheduler
/**
*
* 어떠한 문제로 처리되지 않은 pending 메세지를 처리하는 메소드
* 기존 consumer가 중복하여 처리하지 않도록 consumer-name을 변경한다.
*/
@Scheduled(fixedRate = 5000)
public void consumePendingMessage() {
PendingMessages pendingMessages = redisOperator.findPendingMessages(streamKey, consumerGroupName, consumerName);
for (PendingMessage pendingMessage : pendingMessages) {
// StreamListener가 처리중인 메세지일 수 있으므로 pendingTime보다 작은 시간이면 처리하지 않는다.
if (pendingMessage.getElapsedTimeSinceLastDelivery().toMillis() < pendingTime) {
break;
}
// 기존 consumer가 중복하여 처리하지 못하도록 consumer name을 claim-consumer로 변경한다.
redisOperator.claimMessage(pendingMessage, streamKey, claimConsumerName);
// streamKey와 id에 해당하는 sned-pk, deposit-pk, money를 차례대로 담은 Long 배열을 조회한다.
Long[] data = redisOperator.findMessageById(streamKey,
pendingMessage.getIdAsString());
Long ackResult = redisOperator.ackStream(streamKey, consumerGroupName, pendingMessage.getIdAsString());
// 처리되지 않은 이체 로그는 롤백을 시켜준다.
if (data != null && data[2] != 0 && ackResult == 1) {
// 롤백을 하므로 send-pk에 money를 추가해준다.
depositHandlerService.doDeposit(data[1], data[0], data[2]);
}
}
}
/**
*
* consumePendingMessage()에서 오류가 발생하여 처리하지 못한 메세지를 처리하는 메소드
* consumePendingMessage()와 유사하지만 바뀐 claim-consumer에 대해서만 처리한다.
*/
@Scheduled(fixedRate = 10000)
public void consumeClaimMessage() {
PendingMessages pendingMessages = redisOperator.findPendingMessages(streamKey, consumerGroupName,
claimConsumerName);
for (PendingMessage pendingMessage : pendingMessages) {
Long[] data = redisOperator.findMessageById(streamKey,
pendingMessage.getIdAsString());
Long ackResult = redisOperator.ackStream(streamKey, consumerGroupName, pendingMessage.getIdAsString());
if (data != null && data[2] != 0 && ackResult == 1) {
depositHandlerService.doDeposit(data[1], data[0], data[2]);
}
}
}
Ack 결괏값에 대한 예외 처리 외에도 getElapsedTimeSinceLastDelivery() 메소드를 통한 예외 처리가 보일 것이다.
이는 스케줄러 특성상 Consumer가 읽어서 pending처리되자마자 읽을 수 있다고 생각했다. 나의 경우 위 스케줄러에서 처리하는 메세지는 전부 롤백시켜서 사용자로 하여금 다시 이체를 하도록 만들었다. 그렇다면 Consumer가 정상적으로 읽었지만 스케줄러가 먼저 작업을 처리해 버리면 불필요하게 롤백을 하게 된다. 그런 경우를 방지하고자 일정 시간이 지나지 않은 경우는 처리하지 않도록 했다.
그리고 pending 메세지를 처리할 때 중복으로 처리하지 않도록 claimMessage() 메소드를 통해 consumer name을 변경해 주었다. 즉, 기존 consumer name으로 XREADGROUP 명령어를 날려도 해당 메세지는 읽을 수 없다.
이렇게 consumer name이 바뀐 메세지는 또 다른 스케줄러가 처리하도록 하였다. 좀 과한가? 싶지만 메세지 자체에 문제가 생기면 무한으로 돌 것 같아서 저렇게 했다. 지금 다시 보니 굳이 필요한가 싶기도 하다.
후기
내가 구상한 대로 구현은 했지만 사실 이게 과연 안정성 있게 잘 실행될까? 에 대한 의문은 있다. 비동기를 처음 다루기도 했고, 하나의 서버에서 처리되지 않을 수 있다는 생각에 중간에 어떤 문제가 발생하면 어떻게 해야 하나?라는 생각이 많이 들었다. 특히 네트워크나 각 서버 자체에서 문제가 생기고 재수가 없기 내가 보낸 메세지가 사라진다면 이런 문제는 어떻게 해결해야 하는지가 어려운 것 같다. 단순히 로그 파일을 잘 만들어서 이를 역추적해야 하나? 논리적으로 해결한 방법은 없나?라는 생각이 들었다. 사실 DB 자체도 정합성을 유지해 주는 기능들이 있지만 천재지변에는 완벽하게 대응하지는 못하니 이렇게 하는 게 최선인가?라는 생각도 들었다.
그래도 CS 스터디를 하며 멀티 스레드, 스레드 풀, 비동기에 대해 토론하며 학습한 부분들이 도움이 됐다. 완벽하지는 않지만 어느 정도 스레드 풀과 비동기에 대한 개념이 있었기에 이런 방식을 생각한 것 같고, 구현까지 하게 됐다고 생각한다. 1년 전만 해도 내 프로젝트들은 이런 생각들은 전혀 하지 않고 단순히 기능 구현만 했기 때문에 확실히 CS 스터디 한 것이 도움이 되었고, CS가 왜 중요한지 어느정도 체감이 된 것 같다.
처리하지 못한 부분도 있는데 Stream 데이터 삭제를 어느 주기로 해야 하는지?이다. XACK를 했을 때 Consumer Group에 있는 pending 데이터는 사라져도 실제 XADD를 한 데이터는 남아있기 때문이다. 결국 메모리를 전부 사용하지 않으려면 어느 정도 삭제를 해야 하는데 이게 시간도 꽤 걸리는 작업이고 하나씩 삭제하는 것은 또 비효율적이다. 메시지를 특정 개수만 남기고 전부 삭제하는 것도 있는데 해당 명령어가 도달하기 전에 트래픽이 증가하여 갑자기 메세지가 많이 생기면 이것도 문제가 발생할 것 같아 고민이다. Stream에 TTL을 추가할 수도 있는 것 같은데 이런 것 전부 고민해서 가장 괜찮을 것 같은 방법을 추가해야겠다.
조금 아쉬운 부분은 예외 처리를 한 부분인데 실무에서는 이걸 어떻게 처리하는지가 궁금하다. 스레드 풀에 대해 예외 핸들링을 했지만 여기서 뭔가 복구를 시키는지? 아니면 이건 논리적으로 해결하기보다는 서버 자체에 문제가 있는 것으로 보고 알림을 날린다던지? 나는 당장에 방법이 생각나지 않아 로그만 찍긴 했는데 나중에 고민해 보고 기똥찬 방법이 생각나지 않으면 메일 발송이라도 해봐야겠다.
참고
https://redis.io/docs/latest/commands/xack/
https://github.com/kingjakeu/springboot-redis-stream
https://lettuce.io/core/release/reference/
http://redisgate.kr/redis/clients/lettuce2_intro.php