Log4KJS
RabbitMQ 의 Retry 설정 본문
MessageListener
x-death, x-message-ttl, x-dead-letter-exchange, x-dead-letter-routing-key
RabbitMQ 는 메세지가 reject (nack) 될 때마다 헤더의 x-death 를 증가시킵니다.
큐에 x-message-ttl, x-dead-letter-exchange,routing-key 를 설정하면, ttl 보다 많이 reject 된 메세지를
dead-letter-queue 로 보낼 수 있습니다.
defaultRequeueRejected
처리중 예외가 발생한 경우 메세지를 requeue 할지 여부를 설정 할 수 있습니다.
또, 리스너에서 AmqpRejectAndDontRequeueException, ImmediateRequeueException 등의 예외를 던지면 해당 옵션에 상관 없이 메시지의 requeue 여부를 정할 수 있습니다.
RetryInterceptor
Advice 설정시에 호출하는 Listener 에 Advice 가 붙습니다.
(MessageListenerContainer -> (invoke inside transaction) -> MessageListener)
(인터셉터는 트랜잭션 내부 listener invocation 에 붙습니다. = defaultRequeueRejected 가 더 앞단에 있음, StatelessRetry 로 리스너 내부 오류를 감추면 requeue 되지 않음)
StatelessRetryInterceptor -> listener 에서 예외를 뱉으면 스택 내에서 해당 메세지 처리를 n회 재시도합니다.
StatefulRetryInterceptor -> 예외를 그대로 던지고, 기본적으로 힙에 해당 메세지 Id 를 키로 RetryState 를 저장합니다. (따라서 RabbitTemplate 에 설정하는 MessageConverter 에 createMessageId 를 true 로 설정해야 합니다.)
예외가 던져지면, 트랜잭션도 롤백되고, defaultRequeueRejected 가 true 일 경우 메세지도 requeue 됩니다.
MessageRecoverer -> 재시도가 일정 횟수를 넘은 경우 recoverer 가 호출됩니다. RejectAndDontRequeueRecoverer 을 통해 재시도 횟수를 초과한 메세지를 무시하거나, RepublishMessageRecoverer 을 통해 재시도 횟수를 초과한 메세지를 특정 exchange, queue 로 전달할 수 있습니다. recoverer 호출시 ImmediateAcknowledgeAmqpException 이 던져져 브로커에 즉시 ack 가 전달됩니다.
ErrorHandler
MessageListener 에서 처리중 예외가 던져지면, errorHandler 가 호출됩니다.
(ErrorHandler 호출 후에도 해당 에러는 다시 throw 됩니다)
기본 ErrorHandler 인 ConditionalRejectingErrorHandler 는 복구 불가능(Fatal) 한 일부 예외들 (설정 가능) 에 대해서 AmqpRejectAndDontRequeueException 을 던져 requeue 를 막습니다. Fatal 한 예외 + x-death 헤더가 존재 (한번 이상 reject 되었음) 에 대해서 requeue 를 하지 않고 ack 를 보냅니다.
Batching
Batch 인 경우 적절한 KeyGenerator 를 지정해 주지 않으면 RetryContext 에 대한 키가 모호하기 때문에 StatefulRetryInterceptor 를 사용하면 안될 듯 합니다.
재시도 설정시 BatchListener 에서는 BatchMessageRecoverer 를 사용해야 합니다. 에러 발생시 nack (defaultRequeueRejected 포함) 는 모두 정상적으로 작동합니다.
if (messageRecoverer == null) {
logger.warn("Message(s) dropped on recovery: " + arg, cause);
} else if (arg instanceof Message) {
messageRecoverer.recover((Message)arg, cause);
} else if (arg instanceof List && messageRecoverer instanceof MessageBatchRecoverer) {
((MessageBatchRecoverer)messageRecoverer).recover((List)arg, cause);
}
throw new ImmediateAcknowledgeAmqpException("Recovered message forces ack (if ack mode requires it): " + arg, cause);
'Message Queue' 카테고리의 다른 글
RabbitMQ 정리 - 2 ( 클라이언트 구조 ) (1) | 2021.03.15 |
---|---|
RabbitMQ 정리 - 1 ( Use Case ) (3) | 2021.03.15 |