0. 들어가기 전
이번에는 MSA 환경에서의 트랜잭션에 대해서 생각해보겠습니다.
기존의 모놀리식 구조에서는 각 도메인 뿐만 아니라 도메인을 영속화하는 DB까지 하나의 애플리케이션에 존재했습니다.
따라서, 별도의 설정을 하지 않으면 하나의 요청을 처리할 때 하나의 트랜잭션으로 묶여서 처리가 되게 됩니다.
Member DB Write -> Board DB Write가 발생하는 비즈니스 로직 실행 시 Board DB Write에 장애가 발생하면 어떻게 될까요?
일반적으로 하나의 트랜잭션으로 묶이기 때문에 Member DB의 Write도 커밋이 아닌 롤백이 될 것입니다.
그렇다면, 분산 환경인 MSA 환경에서 위와 같은 상황이 발생하면 어떻게 될까요?
DB가 분리되어 있기 때문에 물리적 트랜잭션이 Member DB와 Board DB가 같을 수 없을 것입니다.
그렇다면, 이러한 상황에서 트랜잭션의 특성 중 하나인 원자성은 어떻게 보장할 수 있을까요?
- 원자성(Atomicity)
- 트랜잭션의 연산은 DB에 모두 반영되거나 모두 반영되지 않아야한다.
- 트랜잭션 내의 모든 명령은 반드시 수행되어야 하며, 하나라도 수행되지 않으면 트랜잭션이 취소되어야 한다.
이제부터 알아보도록 하겠습니다.
1. 비즈니스 로직 추가
그 전에, 현재 서비스에 하나의 요청에 2개 DB의 Write가 발생하지 않아서 트랜잭션 처리를 경험하기 위해서
다음과 같은 비즈니스 로직을 추가해줬습니다.
- 사용자가 게시글을 작성하면 사용자의 포인트가 500점 적립된다.
그리고 이를 위해서, Member 테이블에 'WritePoint' 컬럼을 추가하고 게시글 작성 시 500점씩 증가하도록 처리했습니다.
@Entity
@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class Member extends BaseEntity {
...
private Long writePoint;
...
public void write() {
this.writePoint += 500L;
}
}
그래서 게시글 작성 시 다음과 같이 2개의 DB에서 Write가 발생하도록 했습니다.
- Board DB : 게시글 생성 Write (Create)
- Member DB : 게시글 포인트 적립 Write (Update)
이러한 상황에서, 어떻게 트랜잭션의 원자성을 보장해야할지 알아봅시다.
분산 트랜잭션을 관리하는 방안으로는 크게 다음과 같은 패턴 2가지가 존재합니다.
- 2Phase Commit 패턴(2PC 패턴)
- Saga 패턴
이 2가지 패턴을 알아보고 애플리케이션에 적절한 패턴을 적용하도록 하겠습니다.
2. 2Phase Commit 패턴(2PC 패턴)
2PC 패턴은 분산 환경에서 트랜잭션의 원자성을 보장하기 위한 패턴으로, 자세한 정의는 다음과 같습니다.
트랜잭션 처리, 데이터베이스, 컴퓨터 네트워크에서 2단계 커밋 프로토콜(two-phase commit protocol, 2PC)은
원자적 커밋 프로토콜(ACP)의 일종이다.
트랜잭션을 커밋할지, 아니면 롤백할지에 대해 분산 원자적 트랜잭션에 관여하는 분산 알고리즘의 하나이다.
2PC 패턴은 크게 다음과 같은 2가지 특징이 존재합니다.
- 분산 트랜잭션을 관리하는 주체인 'Coordinator'가 있다.
- 이름에서도 알 수 있듯이 2가지 Phase가 존재한다.
- Prepare Phase
- Commit Phase
따라서, Coordinator와 2가지 Phase를 기반으로 제 프로젝트 서비스에 적용한 예시로 원리를 알아봅시다.
제 프로젝트에서 '게시글 작성' 요청이 오면 Board DB Write, Member DB Write가 발생합니다.
여기서 2PC 패턴을 적용한 모습을 살펴봅시다.
분산 트랜잭션 처리 순서는 Prepare Phase -> Commit Phase 순으로 진행됩니다.
Prepare Phase
Prepare Phase에서는 게시글 작성과 관련한 DB에 Coodinator가 상태 질의를 통해 현재 상태를 판단합니다.
- 1. Coodinator가 게시글 작성과 관련한 DB에게 상태 질의
- Board DB에 Board Row 생성 가능 여부 질의 & 해당 Row에 Lock 설정
- Member DB에 해당하는 Member Row 수정 가능 여부 질의 & 해당 Row에 Lock 설정
- 2. 각 DB에서 Coodinator에게 응답
- 각 DB에서 요청에 대해 처리 가능한지의 여부를 Coodinator에게 응답합니다.
여기서 중요한 점은, 요청 처리 가능을 Coodinator에게 응답했을 때 바로 Commit 되지 않는 점이 중요합니다.
Coodinator는 관련한 모든 서비스들의 응답을 듣고 판단해야 하기 때문에, 각 서비스의 응답이 왔을 때 각 서비스에 Commit이 아닌
관련한 모든 서비스의 응답이 왔을 때 해당 요청을 Commit 할지, Rollback 할지를 판단합니다.
그렇기 때문에 Prepare Phase에서는 해당하는 DB에 관련한 모든 서비스의 응답이 올 때까지 Lock을 설정합니다.
Commit Phase
Commit Phase에서는 게시글 작성과 관련한 DB의 요청 처리 가능 여부를 통해 Commit or Rollback을 판단합니다.
- 1. 게시글 작성과 관련한 DB의 요청 처리 가능 여부를 통해 Commit or Rollback을 판단합니다.
- 하나의 DB라도 요청을 처리할 수 없다는 응답이 오면, 모든 요청을 Rollback 합니다.
- 모든 DB에서 요청을 처리할 수 있다는 응답이 오면, 모든 요청을 Commit 합니다.
이때 중요한 점은 Rollback이든 Commit이든 관련 DB에 요청이 동시에 처리되지 않는다는 점입니다.
그리고 모든 DB 요청이 처리될 때까지 해당하는 Row에 Lock이 설정되어 있다는 점이 중요합니다.
2PC 패턴의 문제점
앞서 2PC 패턴을 사용해서 어떻게 MSA의 분산 트랜잭션을 처리할 수 있는지 살펴봤습니다.
하지만, 2PC 패턴을 MSA 환경에서 적용하기에는 다음과 같은 문제점들이 많이 존재합니다.
- 모든 요청을 처리할 때까지 관련한 모든 DB에 Lock이 설정된다.
- 따라서, 지연 시간(Latency)가 증가할 수 있다.
- 지금은 관련 DB가 2개지만, 비즈니스가 복잡해져서 관련 서비스가 늘어난다면 관련 서비스 수와 비례해서 지연 시간이 길어질 수 있다.
- 서비스 간 강결합을 초래한다.
- MSA 구조를 도입하는 이유는 각 서비스 간의 결합을 줄이고 독립적인 서비스를 구축하기 위함입니다.
- 하지만 2PC를 적용하면 Coodinator를 기반으로 서비스 간의 강력한 결합이 생기기 때문에 MSA 구조를 사용하는 의미가 퇴색될 수 있습니다.
- 추가적으로, NoSQL은 2PC 패턴을 지원하지 않습니다.
이러한 문제점들로 인해서 MSA 환경에서 분산 트랜잭션 처리 방안으로 2PC 패턴은 잘 사용하지 않는다고 합니다.
3. Saga 패턴
Saga 패턴은 2PC 패턴의 문제점을 보완해서 해결할 수 있습니다.
Saga 패턴이 2PC 패턴과 다른점은 다음과 같습니다.
- 2PC 패턴 : 관련 모든 서비스들의 DB 트랜잭션을 동시에 처리한다. (이를 위해 순차적으로 관련 DB에 Lock 설정)
- Saga 패턴 : 관련 모든 서비스들의 DB 트랜잭션을 순차적으로 처리한다.
2PC 패턴의 문제점들은 모든 DB 트랜잭션을 동시에 처리하기 위해 DB에 Lock을 설정하여 성능 및 결합 문제가 있었습니다.
Saga 패턴은 동시에 처리하지 않고 우선 순차적으로 각 DB의 트랜잭션(로컬 트랜잭션)을 처리합니다.
Saga 패턴은 2PC 패턴의 특징들을 다음과 같은 방식으로 대체합니다.
- 모든 트랜잭션을 관리(상태 체크 & Commit/Rollback)했던 Coordinator : 서비스 간의 이벤트를 통해 로컬 트랜잭션을 순차적으로 처리
- 트랜잭션 상태 체크 시 처리 X면 전체 트랜잭션 롤백하여 원자성 보장 : '보상 트랜잭션'이라는 개념을 통해 처리
더 자세히 말해보면, Saga 패턴은 각 서비스간의 로컬 트랜잭션 이벤트를 통해서 트랜잭션을 처리합니다.
이 과정에서 메시지 브로커를 통해 이벤트를 Pub/Sub하게 됩니다.
또한 원자성 보장은 하나의 트랜잭션이 롤백되었을 때 이전 이벤트를 발행했던 트랜잭션들에게
'보상 이벤트'를 발행하여 트랜잭션을 롤백하는 '보상 트랜잭션'을 실행합니다.
※ 보상 트랜잭션이란?
저는 처음에 보상 트랜잭션으로 롤백을 한다고 들었을 때,
기존의 커밋한 트랜잭션(Board-Service의 게시글 생성 트랜잭션)을 물리적으로 롤백한다는 의미로 이해했습니다.
그래서 어떻게 하지? 고민을 했었는데 알고 보니 트랜잭션 자체를 롤백하는게 아니라 롤백한 것처럼 만드는 것이었습니다.
그래서 보상 트랜잭션이란, 물리적 개념이 아니라 롤백된 것처럼 비즈니스적으로 롤백 처리하는 것을 의미합니다.
Saga 패턴의 예시로 주문 & 결제 도메인을 많이 사용해서, 저도 해당 예시로 예를 들어보겠습니다.
주문 생성 -> 결제 완료 순으로 Flow가 있을 때 Saga 패턴을 적용한다고 해봅시다.
이때, 주문 생성 트랜잭션이 완료 후에 결제 완료 로직에서 장애가 발생했을 때 Saga 패턴에서는 보상 트랜잭션을 실행한다고 했습니다.
일반적으로 보상 트랜잭션의 구현은 'Status'를 추가해서 해당 Status를 변경하는 것으로 구현합니다.
그래서 OrderStatus를 생성하고 주문 생성 시에 SUCCESS 상태로 변경하고,
보상 트랜잭션 비즈니스 로직을 실행하면 OrderStatus를 CANCLED 같은 실패 상태로 변경합니다.
그래서 최종적으로 SUCCESS 상태의 주문만 이후에 처리하는 느낌으로 최종적으로 롤백된 것처럼 구현할 수 있게 됩니다.
Saga 패턴은 다음과 같은 2종류로 구분할 수 있습니다.
앞서 말한 Saga 패턴의 주요 특징은 같지만, 이벤트 및 보상 트랜잭션 처리의 주체에 따라 구분됩니다.
- Choreographed Saga : 이벤트 및 보상 트랜잭션 처리 주체가 각 마이크로 서비스
- Orchestrated Saga : 이벤트 및 보상 트랜잭션 처리의 주체로 'Orchestrator'가 존재하여 중앙에서 처리
이 2가지 종류의 Saga 패턴을 알아보도록 합시다.
3-1. Choreographed Saga
Choreographed Saga는 앞에서 다음과 같이 소개했습니다.
이벤트 및 보상 트랜잭션 처리 주체가 각 마이크로 서비스인 Saga 패턴
정확히는, 이벤트 및 보상 트랜잭션 처리 주체는 '각 마이크로 서비스의 메시지 브로커'입니다.
해당 처리를 중앙에서 처리하는 것이 아닌 각 마이크로 서비스의 메시지 브로커에서 이벤트를 Pub/Sub하면서 처리합니다.
Choreographed Saga Flow를 도식화하면 위와 같습니다.
- 1. 요청이 들어오면 Board Service에서 Board 생성 트랜잭션(Board 로컬 트랜잭션)을 실행합니다.
- 2. Message Broker에게 Board 생성 Event를 Publish합니다.
- 3. Board 생성 Event를 Subscribe하는 Member Service에서 Event를 Subscribe하여 Member 업데이트 트랜잭션(Member 로컬 트랜잭션)을 실행합니다.
- 4. Member 로컬 트랜잭션의 결과(Success/Fail)이벤트를 Message Broker에게 Publish합니다.
- if) Member 로컬 트랜잭션 성공 시
- 이때, 성공 이벤트는 필수가 아니라 선택적으로 적용할 수 있습니다.
- 만약, Member 포인트 업데이트와 상관없이 게시글 작성이 되어야 한다면 Member 로컬 트랜잭션 성공 이벤트를 발행하지 않고 Board 생성 트랜잭션이 완료되면 그대로 DB에 반영합니다.
- 만약, Member 포인트 업데이트가 된 후에 게시글 작성이 되어야 한다면 Member 로컬 트랜잭션 성공 이벤트를 발행하고, 해당 성공 이벤트를 Board Service에서 Subscribe 하기 전에는 Board 생성 트랜잭션의 내용을 DB에 반영하지 않고 기다리다가 성공 이벤트가 Subscribe되면 그때 DB에 반영합니다.
- if) Member 로컬 트랜잭션 실패 시
- 실패 이벤트는 보상 트랜잭션을 위해 필수적으로 Message Broker에게 Publish합니다.
- Member 로컬 트랜잭션 실패 이벤트를 Board Service에서 Subscribe하여 보상 트랜잭션을 실행합니다.
- 보상 트랜잭션으로 Board 로컬 트랜잭션 내용을 롤백 처리합니다.
- if) Member 로컬 트랜잭션 성공 시
이러한 Flow로 분산 트랜잭션에서 원자성을 보장할 수 있습니다.
이러한 Choreographed Saga의 장단점은 다음과 같습니다.
장점
- 마이크로 서비스가 적다면 쉽고 간단하게 구성이 가능하다.
- 기존 MSA 환경에서 추가적인 인프라 리소스가 필요하지 않다.
단점
- 마이크로 서비스가 많아진다면,
- 각 서비스 간의 이벤트 구조를 파악하기가 어렵고, 의존성 순환이 발생할 위험이 있다.
- 모니터링 시에 현재 이벤트 및 트랜잭션의 상태를 추적하기가 어렵다.
3-2. Orchestrated Saga
Orchestrated Saga는 앞에서 다음과 같이 소개했습니다.
이벤트 및 보상 트랜잭션 처리의 주체로 'Orchestrator'가 존재하여 중앙에서 처리
Orchestrated Saga는 앞서 본 Choreographed Saga와 달리 이벤트 및 보상 트랜잭션 처리 주체로 Orchestrator가 있습니다.
또 Choreographed Saga는 마이크로 서비스 간에 직접적으로 이벤트를 주고받았지만,
Orchestrated Saga는 Orchestrator를 통해 이벤트가 다른 서비스에게 전달됩니다.
- 1. 요청이 들어오면 Board Service에서 Board 생성 트랜잭션(Board 로컬 트랜잭션)을 실행합니다.
- 2. Orchestrator에게 완료 Event를 Publish하고, Orchestrator는 Subscribe합니다.
- 3. Orchestrator에서 설정한 이벤트 처리 순서에 따라 Member 포인트 업데이트 Event를 Publish합니다.
- 4. Member Service에서 Member 포인트 업데이트 Event를 Subscribe하여 Member 로컬 트랜잭션을 실행합니다.
- 4. Member 로컬 트랜잭션의 결과(Success/Fail)이벤트를 Orchestrator에게 Publish합니다.
- if) Member 로컬 트랜잭션 성공 시
- 이때, 성공 이벤트는 필수가 아니라 선택적으로 적용할 수 있습니다.
- 만약, Member 포인트 업데이트와 상관없이 게시글 작성이 되어야 한다면 Member 로컬 트랜잭션 성공 이벤트를 발행하지 않고 Board 생성 트랜잭션이 완료되면 그대로 DB에 반영합니다.
- 만약, Member 포인트 업데이트가 된 후에 게시글 작성이 되어야 한다면 Member 로컬 트랜잭션 성공 이벤트를 발행하고, Orchestrator에서 모두 성공했다는 이벤트를 Publish하여 해당 성공 이벤트를 Board Service에서 Subscribe 하기 전에는 Board 생성 트랜잭션의 내용을 DB에 반영하지 않고 기다리다가 성공 이벤트가 Subscribe되면 그때 DB에 반영합니다.
- if) Member 로컬 트랜잭션 실패 시
- 실패 이벤트는 보상 트랜잭션을 위해 필수적으로 Orchestrator에게 Publish합니다.
- Member 로컬 트랜잭션 실패 이벤트를 Orchestrator에서 Subscribe하여 보상 트랜잭션 이벤트를 Publish합니다.
- Board Service에서 보상 트랜잭션 이벤트를 Subscribe하여 Board 로컬 트랜잭션 내용을 롤백 처리합니다.
- if) Member 로컬 트랜잭션 성공 시
이렇듯 Orchestrated Saga는 Choreographed Saga와 유사하지만,
각 마이크로 서비스의 트랜잭션 이벤트를 다른 마이크로 서비스가 Subscribe하지 않고
Orchestrator가 Subscribe하여 관리한다는 큰 차이가 존재합니다.
이러한 Orchestrated Saga의 장단점은 다음과 같습니다.
장점
- 각 마이크로 서비스의 트랜잭션 이벤트들을 Orchestrator에서 처리하기 때문에
- 서비스가 추가되더라도 이벤트 구조를 파악하기 쉽다.
- 마이크로 서비스 간의 이벤트 순환 의존이 없다.
- 각 마이크로 서비스들은 다른 마이크로 서비스에 대해서 알 필요가 없어지기 때문에 결합이 적어진다.
- 트랜잭션 및 이벤트 처리가 Orchestrator에서 이루어지므로 현재 이벤트 및 트랜잭션 상태를 Orchestrator에서 쉽게 추적할 수 있다.
단점
- 중앙 관리 시스템인 Orchestrator 구현을 위해 추가적인 인프라 리소스가 필요하다.
- Orchestrator가 전체 Flow를 관리하기 때문에 단일 장애 지점(SPOF)가 되어 장애 발생 시 모든 서비스에 장애가 전파될 수 있다.
- Orchestrator 구현이 상대적으로 어렵다.
4. 개인 프로젝트 분산 트랜잭션 처리 적용 - Choreographed Saga
제 개인 프로젝트에 적용할 분산 트랜잭션 처리 방안은 Choreographed Saga를 선택했습니다.
우선, 2PC 패턴은 요청이 끝날 때까지 관련 DB에 Lock이 설정된다는 점에서 선택에서 제외하였고
Saga 패턴 중에서 Choreographed Saga와 Orchestrated Saga 패턴을 고민하게 되었습니다.
결론적으로 말하면, 다음과 같은 이유 때문에 Choreographed Saga를 선택했습니다.
- Choreographed Saga가 구현이 더 쉽다.
- 현재 규모가 작아서 Choreographed Saga를 적용해도
- 이벤트 및 트랜잭션 상태를 추적하기 쉽다.
- 순환 의존성이 발생할 위험이 적다.
- Orchestrated Saga는 추가적인 인프라 서버가 필요한데, 낭비로 느껴졌다.
이제부터 본격적으로 Choreographed Saga를 프로젝트에서 구현해보도록 하겠습니다!
Choreographed Saga에 사용되는 Message Broker로는 일반적으로 널리 사용되는 Kafka를 이용하겠습니다.
전체 Flow는 앞서 살펴봤던 아래의 그림과 같습니다.
코드 포스팅 순서는 트랜잭션 처리 Flow 순서대로 따라가보겠습니다.
(Kafka로 구현하는데, Kafka가 메인 주제가 아니니 자세하게 Kafka를 설명하지는 않겠습니다.)
4-1. Kafka 의존성 추가 & Producer/Consumer 설정
Member-Service와 Board-Service의 build.gradle에 Kafka 의존성을 추가해줍니다.
implementation 'org.springframework.kafka:spring-kafka'
다음은 Producer/Consumer 설정입니다.
설정은 application.yml로 설정해도 되지만, yml 파일로 설정 시 패키지 경로 등 세부적인 부분을 적어야하기 때문에
보기 편한 @Configuration 클래스로 설정을 해줬습니다.
Member-Service/Board-Service의 Producer 설정은 같고, Consumer 설정은 설정하는 Group ID만 다르고 나머지는 같기 때문에
하나로 포스팅하도록 하겠습니다.
(궁금하신 분들은 맨 아래의 깃허브 코드를 참고해주시면 될 것 같습니다.)
Kafka Producer Config
@Configuration
public class KafkaProducerConfig {
private static final String KAFKA_BROKER_URL = "localhost:9092";
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER_URL);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configs);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
- Kafka 브로커는 localhost:9092로 설정해줬습니다.
- 메시지의 Serializer는 StringSerializer로 설정했습니다.
Kafka Consumer Config
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
private static final String KAFKA_BROKER_URL = "localhost:9092";
private static final String BOARD_TRANSACTION_RESULT_GROUP_ID = "board-transaction-result-group";
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER_URL);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, BOARD_TRANSACTION_RESULT_GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(configs);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
final ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
- @EnableKafka를 선언해서 KafkaListner의 endpoint를 활성화했습니다.
- 메시지의 Deserializer는 StringDeserializer로 설정했습니다.
이후에 메시지를 주고받을 때 단순 String이 아니라 Dto 클래스로 주고받게 되는데,
메시지를 직렬화/역직렬화할 때 기본 Config의 Serializer/Deserializer를 Json 직렬화/역직렬화 클래스로 지정하여
Dto로 주고받아도 에러가 나지 않게끔 설정하려고 했었습니다.
그러나, 계속 직렬화/역직렬화 에러가 떠서 일단은 String으로 직렬화/역직렬화를 하고
이후에 나오겠지만 ObjectMapper를 통해 수동으로 설정하게 되었습니다.
4-2. Board 생성 Event Pub & Sub
Board/Member : WriteBoardMessage
public record WriteBoardMessage(Long createdBoardId, String loginId) {
}
Board 작성 시 Board에서 해당 메시지를 Pub하고 Member에서 Sub하여 사용할 Message Dto 클래스를
두 서비스에 모두 동일하게 생성했습니다.
- createdBoardId : 생성된 Board ID를 받아서 보상 트랜잭션 Event Pub & Sub 시 사용합니다.
- loginId : Member에서 Sub해서 포인트를 적립할 때 포인트 적립할 멤버를 가져오기 위해 loginId도 추가했습니다.
BoardKafkaProducer
@Component
@RequiredArgsConstructor
public class BoardKafkaProducer {
private static final String WRITE_BOARD_TOPIC = "write-board";
private final KafkaTemplate<String, Object> kafkaTemplate;
private final ObjectMapper objectMapper;
public void writeBoard(final Long createdBoardId, final String loginId) throws JsonProcessingException {
final WriteBoardMessage writeBoardMessage = new WriteBoardMessage(createdBoardId, loginId);
kafkaTemplate.send(WRITE_BOARD_TOPIC, objectMapper.writeValueAsString(writeBoardMessage));
}
}
- "write-board" 토픽으로 WriteBoardMessage를 Publish하는 로직을 작성했습니다.
BoardService
@Service
@Transactional
@RequiredArgsConstructor
public class BoardService {
private final BoardRepository boardRepository;
private final MemberInfoRepository memberInfoRepository;
private final MemberFeignClient memberFeignClient;
private final BoardKafkaProducer boardKafkaProducer;
...
public Long writeBoard(final String loginId, final BoardWriteRequest request) throws JsonProcessingException {
final MemberFeignResponse response = memberFeignClient.findMemberIdByLoginId(loginId);
final MemberInfo memberInfo = new MemberInfo(response.nickname());
final MemberInfo savedMemberInfo = memberInfoRepository.save(memberInfo);
final Board board = new Board(savedMemberInfo, request.title(), request.content());
final Board savedBoard = boardRepository.save(board);
final Long createdBoardId = savedBoard.getId();
boardKafkaProducer.writeBoard(createdBoardId, loginId);
return createdBoardId;
}
}
- BoardKafkaProducer를 의존성 주입받아서 사용했습니다.
- writeBoard 호출(게시글 작성) 시 앞서 작성한 Produce 로직이 실행되도록 했습니다.
MemberService
@Slf4j
@Service
@Transactional
@RequiredArgsConstructor
public class MemberService {
private final MemberRepository memberRepository;
...
public void rewardWritePoint(final WriteBoardMessage message) {
final Member findMember = memberRepository.findByLoginId(message.loginId())
.orElseThrow(MemberException.NotFoundMemberException::new);
findMember.rewardWritePoint();
}
}
- 게시글 작성 Event를 Sub했을 때 실행할 포인트 적립 로직을 작성했습니다.
MemberKafkaConsumer
@Component
@RequiredArgsConstructor
public class MemberKafkaConsumer {
private final MemberService memberService;
private final TransactionKafkaProducer transactionKafkaProducer;
private final ObjectMapper objectMapper;
@KafkaListener(topics = "write-board", groupId = "board-write-group")
public void consumeBoardWriteEvent(final String writeBoardMessage) throws JsonProcessingException {
final WriteBoardMessage message = objectMapper.readValue(writeBoardMessage, WriteBoardMessage.class);
try {
memberService.rewardWritePoint(message);
final MemberResponseMessage successMessage = new MemberResponseMessage(message.createdBoardId(), MemberStatus.REWARDED);
transactionKafkaProducer.sendTransactionResultMessage(successMessage);
} catch (Exception e) {
final MemberResponseMessage failMessage = new MemberResponseMessage(message.createdBoardId(), MemberStatus.NOT_REWARDED);
transactionKafkaProducer.sendTransactionResultMessage(failMessage);
}
}
}
Flow 완성 후 포스팅이라서 현재 보상 트랜잭션 Event의 Pub까지 껴있지만, 게시글 작성 Event의 Sub 로직만 살펴보겠습니다.
- @KafkaListener를 통해 "write-board"의 topic을 "board-write-group"이라는 Group ID로 Sub합니다.
- 받은 WriteBoardMessage를 통해 MemberService의 포인트 적립 로직을 실행합니다.
여기까지가 Board 생성 Event를 Pub & Sub하는 부분입니다.
이제 보상 트랜잭션 Event를 Pub & Sub하는 부분을 알아보도록 하겠습니다.
4-2. 보상 트랜잭션 Event Pub & Sub
이전 과정까지 진행했을 때, 최종 과정인 Member의 포인트 적립 트랜잭션이 제대로 완료됐는지 여부에 따라 보상 트랜잭션이 결정됩니다.
Board/Member : MemberResponseMessage
public record MemberResponseMessage(Long boardId, MemberStatus memberStatus) {
}
보상 트랜잭션 로직 실행 시 Member에서 해당 메시지를 Pub하고 Board에서 Sub하여 사용할 Message Dto 클래스를
두 서비스에 모두 동일하게 생성했습니다.
- boardId : 관련 게시글 ID(생성된 Board ID)를 받아서 이후 Board에서 게시글 보상 트랜잭션 로직을 실행하기 위해 사용합니다.
- MemberStatus : Board에서 Message를 Sub했을 때 멤버 포인트 적립 로직이 성공인지 실패인지 여부로 보상 트랜잭션 로직을 분기처리하기 위해 사용합니다.
Member : TransactionKafkaProducer
@Component
@RequiredArgsConstructor
public class TransactionKafkaProducer {
private static final String WRITE_BOARD_TRANSACTION_RESULT_TOPIC = "write-board-transaction-result";
private final KafkaTemplate<String, Object> kafkaTemplate;
private final ObjectMapper objectMapper;
public void sendTransactionResultMessage(final MemberResponseMessage memberResponseMessage) throws JsonProcessingException {
final String message = objectMapper.writeValueAsString(memberResponseMessage);
kafkaTemplate.send(WRITE_BOARD_TRANSACTION_RESULT_TOPIC, message);
}
}
- 보상 트랜잭션 Event를 Pub할 로직을 작성했습니다.
Member : MemberKafkaConsumer
@Component
@RequiredArgsConstructor
public class MemberKafkaConsumer {
private final MemberService memberService;
private final TransactionKafkaProducer transactionKafkaProducer;
private final ObjectMapper objectMapper;
@KafkaListener(topics = "write-board", groupId = "board-write-group")
public void consumeBoardWriteEvent(final String writeBoardMessage) throws JsonProcessingException {
final WriteBoardMessage message = objectMapper.readValue(writeBoardMessage, WriteBoardMessage.class);
try {
memberService.rewardWritePoint(message);
final MemberResponseMessage successMessage = new MemberResponseMessage(message.createdBoardId(), MemberStatus.REWARDED);
transactionKafkaProducer.sendTransactionResultMessage(successMessage);
} catch (Exception e) {
final MemberResponseMessage failMessage = new MemberResponseMessage(message.createdBoardId(), MemberStatus.NOT_REWARDED);
transactionKafkaProducer.sendTransactionResultMessage(failMessage);
}
}
}
앞서 Board 작성 Event를 Sub 했던 로직 이후에 보상 트랜잭션 Event가 Pub되도록 작성했습니다.
포인트 적립 로직을 실행하는 부분을 try-catch로 잡았습니다.
- try : 예외가 발생하지 않으면 성공이므로 MemberStatus를 REWARDED로 설정하여 Message를 구성하여 Pub했습니다.
- catch : 예외가 발생하면 실패이므로 MemberStatus를 NOT_REWARDED로 설정하여 Message를 구성하여 Pub했습니다.
Board : TransactionKafkaConsumer
@Component
@RequiredArgsConstructor
public class TransactionKafkaConsumer {
private final BoardRepository boardRepository;
private final ObjectMapper objectMapper;
@KafkaListener(topics = "write-board-transaction-result", groupId = "board-transaction-result-group")
public void consumeBoardWriteTransactionResultEvent(final String memberResponseMessage) throws JsonProcessingException {
final MemberResponseMessage message = objectMapper.readValue(memberResponseMessage, MemberResponseMessage.class);
if (message.memberStatus() == MemberStatus.NOT_REWARDED) {
boardRepository.deleteById(message.boardId());
}
}
}
- @KafkaListener로 "write-board-transaction-result" topic을 "board-transaction-result-group"이라는 Group ID로 Sub합니다.
- Message의 MemberStatus가 NOT_REWARDED인 경우(실패한 경우)만 보상 트랜잭션 비즈니스 로직을 실행합니다.
- 저는 포인트 적립이 되지 않은 게시글은 삭제한다는 비즈니스 요구사항을 추가하여 보상 트랜잭션 비즈니스 로직으로 생성했던 게시글을 다시 삭제하는 로직을 구현했습니다. (게시글의 상태 변경 보다는 삭제하는 게 나을 것 같다고 생각했습니다.)
이상으로, Choreographed Saga 패턴의 간단한 구현을 마치겠습니다.
전체 코드가 궁금하신 분은 아래의 깃허브를 참고해주시면 감사하겠습니다.
🎯 Github Repository 링크 (전체 코드)
https://github.com/sh111-coder/sh-board-msa
📘 Monolithic to MSA 전체 목차
[MSA] 개인 프로젝트 Monolithic to MSA 전환기 - (1) MSA란?
[MSA] 개인 프로젝트 Monolithic to MSA 전환기 - (2) 멀티 모듈 구성하기
[MSA] 개인 프로젝트 Monolithic to MSA 전환기 - (3) Service Discovery 패턴 적용하기(feat. Spring Cloud Eureka)
[MSA] 개인 프로젝트 Monolithic to MSA 전환기 - (4) API Gateway 구현(feat. Spring Cloud Gateway)
[MSA] 개인 프로젝트 Monolithic to MSA 전환기 - (5) 서비스 간 통신하기(feat.Spring Cloud OpenFeign)
[MSA] 개인 프로젝트 Monolithic to MSA 전환기 - (6) 각 서비스의 설정 파일 관리하기(feat. Spring Cloud Config)
[MSA] 개인 프로젝트 Monolithic to MSA 전환기 - (7) 서비스 장애 대응 Circuit Breaker 구현(feat. Resilience4J)
[MSA] 개인 프로젝트 Monolithic to MSA 전환기 - (10) MSA 전환 후 비교 및 회고 + 마무리
Reference
https://waspro.tistory.com/734