Esta é uma POC com um cenário parcial do modelo final.
Exemplo de uso da documentação abaixo no Chat GPT: https://chatgpt.com/share/0060da84-e5eb-48d8-928a-958b6e7a6669
Configuração atual para canal de entrada com Spring Cloud Stream
Configuração da aplicação spring
spring:
cloud:
stream:
bindings:
<<nomeCanal>>:
destination: <<nomeExchange>>
gorup: <<nomeFila>>
consumer:
concurrency: <<valorConcurrency>>
Caso a exchange seja diferente de topic, o tipo da exchange é especificado em spring.cloud.stream.rabbit.bindings.<<nomeCanal>>.consumer.exchangeType conforme abaixo:
spring:
cloud:
stream:
bindings:
<<nomeCanal>>:
destination: <<nomeExchange>>
gorup: <<nomeFila>>
consumer:
concurrency: <<valorConcurrency>>
rabbit:
bindings:
<<nomeCanal>>:
consumer:
exchangeType: <<exchangeTypeValue>>
Sendo a exchange do tipo headers, o binding conforme os atributos do header da mensagem é especificado em spring.cloud.stream.rabbit.bindings.<<nomeCanal>>.consumer.queueBindingArguments conforme abaixo.
spring:
cloud:
stream:
bindings:
<<nomeCanal>>:
destination: <<nomeExchange>>
group: <<nomeFila>>
consumer:
concurrency: <<valorConcurrency>>
rabbit:
bindings:
<<nomeCanal>>:
consumer:
exchangeType: headers
queueBindingArguments:
<<headerKey>>:<<headerValue>>
<<headerKey>>:<<headerValue>>
Na aplicação spring, para ler a configuração acima há um código conforme abaixo:
public interface Channel {
String INPUT= "<<nomeCanal>>";
@Input(INPUT)
SubscribableChannel input();
}
Este canal de input pode estar declarado junto a um canal de saída, na mesma interface, exemplo:
public interface Channel {
String INPUT= "<<nomeCanalEntrada>>";
String OUPUT= "<<nomeCanalSaida>>";
@Input(INPUT)
SubscribableChannel input();
@Output(OUPUT)
MessageChannel output();
}
Ainda na aplicação spring, para habilitar a configuração na aplicação e permitir que um método seja executado ao receber uma nova mensagem no canal de entrada há um código conforme abaixo:
@EnableBinding(Channel.class)
public class Subscriber {
@StreamListener(target = Channel.INPUT, condition = Mensagem.CONDITIONAL_EXPRESSION)
public void methodName(TOTVSMessage<Mensagem> message) {
}
}
Abaixo, exemplos concretos de um canal de entrada que correspondem as estruturas de código acima mencionadas:
Exemplo concreto da configuração, neste caso não há configuração para concurrency pois utiliza-se o valor padrão 1, também não é especificado a configuração referente a exchangeType pois o valor é topic (valor padrão).
spring:
cloud:
stream:
bindings:
coleta-entrega-input-events:
destination: coleta-entrega-exchange
gorup: ${spring.application.name}
Neste caso, o nome da fila é recebe o próprio nome da aplicação, exemplo ${spring.application.name}, contudo poderia ser qualquer outro nome de uma fila.
Exemplo concreto de uma aplicação spring para ler a configuração acima:
public interface COLENTChannel {
String INPUT_NAME = "coleta-entrega-input-events";
@Input(INPUT_NAME)
SubscribableChannel input();
}
Exemplo concreto de uma aplicação spring que habilita a configuração e configura o método uploadArquivo quando recebe a mensagem UploadArquivoCmd no canal coleta-entrega-input-events. O canal é identificado por COLENTChannel.INPUT_NAME e a condição para recebimento da mensagem é identificado por UploadArquivoCmd.CONDITIONAL_EXPRESSION que é igual a "headers['type']=="UploadArquivoCmd"
@AllArgsConstructor
@EnableBinding(COLENTChannel.class)
public class ArquivoSubscriber {
private final ArquivoApplicationService service;
private final VerificarTenantValidoService verificarTenantValidoService;
private final ArquivoRejeitadoApplicationService serviceRejeitada;
@StreamListener(target = COLENTChannel.INPUT_NAME, condition = UploadArquivoCmd.CONDITIONAL_EXPRESSION)
public void uploadArquivo(TOTVSMessage<UploadArquivoCmd> cmd) {
if (verificarTenantValidoService.tenantValido(cmd)) {
try {
var cmd = ArquivoCmdAssembler.toCommand(message.getContent());
service.handle(cmd);
}
catch (Exception exception) {
serviceRejeitada.handle(ArquivoCmdAssembler.toCommand(cmd, exception));
}
}
}
}
Configuração atual de canal de saída com Spring Cloud Stream
Configuração da aplicação spring
spring:
cloud:
stream:
bindings:
<<nomeCanal>>:
destination: <<nomeExchange>>
rabbit:
bindings:
<<nomeCanal>>:
producer:
transacted: true
Caso a exchange seja diferente de topic, o tipo é especificado em spring.cloud.stream.rabbit.bindings.<<nomeCanal>>.consumer.exchangeType conforme abaixo:
spring:
cloud:
stream:
bindings:
<<nomeCanal>>:
destination: <<nomeExchange>>
rabbit:
bindings:
<<nomeCanal>>:
producer:
exchangeType: <<exchangeTypeValue>>
transacted: true
Na aplicação spring, para ler a configuração acima há um código conforme abaixo:
public interface Channel {
String OUTPUT = "<<nomeCanal>>";
@Ouput(OUPUT)
MessageChannel ouput();
}
O canal de saída também pode estar declarado junto a um canal de entrada, na mesma interface, exemplo:
public interface Channel {
String INPUT= "<<nomeCanalEntrada>>";
String OUPUT= "<<nomeCanalSaida>>";
@Input(INPUT)
SubscribableChannel input();
@Output(OUPUT)
MessageChannel output();
}
Ainda na aplicação spring, para habilitar a configuração na aplicação e permitir que uma mensagem seja postada em um canal de saída há um código conforme abaixo:
@AllArgsConstructor
@EnableBinding(Channel.class)
public class Publisher {
private Channel channel;
private TransactionContext transactionContext;
public <T> void publish(T message) {
var messageName = event.getClass().getSimpleName();
var message = new TOTVSMessage<T>( messageName, transactionContext.getTransactionInfo());
message.setContent(message);
message.sendTo(channel.output());
}
}
Abaixo, exemplos concretos de um canal de saída que correspondem as estruturas de código acima mencionadas:
Exemplo concreto da configuração, neste caso não há configuração para exchangeType pois o valor é topic (valor padrão).
spring:
cloud:
stream:
bindings:
coleta-entrega-ouput-events:
destination: coleta-entrega-exchange
rabbit:
bindings:
coleta-entrega-ouput-events:
producer:
transacted: true
Exemplo concreto de uma aplicação spring que lê a configuração acima
public interface COLENTChannel {
String OUPUT_NAME = "coleta-entrega-ouput-events";
@OUTPUT(OUPUT_NAME)
MessageChannel ouput();
}
Exemplo concreto para habilitar a configuração na aplicação sprint e permitir que uma mensagem seja postada em um canal de saída:
@AllArgsConstructor
@EnableBinding(COLENTChannel.class)
public class Publisher {
private COLENTPublisher channel;
private TransactionContext transactionContext;
public <T> void publish(T message) {
var messageName = event.getClass().getSimpleName();
var message = new TOTVSMessage<T>(messageName, transactionContext.getTransactionInfo());
message.setContent(message);
message.sendTo(channel.output());
}
}
Configuração atual para canal de tratamento erro com Spring Cloud Stream
cloud:
stream:
pollable-source: <<canalDeErro>>
bindings:
<<canalDeErro>>-in-0:
destination: <<exchangeErro>>
group: <<filaErro>>
<<canalDeErro>>-out-0:
destination: <<exchangeErro>>
group: <<filaErro>>
producer:
requiredGroups: <<filaErro>>
Exemplo concreto de configuração de canal para tratamento de erro
cloud:
stream:
pollable-source: tjf-messaging-error
bindings:
tjf-messaging-error-in-0:
destination: ${spring.application.name}-errors
group: ${spring.application.name}-errors
tjf-messaging-error-out-0:
destination: ${spring.application.name}-errors
group: ${spring.application.name}-errors
producer:
requiredGroups: ${spring.application.name}-errors
Configuração da aplicação spring
tjf:
messaging:
amqp:
exchanges:
topicExchanges:
- name: <<nomeExchange>>
durable: true
autoDelete: false
queues:
queuesList:
- name: <<nomeExchange>>.<<nomeFila>>
durable: true
autoDelete: false
bindings:
bindingsList:
- destination: <<nomeExchange>>.<<nomeFila>>
exchange: <<nomeExchange>>
destinationType: queue
Para os casos com concurrency maior que 1, a configuração é realizada diretamente no código do método conforme será visto mais adiante.
Sendo a exchange do tipo headers, a declaração se dá através de headersExchange ao invés de topicExchanges e o binding conforme os atributos do header da mensagem é especificado em tjf.messaging.amqp.bindings.bindingsList.arguments conforme abaixo:
tjf:
messaging:
amqp:
exchanges:
headersExchanges:
- name: <<nomeExchange>>
durable: true
autoDelete: false
queues:
queuesList:
- name: <<nomeExchange>>.<<nomeFila>>
durable: true
autoDelete: false
bindings:
bindingsList:
- destination: <<nomeExchange>>.<<nomeFila>>
exchange: <<nomeExchange>>
destinationType: queue
arguments:
<<headerKey>>:<<headerValue>>
<<headerKey>>:<<headerValue>>
Na aplicação spring, diferente de quando utilizado Spring Cloud Stream, não há mais necessidade da declaração dos canais em métodos anotados com @Input em uma interface que representa o canal.
Para permitir que um método seja executado ao receber uma nova mensagem no canal de entrada há um código conforme abaixo:
@AllArgsConstructor
@Component
@RabbitListener(queues = QUEUE)
public class Subscriber {
private static final QUEUE = "nomeFila"
@RabbitHandler
public void methodName(@Payload final Mensagem msg) {
}
}
Abaixo, exemplos concretos de um canal de entrada que correspondem as estruturas de código acima mencionadas:
Exemplo concreto da configuração quando o tipo da exchange é topic.
tjf:
messaging:
amqp:
exchanges:
topicExchanges:
- name: coleta-entrega-exchange
durable: true
autoDelete: false
queues:
queuesList:
- name: coleta-entrega-exchange.${spring.application.name}
durable: true
autoDelete: false
bindings:
bindingsList:
- destination: coleta-entrega-exchange.${spring.application.name}
exchange: coleta-entrega-exchange
destinationType: queue
Neste caso, o nome da fila é recebe o próprio nome da aplicação, exemplo ${spring.application.name}, contudo poderia ser qualquer outro nome de uma fila.
Diferente de quando é Spring Cloud Stream, não há necessidade de declarar uma interface com métodos anotados com @Input.
Exemplo concreto de uma aplicação spring que configura o método uploadArquivo para receber a mensagem UploadArquivoCmd da fila coleta-entrega-exchange.${spring.application.name} .
@AllArgsConstructor
@RabbitListener(queues = QUEUE)
public class ArquivoSubscriber {
private static final String QUEUE = "coleta-entrega-exchange.${spring.application.name}"
private final ArquivoApplicationService service;
private final VerificarTenantValidoService verificarTenantValidoService;
private final ArquivoRejeitadoApplicationService serviceRejeitada;
@RabbitHandler
public void uploadArquivo(@Payload UploadArquivoCmd cmd) {
if (verificarTenantValidoService.tenantValido(cmd)) {
try {
var appCmd = ArquivoCmdAssembler.toCommand(cmd);
service.handle(appCmd);
} catch (Exception exception) {
serviceRejeitada.handle(ArquivoCmdAssembler.toCommand(cmd, exception));
}
}
}
}
Configuração da aplicação spring
tjf:
messaging:
amqp:
exchanges:
topicExchanges:
- name: <<nomeExchange>>
durable: true
autoDelete: false
Sendo a exchange do tipo headers, a declaração se dá através de headersExchange ao invés de topicExchanges. Caso já exista a mesma declaração por conta do canal de entrada, não há necessidade de repetir.
tjf:
messaging:
amqp:
exchanges:
headersExchanges:
- name: <<nomeExchange>>
durable: true
autoDelete: false
Ainda na aplicação spring, para habilitar a configuração na aplicação e permitir que um método seja executado ao receber uma nova mensagem no canal de entrada há um código conforme abaixo:
@AllArgsConstructor
@Component
public class Publisher {
private static final String EXCHANGE = "nomeExchange"
private TransactionContext transactionContext;
private RabbitTemplate rabbitTemplate;
public <T> void publish(T message) {
var messageName = event.getClass().getSimpleName();
message = TOTVSMessageBuilder.<T>
.withDefaultType()
.setContent(message)
.setTransactionInfo(transactionInfo)
.buildAmqp();
rabbitTemplate.convertAndSend(EXCHANGE, null, message , null);
}
Abaixo, exemplos concretos de um canal de saída que correspondem as estruturas de código acima mencionadas:
Exemplo concreto da configuração quando o tipo da exchange é topic.
tjf:
messaging:
amqp:
exchanges:
topicExchanges:
- name: coleta-entrega-exchange
durable: true
autoDelete: false
Diferente de quando é Spring Cloud Stream, não há necessidade de declarar uma interface com métodos anotados com @Ouput.
Exemplo concreto de uma aplicação spring com um publicador que publica uma mensagem em uma exchange.
@AllArgsConstructor
@Component
public class Publisher {
private static final String EXCHANGE = "coleta-entrega-exchange"
private TransactionContext transactionContext;
private RabbitTemplate rabbitTemplate;
public <T> void publish(T message) {
var messageName = event.getClass().getSimpleName();
message = TOTVSMessageBuilder.<T>
.withDefaultType()
.setContent(message)
.setTransactionInfo(transactionInfo)
.buildAmqp();
rabbitTemplate.convertAndSend(EXCHANGE, null, message , null);
}
Migrando o canal de tratamento de erro para SpringAMQP
tjf:
messaging:
amqp:
error:
exchange: <<exchangeErro>>
exchanges:
topicExchanges:
- name: <<exchangeErro>>
durable: true
autoDelete: false
queues:
queuesList:
- name: <<exchangeErro>>.<<filaErro>>
durable: true
autoDelete: false
exclusive: false
queue-dlq: <<filaErro>>
bindings:
bindingsList:
- destination: <<filaErro>>
exchange: <<exchangeErro>>
destinationType: queue
Exemplo concreto de uma configuração de exchange e fila para tratamento de erro utilizando Spring AMQP
tjf:
messaging:
amqp:
error:
exchange: coleta-entrega-core-errors
exchanges:
topicExchanges:
- name: coleta-entrega-core-errors
durable: true
autoDelete: false
queues:
queuesList:
- name: coleta-entrega-core-errors.coleta-entrega-core-errors
durable: true
autoDelete: false
exclusive: false
queue-dlq: coleta-entrega-core-errors.coleta-entrega-core-errors
bindings:
bindingsList:
- destination: coleta-entrega-core-errors.coleta-entrega-core-errors
exchange: coleta-entrega-core-errors
destinationType: queue
----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- -----
DE/PARA FINAL
DE:
spring:
cloud:
stream:
bindings:
<<nomeCanal>>:
destination: <<nomeExchange>>
gorup: <<nomeFila>>
consumer:
concurrency: <<valorConcurrency>>
PARA:
tjf:
messaging:
amqp:
exchanges:
topicExchanges:
- name: <<nomeExchange>>
durable: true
autoDelete: false
queues:
queuesList:
- name: <<nomeExchange>>.<<nomeFila>>
durable: true
autoDelete: false
bindings:
bindingsList:
- destination: <<nomeExchange>>.<<nomeFila>>
exchange: <<nomeExchange>>
destinationType: queue
DE:
spring:
cloud:
stream:
bindings:
<<nomeCanal>>:
destination: <<nomeExchange>>
group: <<nomeFila>>
consumer:
concurrency: <<valorConcurrency>>
rabbit:
bindings:
<<nomeCanal>>:
consumer:
exchangeType: headers
queueBindingArguments:
<<headerKey>>:<<headerValue>>
<<headerKey>>:<<headerValue>>
PARA:
tjf:
messaging:
amqp:
exchanges:
headersExchanges:
- name: <<nomeExchange>>
durable: true
autoDelete: false
queues:
queuesList:
- name: <<nomeExchange>>.<<nomeFila>>
durable: true
autoDelete: false
bindings:
bindingsList:
- destination: <<nomeExchange>>.<<nomeFila>>
exchange: <<nomeExchange>>
destinationType: queue
arguments:
<<headerKey>>:<<headerValue>>
<<headerKey>>:<<headerValue>>
DE:
public interface Channel {
String INPUT= "<<nomeCanal>>";
@Input(INPUT)
SubscribableChannel input();
}
PARA:
Remover, não se aplica mais...
DE:
@EnableBinding(Channel.class)
public class Subscriber {
@StreamListener(target = Channel.INPUT, condition = Mensagem.CONDITIONAL_EXPRESSION)
public void methodName(TOTVSMessage<Mensagem> message) {
}
}
PARA:
@AllArgsConstructor
@Component
@RabbitListener(queues = QUEUE)
public class Subscriber {
private static final QUEUE = "nomeFila"
@RabbitHandler
public void methodName(@Payload final Mensagem msg) {
}
}
DE:
spring:
cloud:
stream:
bindings:
coleta-entrega-ouput-events:
destination: coleta-entrega-exchange
rabbit:
bindings:
coleta-entrega-ouput-events:
producer:
transacted: true
PARA:
tjf:
messaging:
amqp:
exchanges:
topicExchanges:
- name: coleta-entrega-exchange
durable: true
autoDelete: false
DE:
public interface COLENTChannel {
String OUPUT_NAME = "coleta-entrega-ouput-events";
@OUTPUT(OUPUT_NAME)
MessageChannel ouput();
}
PARA:
Remover, não se aplica mais...
DE:
@AllArgsConstructor
@EnableBinding(Channel.class)
public class Publisher {
private Channel channel;
private TransactionContext transactionContext;
public <T> void publish(T message) {
var messageName = event.getClass().getSimpleName();
var message = new TOTVSMessage<T>( messageName, transactionContext.getTransactionInfo());
message.setContent(message);
message.sendTo(channel.output());
}
}
PARA:
@AllArgsConstructor
@Component
public class Publisher {
private static final String EXCHANGE = "nomeExchange"
private TransactionContext transactionContext;
private RabbitTemplate rabbitTemplate;
public <T> void publish(T message) {
var messageName = event.getClass().getSimpleName();
message = TOTVSMessageBuilder.<T>
.withDefaultType()
.setContent(message)
.setTransactionInfo(transactionInfo)
.buildAmqp();
rabbitTemplate.convertAndSend(EXCHANGE, null, message , null);
}
DE:
cloud:
stream:
pollable-source: <<canalDeErro>>
bindings:
<<canalDeErro>>-in-0:
destination: <<exchangeErro>>
group: <<filaErro>>
<<canalDeErro>>-out-0:
destination: <<exchangeErro>>
group: <<filaErro>>
producer:
requiredGroups: <<filaErro>>
PARA:
tjf:
messaging:
amqp:
error:
exchange: <<exchangeErro>>
exchanges:
topicExchanges:
- name: <<exchangeErro>>
durable: true
autoDelete: false
queues:
queuesList:
- name: <<exchangeErro>>.<<filaErro>>
durable: true
autoDelete: false
exclusive: false
queue-dlq: <<filaErro>>
bindings:
bindingsList:
- destination: <<filaErro>>
exchange: <<exchangeErro>>
destinationType: queue