Mini Pay step 2 간단 설명
step 1을 끝내고 step 2로 넘어왔다. step 2는 다른 사람의 메인 계좌에 이체를 하는 것과 매일 00시에 이체 한도를 초기화 하는 것이었다. 개인적으로 페이 서비스의 step 중에서 가장 고민을 많이 했으며 가장 재밌어 보이는 step 이었다.
step 2에서 고민해야 할 점은 사용자들이 동시에 요청을 보냈을 때 돈의 정합성 문제와 다수의 요청을 동시에 효율적이게 처리해야 할 방법이었다.
효율적이고 정확한 처리를 위한 고민
먼저 돈과 관련되어 있기 때문에 정합성이 가장 중요했다. 왜냐하면 내가 이체를 했는데 나의 계좌에는 돈이 빠져나갔지만, 상대는 돈이 들어오지 않은 상황이 발생하면 안되기 때문이다. 그래서 잠깐 생각을 해 봐도 A→B 라는 돈의 흐름에서 A의 돈 차감, B의 돈 증가는 한 트랜잭션에 있어야 한다고 생각을 했다.
이제 효율적인 처리를 위한 고민을 해보면 위의 A→B 작업이 한 트랜잭션에 있으면 효율적이지 않다는 것이 보인다. 이는 A의 레코드를 수정하기 위해 락을 걸게 되고 이는 B의 레코드 수정까지 락을 들고 있게 된다. A와는 상관 없는 B의 처리를 위해 락을 불필요하게 들고 있는 것이다.
정합성과 효율성을 따로 놓고 보면 정합성이 당연히 중요하지만, 많은 사용자가 사용했을 때 둘 다 문제가 없게 할 새로운 방안이 필요했다. 당장 든 생각은 B의 경우 돈을 받는 입장이라 무조건 돈의 이동이 발생해도 양수가 나온다. 그렇기에 굳이 B는 select를 한 후에 save() 메소드를 통한 업데이트가 필요없었다. 그래서 바로 update문을 사용하는 쿼리를 사용하면 좋다고 생각했다.
하지만 이렇게 되어도 정작 중요한 A의 락은 B의 처리까지 가져가고 있었다. 결국 A와 B의 작업을 분리 해야 락을 효율적으로 관리 할 수 있다고 생각했다. 그래서 A는 A대로, B는 B대로 처리하고 B는 새로운 스레드 풀로 작업을 처리하면 좋다는 생각이 들었다. 우리도 친구가 이체를 했을 때 그 즉시 확인하면 돈이 입급되지 않았고, 새로고침을 하면 들어온 것을 확인한 경험이 있을 것이다. 그래서 서비스 자체에선 이 두 로직을 분리했을 때 문제는 없다고 생각했다.
A와 B의 작업을 분리하여 락은 효율적으로 관리했지만 이제 정합성 문제가 남았다. 만약 A가 이체를 위해 자신의 잔고에서 일정 금액을 차감하고 B의 작업을 처리할 스레드 풀에 작업 요청을 할 것이다. 이후 스레드 풀이 큐에서 작업을 꺼내서 처리할 것인데, 이때 문제가 생겨 서버가 다운되었다고 가정하자. 그럼 A는 이체를 했지만 B는 입금 내역이 없는 상황이 발생할 것이다.
나는 이 문제를 DB 테이블에 이체 내역을 기록해서 해결하기로 결정했다. 별도의 테이블에 A→B가 이체한다는 기록과 해당 기록이 B가 처리했는지 체크하는 컬럼을 하나 두는 것이다. 그럼 A의 트랜잭션에서는 A의 돈 차감과 함께 이체 내역이 테이블에 기록될 것이다. 이는 한 트랜잭션 안에 있기 때문에 중간에 문제가 생기면 A의 잔액이 차감되지 않고 이체 내역도 기록되지 않을 것이다. 이후 스레드 풀이 B를 처리할 때, B의 잔액을 증가한 후 A 트랜잭션에서 기록된 이체 내역을 처리했다고 체크하는 것이다. 그리고 어떤 백그라운드 스레드를 통해 지속적으로 이체 내역을 확인하여 제대로 이체되지 않은 것은 스레드 풀에 넣어주는 것이다.
스레드 풀의 작업을 처리할 클래스(DepositHandlerService)
package org.c4marathon.assignment.bankaccount.service;
import org.c4marathon.assignment.bankaccount.repository.MainAccountRepository;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import lombok.RequiredArgsConstructor;
@Service
@RequiredArgsConstructor
public class DepositHandlerService {
private final MainAccountRepository mainAccountRepository;
@Async("depositExecutor")
public void doDeposit(long accountPk, long money) {
mainAccountRepository.deposit(accountPk, money);
}
public void submit(long accountPk, long money) {
executor.submit(() -> doDeposit(accountPk, money));
}
}
- 내가 처음에 작성한 클래스이다. A의 트랜잭션에서 submit()을 호출하면 submit()이 executor.submit()을 호출하여 비동기로 처리할 수 있다고 생각했다.
- @Async에서는 내가 생성한 스레드 풀의 빈의 이름을 넣어줬다. 해당 스레드 풀을 사용해서 작업을 처리하기 위함이다.
- 이는 대단한 착각이었는데 @Async의 경우 같은 클래스 내에서 호출하면 내 예상처럼 비동기적으로 처리되지 않았다.
- 그리고 이것보다 먼저 발견한 문제가 있었는데 A의 트랜잭션이 해당 클래스까지 이어지는 것이었다. 그래서 submit()을 10번 호출해도 스레드 풀에는 작업이 10개가 잘 들어가지만, 해당 스레드 풀에서 작업은 단 1번만 발생하는 문제가 있었다. 즉 update는 1번만 실행되는 것이었다.
- 이 문제는 로그를 보고 해결할 수 있었는데 트랜잭션이 shared된다는 JPA의 로그를 볼 수 있었다. 그래서 트랜잭션을 별도로 분리하고 submit()을 통해 비동기 처리 작업을 처리하는 것이 아닌, A의 트랜잭션 작업에서 doDeposit()을 바로 호출하는 것으로 변경했다.
package org.c4marathon.assignment.bankaccount.service;
import org.c4marathon.assignment.bankaccount.repository.MainAccountRepository;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import lombok.RequiredArgsConstructor;
@Service
@RequiredArgsConstructor
public class DepositHandlerService {
private final ThreadPoolTaskExecutor executor;
private final MainAccountRepository mainAccountRepository;
@Transactional(propagation = Propagation.REQUIRES_NEW)
@Async("depositExecutor")
public void doDeposit(long accountPk, long money) {
mainAccountRepository.deposit(accountPk, money);
}
}
- DepositHandlerService는 위와 같이 수정하였다.
- 위 문제를 해결하며 알게된 점은 내가 비동기로 처리해도 같은 트랜잭션이 확장되면 정상적으로 처리되지 않는다는 점이었다. 트랜잭션을 별도로 분리하며 A의 이체 작업이 doDeposit() 메소드까지 영향을 미치지 않도록 해야 한다. 또한 같은 클래스 내에서 @Async가 붙은 메소드를 호출하면 별도의 스레드에서 실행되지 않는다는 것이다. 이는 스프링 AOP가 프록시 기반으로 동작하기 때문이다. 이 부분은 추가로 공부가 필요하다.
- 현재 내가 이해한 바로는 @Transactional도 이게 붙은 메소드는 클래스 전체를 프록시로 등록한다. @Async도 프록시로 동작하면 같은 원리로 클래스 전체가 프록시가 빈으로 등록될 것이다. 그렇기 때문에 내부에서 다른 메소드를 호출하면 @Transactional이 적용되지 않는 것처럼 @Async도 적용이 되지 않는 것이다. 이건 AOP가 프록시에만 적용되고 프록시에서 @Async가 적용되지 않는 메소드를 통해 클래스 내부의 @Async가 붙은 메소드를 호출하면 프록시가 적용되지 않은 클래스의 객체를 호출하게 된다. 그럼 해당 클래스의 메소드는 @Async가 적용되지 않은 채로 메모리에 올라와 있기 때문에 비동기적으로 메소드가 실행되지 않은 것이다.
이체하는 사용자의 트랜잭션
...
@Transactional(isolation = Isolation.READ_COMMITTED)
public void sendToOtherAccount(long myAccountPk, long otherAccountPk, long money) {
// 1. 나의 계좌에서 이체할 금액을 빼준다.
MainAccount myAccount = mainAccountRepository.findByPkForUpdate(myAccountPk)
.orElseThrow(AccountErrorCode.ACCOUNT_NOT_FOUND::accountException);
autoMoneyChange(myAccount, money);
mainAccountRepository.save(myAccount);
// 2. 이체 로그를 남겨준다.
// 3. 입금 로직은 다른 스레드 풀에게 넘기고 트랜잭션을 종료한다.
for (int i = 0; i < 10; i++) {
depositHandlerService.doDeposit(otherAccountPk, 1000);
}
}
...
- 1번에서는 A의 돈을 차감하는 로직이 실행된다.
- 2는 아직 완성하지 않았다. 아마 다음 글에 적용될 듯 하다.
- 3은 비동기로 잘 적용되는지 확인하기 위해 10번 반복을 통해 호출했다.
- 실제로 스레드가 1번부터 순서대로 실행되는 것이 아닌, 제각각으로 실행되는 것을 볼 수 있다.
테스트 하기
package org.c4marathon.assignment.bankaccount.concurrency;
import org.c4marathon.assignment.bankaccount.dto.response.MainAccountResponseDto;
import org.c4marathon.assignment.bankaccount.service.MainAccountService;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class JustTest {
@Autowired
MainAccountService mainAccountService;
@Test
void threadPoolTest() {
MainAccountResponseDto mainAccountInfo = mainAccountService.getMainAccountInfo(4);
mainAccountService.sendToOtherAccount(1, 4, 1000);
try {
Thread.sleep(2000);
} catch (Exception e) {
System.out.println("error");
}
MainAccountResponseDto newMainAccountInfo = mainAccountService.getMainAccountInfo(4);
Assertions.assertEquals(mainAccountInfo.money() + 1000 * 10, newMainAccountInfo.money());
}
}
- 비동기로 처리되기 때문에 바로 mainAccountService.getMainAccountInfo()를 다시 호출하면 원하는 값이 나오지 않는다. 그래서 2초간 쉬었다가 실행하도록 했다. 그러니 당연히 예상대로 1000*10만큼의 돈이 적용된 것을 확인할 수 있었다.
- 그럼 바로 호출하면 어떻게 될까? 당연히 값이 같지 않아야 할 것이다.
package org.c4marathon.assignment.bankaccount.concurrency;
import org.c4marathon.assignment.bankaccount.dto.response.MainAccountResponseDto;
import org.c4marathon.assignment.bankaccount.service.MainAccountService;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class JustTest {
@Autowired
MainAccountService mainAccountService;
@Test
void threadPoolTest() {
MainAccountResponseDto mainAccountInfo = mainAccountService.getMainAccountInfo(4);
mainAccountService.sendToOtherAccount(1, 4, 1000);
MainAccountResponseDto newMainAccountInfo = mainAccountService.getMainAccountInfo(4);
Assertions.assertEquals(mainAccountInfo.money() + 1000 * 10, newMainAccountInfo.money());
}
}
- 근데 문제가 생겼다. 어차피 같은 계좌에 계속 트랜잭션을 처리하니 getMainAccountInfo()도 락이 걸려 있기에 순차적으로 진행하는 것이다. 그래서 비동기로 잘 동작하는지 정확하게 확인하려면 @Async가 붙은 메소드에서 Thread.sleep()을 호출하면 될 것이다.
@Transactional(propagation = Propagation.REQUIRES_NEW)
@Async("depositExecutor")
public void doDeposit(long accountPk, long money) {
try {
Thread.sleep(1000);
} catch (Exception e) {
System.out.println("error");
}
mainAccountRepository.deposit(accountPk, money);
}
- 이렇게 db를 호출 전에 1초씩 쉬어버리면 이 메소드를 비동기로 호출했기 때문에 doDeposit()을 호출한 메소드는 이미 응답을 완료했을 것이고 그 사이에 getMainAccountInfo()로 계좌의 정보를 호출하면 1000*10원 만큼의 돈이 완벽히 이동하지 않을 것이다.
- 결과는 1000원밖에 적용이 되지 않았다. 내 생각보다 훨씬 빠르게 각 메소드들이 적용되는 것 같다. 2000원은 이동할 줄 알았더니..
스레드 풀의 처리 속도보다 요청 속도가 빠르면 문제가 생기지 않을까?
내가 생각한 문제는 아직 처리되지 않은 이체 작업이 또 한 번 요청될 수 있다는 점이다. 이런 일이 발생한다면 이체 한 번에 입금은 두 번이 발생하는 기적이 발생할 수 있을 것이다. 쿼리를 변경해서 처리되지 않은 것만 해결하도록 하고 만약 로그 변경 쿼리가 아무것도 변경하지 못해서 0을 리턴한다면 예외를 발생시켜 트랜잭션을 롤백하는 방법을 사용하면 두 번 입금되는 문제가 해결될 것이라고 생각했다.
현재 로그 테이블을 업데이트 하는 쿼리는 아래와 같다.
@Modifying
@Query("""
update SendRecord sr
set sr.completion = true
where sr.recordPk = :recordPk
""")
int checkRecord(@Param("recordPk") long recordPk);
위의 쿼리가 문제가 되는 부분은 조건 절에 completion이 false인지 확인하지 않은 것이다. false가 아직 처리되지 않음을 뜻하는데 해당 조건이 없다면, 이미 true라도 해당 쿼리가 실행되니 변경이 되었다고 1을 리턴할 것이다.
그래서 아래와 같이 조건을 추가해 주었다.
@Modifying
@Query("""
update SendRecord sr
set sr.completion = true
where sr.recordPk = :recordPk and sr.completion = false
""")
int checkRecord(@Param("recordPk") long recordPk);
다음은 스레드 풀에서 처리하는 로직이다.
package org.c4marathon.assignment.bankaccount.service;
import org.c4marathon.assignment.bankaccount.repository.MainAccountRepository;
import org.c4marathon.assignment.bankaccount.repository.SendRecordRepository;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import lombok.RequiredArgsConstructor;
@Service
@RequiredArgsConstructor
public class DepositHandlerService {
private final MainAccountRepository mainAccountRepository;
private final SendRecordRepository sendRecordRepository;
@Transactional(propagation = Propagation.REQUIRES_NEW, isolation = Isolation.READ_COMMITTED)
@Async("depositExecutor")
public void doDeposit(long accountPk, long money, long recordPk) {
mainAccountRepository.deposit(accountPk, money);
sendRecordRepository.checkRecord(recordPk);
}
}
만약 위의 로직대로라면, 쿼리를 변경하기 전에는 로그가 이미 true임에도 쿼리가 문제 없이 수행될 것이기 때문에 추가로 입금이 되는 입금 2배 이벤트가 발생할 것이다.
우선 내 가정을 확인하기 위해 Thread.sleep()메소드를 통해 스레드 풀에 같은 작업을 2개 넣고 텀을 두어 실행하도록 했다.
@Transactional(propagation = Propagation.REQUIRES_NEW, isolation = Isolation.READ_COMMITTED)
@Async("depositExecutor")
public void doDeposit(long accountPk, long money, long recordPk) {
mainAccountRepository.deposit(accountPk, money);
try {
Thread.sleep(5000);
} catch (Exception e) {
System.out.println("error!");
}
int changeCount = sendRecordRepository.checkRecord(recordPk);
System.out.println("changeCount = " + changeCount);
}
@Scheduled(fixedDelay = 60000)
private void sendRecordSchedule() {
List<SendRecord> nonCompletedDeposit = sendRecordRepository.findNonCompletedDeposit();
nonCompletedDeposit.stream()
.forEach(sendRecord -> depositHandlerService.doDeposit(sendRecord.getDepositPk(), sendRecord.getMoney(),
sendRecord.getRecordPk()));
try {
Thread.sleep(1000);
} catch (Exception e) {
System.out.println("input error!");
}
nonCompletedDeposit.stream()
.forEach(sendRecord -> depositHandlerService.doDeposit(sendRecord.getDepositPk(), sendRecord.getMoney(),
sendRecord.getRecordPk()));
}
스케줄러 코드는 같은 작업을 한 번 더 넣어준 것이다. 그럼 doDeposit()을 실행할 때 로그를 수정하기 전에 둘 다 대기할 것이고 각각의 수정 쿼리는 락이 걸리기 때문에 일관성 있게 실행될 것이다.
실행 전에 db에 completion이 false인 로그 데이터를 임의로 넣어주고 서버를 실행해보자. 1분 뒤에 스케줄러가 해당 레코드를 읽어서 2번 처리할 것이다.
결과는 예상대로 먼저 스레드 풀에 있던 작업이 completion을 변경하여 1을 리턴하였고, 그 다음 작업은 이미 변경되어서 조건에 해당하는 레코드가 없었기 때문에 0을 리턴한 것을 볼 수 있다.
그럼 반대로 조건절에 completion = false를 넣지 않으면 둘 다 1이 나올 것이다. 기존의 쿼리에 completion = false 조건만 제거하고 재실행 해보았다. 결과는 아래와 같다.
예상대로 2개 모두 update쿼리가 아주 잘 작동하여 changeCount 값이 1이 나온 것을 확인할 수 있다.
이후 아래와 같이 changeAccount가 1일 때만 이체가 발생하도록 변경해 주었다.
@Transactional(propagation = Propagation.REQUIRES_NEW, isolation = Isolation.READ_COMMITTED)
@Async("depositExecutor")
public void doDeposit(long accountPk, long money, long recordPk) {
int changeCount = sendRecordRepository.checkRecord(recordPk);
if (changeCount == 1) {
mainAccountRepository.deposit(accountPk, money);
}
}
최근 cs 공부에 집중했는데 다방면으로 문제를 생각할 수 있는데 도움이 된 것 같다. 특히 기존에는 DB 레코드의 락은 신경도 쓰지 않았고, 스레드 풀을 추가로 사용할 생각도 하지 못했었다. 물론 아직도 많이 부족해서 더 다양한 문제들에 대해서는 생각하지 못했을 수 있지만 뭔가 발전한 것 같은 느낌이 들었다. 그리고 이번 플젝을 하면서 문제가 발생했을 때 해결하는 것이 아닌, 발생할 수 있는 문제들을 고민해보고 미리 해결책을 만드는 것이 문제 해결 능력을 키우는 것이 아닐까?라는 생각이 들었다.
조금 아쉬운 점은 정합성을 위해 완료되지 않은 이체 기록을 DB에 썼는데, 쓰기 작업이 많이 생기니까 결국엔 생각보다 효율적이지 않은 작업이 될 수 있지 않을까?라는 생각이 들었다. 조만간 돈을 모아서 서버를 잠깐 사서 부하 테스트를 해봐야겠다..