Introdução
Esta página tem como objetivo ajudar os times da suíte logística a migrarem o framework para publicação e consumo de mensagens do RabbitMQ das suas aplicações, de Spring Cloud Stream para Spring AMQP. Esta migração é necessária pois a anotação StreamListener do Spring Cloud Stream foi depreciada/descontinuada e a alternativa oferecida exigiria uma mudança grande e de difícil transição.
Após vários estudos, foi desenvolvido um plano para permitir que os times realizem a migração de uma forma suavizada, considerando as seguintes premissas:
Premissas
- Não haverá perda de mensagens durante o processo de migração;
- Não haverá mensagens duplicadas por conta do processo de migração;
- Exceto para eventos recebidos do RAC que no momento de troca de pods pode duplicar mensagem, contudo, atualmente a duplicação de mensagem neste ponto já é conhecida e tratada.
- A migração poderá ser feita aos poucos, por mensagem específica, por um conjunto de mensagens ou de todas as mensagens desde que respeitado os procedimentos detalhados a seguir;
- Exceto os casos de manter a utilização das filas que já apontam para exchanges do tipo header e possuem 'n' suscribers
- Não quebrará a integração com outros produtos da suíte logística, nem com a infra estrutura de webhooks utilizado para integração com específicos e produtos externos;
- Ao fim da migração, o time estará com a mensageria pronta para futura migração do TJF 4.
Informações relevantes
Reorganização das exchanges e filas.
Atualmente, boa parte dos serviços que publicam mensagens o fazem através de uma exchange do tipo TOPIC. Para receber mensagens de uma exchange, boa parte dos serviços o fazem através de um único bind com uma fila que representa o serviço (exemplo agendamento-exchange.yms-core). Esta abordagem é bastante simples, contudo é muito sensível a altos volumes de mensagens de tipos específico. Exemplo, o serviço recebe mensagens de vários tipos, ClienteCriadoEvent, FornecedorCriadoEvent, MotoristaCriadoEvent entre outros. Em um determinado momento, passa a receber um alto volume da mensagem ClienteCriadoEvent, como há somente uma única fila, as demais mensagens se misturam na fila esperando para serem consumidas. Conclusão, a alta carga de mensagens de um único tipo acaba impactando o desempenho de todo o serviço.
Para endereçar esta situação, vamos criar novos exchanges do tipo HEADER, estes exchanges serão utilizados no lugar das exchanges do tipo TOPIC. Exchanges do tipo HEADER ofereçam uma maior flexibilidade na forma de realizar bind para filas em relação as exchanges do tipo TOPIC. Uma vez que tenhamos as exchanges do tipo HEADER, ao invés de criarmos uma única fila para todo o serviço, criaremos novas filas com base em cada agregado do serviço ou alguma outra separação que o serviço julgar interessante para evitar que eventos específicos impactem o funcionamento geral do serviço. Desta forma, o filtro para entregar mensagens de uma exchange para uma fila passa a ficar no RabbitMQ.
Quando não há necessidade de reorganização das exchanges e filas
Para os serviços que já possuem exchanges do tipo HEADER e filas organizadas conforme algum critério previamente estruturado e que esteja atendendo as necessidades atuais, boa parte desta documentação é irrelevante bastando seguir os passos abaixo:
- Redeclarar as exchanges e filas conforme novo modelo
- Importante! Há dois formatos de gerenciamento de filas de erros, a nativa do RabbitMQ e a do TJF, utilizar a do TJF. A nativa requer a recriação da fila.
- Ajustar os publicadores para fazer uso do RabbitTemplate.
- Ajustar os consumidores substituindo as anotações @StreamListener por @RabbitListener e @RabbitHandler
- Os passos mencionados acima estão nesta documentação. Atentar-se também para as limitações mencionadas abaixo
Limitações
- Se a fila já faz bind com uma exchange do tipo HEADER, deseja-se manter a fila existente e há várias classes do tipo subscriber que apontam para a mesma fila
- Será necessário realizar o processo de migração de todos subscribers da mesma fila de uma única vez
- Isso porque todos os métodos anotados com @RabbitHandler de uma mesma fila precisam estar na mesma classe.
- Será necessário realizar o processo de migração de todos subscribers da mesma fila de uma única vez
- Quando há um mesmo evento com dois subscribers
- Para estes casos, será necessário
Criar duas filas diferentes com o mesmo filtro por type. Exemplo, há dois subscribers para um evento ProcessoFinalizadoEvent onde cada subscriber corresponde a um agregado diferente. Neste caso, deverá ser criada uma fila para cada agregado.
Criar uma etapa após o ProcessoFinalizadoEvent que será responsável por chamar os diferentes comandos para cada agregado. Nesta abordagem também seria necessário disponibilizar os comandos via mensageria. Exemplo:
Outra possibilidade é chamar diretamente os comandos de aplicação em um único subscriber, contudo esta opção altera o formato de execução atual, fazendo com que o segundo comando de aplicação só seja executado após o sucesso do primeiro. Sendo assim, tenha consciência de que não será gerado um impacto indesejado ao produto.
- Para estes casos, será necessário
- Quando há necessidade de realizar um filtro com base em informações do corpo da mensagem
Neste caso o filtro que era feito através do parâmetro condition da anotação StreamListener terá que ser movido para o corpo do método através de uma cláusula condicional (if). Exemplo:
Plano de migração para exchanges do tipo TOPIC
Visão geral dos procedimentos
Será criada uma nova exchange do tipo HEADER para cada exchange TOPIC, adicionando um bind de exchange para exchange, partindo da TOPIC para HEADER. Assim conseguimos continuar publicando na exchange original, sem quebrar as integrações existentes e, criar as novas filas já com bind para a exchange do tipo HEADER.
Antes de criar as novas filas e subscribers com Spring AMQP, temos que pensar no cenário de deploy da alteração, quando tiver pods com versões do serviço com o subscriber antigo e o novo no ar.
Se removermos os subscribers antigos e adicionarmos o novo no mesmo deploy, corremos risco de perder mensagens na fila antiga, pois o pod antigo pode ser encerrado antes de terminar de drenar as mensagens da fila. Nesse mesmo cenário, pode ocorrer duplicação de mensagens se o pod antigo ficar no ar por algum tempo depois que o novo subiu, a mesma mensagem vai chegar na fila velha e na nova e ambas serão processadas.
Para resolver isso, sem alterar nada na fila antiga, foi necessário criar um mecanismo de versionamento das mensagens que resumidamente consiste em: Quando uma mensagem for recebida com um header indicando que é versão 2 (sl-version=2), ela será processada pelo Spring AMQP (novo), do contrário, será processado pelo Spring Cloud Stream (antigo). Com essas tratativas no lugar, podemos subir os dois subscribers ao mesmo tempo, e após isso, alteramos os publicadores para incluir o header indicando versão 2 e assim as mensagens passam a ser processadas somente pelo Spring AMQP (novo). Após alterar o publicador, o subscriber antigo pode ser removido do produto.
Depois de migrar todos os eventos, incluindo integrações externas como por exemplo webhooks, deve-se alterar os publicadores para publicar na exchange HEADER diretamente. Em seguida, remover o bind entre as exchanges do tipo TOPIC e HEADER. Esta última etapa não está coberta neste documento.
A seção abaixo ilustra o passo a passo.
Passo a passo dos procedimentos
Atualmente
Exchanges do tipo TOPIC e serviços com sua respectiva fila recebendo todos os tipos de eventos.
Primeira etapa do processo de expansão
Adicionar dependência ao sdk de migração que verifica se deve ou não adicionar header de versão (sl-version). Além do header, os interceptors nos subscribers também são adicionados para garantir que somente um dos métodos seja executado (com StreamListener ou com RabbitListener). Neste momento só existirão no código os métodos com StreamListener. Como o sl-version ainda não será enviado, o método com StreamListener continuará sendo executado em todos os casos sempre.
Clique aqui para verificar o passo a passo desta etapa.
Segunda etapa do processo de expansão
Nova exchange do tipo HEADER, bind entre as exchanges do tipo TOPIC e HEADER e duplicação do(s) subscribers, substituindo as anotações @StreamListener por @RabbitListener e @RabbitHandler.
Para os novos subscribers do RAC, TPD e outros sistemas externos que não controlamos a publicação de mensagens, adicionar anotação @IgnoreMessageInterceptor para que os interceptors criados para migração na sdk sejam ignorados.
Caso isso não seja observado, mensagens serão perdidas.
Clique aqui para verificar o passo a passo desta etapa.
Terceira etapa do processo de expansão
Adicionar a anotação @VersionedEvent nos eventos para que sejam publicados com o header sl-version=2.
TODOS os consumidores (em qualquer serviço) dos eventos a serem anotados com @VersionedEvent devem ter ao menos duplicado seus subscribers (segunda etapa do processo de expansão)
Caso isso não seja observado, mensagens serão perdidas.
Clique aqui para verificar o passo a passo desta etapa.
Primeira etapa do processo de contração
Remover os subscribers anotados com @StreamListener
TODOS os publicadores dos subscribers anotados com @StreamListener a serem removidos devem estar publicando o cabeçalho sl-version = 2 (terceira etapa de expansão)
Caso isso não seja observado, mensagens serão perdidas.
Clique aqui para verificar o passo a passo desta etapa.
Segunda etapa do processo de contração
Desabilitar os interceptors.
TODOS subscribers anotados com @StreamListener já devem ter sido removidos (primeira etapa do processo de contração)
Caso isso não seja observado, mensagens serão perdidas.
Clique aqui para verificar o passo a passo desta etapa.
Terceira etapa do processo de contração
Remover a anotação @VersionedEvent dos eventos.
TODOS os consumidores (em todos serviços) dos eventos com anotação @VersionedEvent removido já devem ter removido os subscribers antigos (segunda etapa do processo de contração)
Caso isso não seja observado, mensagens serão perdidas.
Clique aqui para verificar o passo a passo desta etapa.
Passo a passo técnico
1 - Dependências
Alterar a versão do TJF para 3.24.0-SNAPSHOT.
- Esta versão SNAPSHOT será convertida em RELEASE, processo em andamento com TJF Luciano De Araujo
<parent> <groupId>com.totvs.tjf</groupId> <artifactId>tjf-boot-starter</artifactId> <version>3.24.0-SNAPSHOT</version> </parent>
Adicionar nova dependência do TJF para tratamento de mensagens
<dependency> <groupId>com.totvs.tjf</groupId> <artifactId>tjf-messaging-amqp</artifactId> </dependency> <dependency> <groupId>com.totvs.tjf</groupId> <artifactId>tjf-test-amqp</artifactId> <scope>test</scope> </dependency>
Adicionar dependência do SDK de migração com infraestrutura para controlar publicação e recepção das mensagens
O sdk disponibiliza uma função para verificar se deve ou não adicionar o header sl-version com valor 2. Também disponibiliza a anotação @VersionedEvent para indicar se o evento deve ser publicado com sl-version com valor 2.
Essa infraestrutura é composta por interceptors que vão atuar nos métodos anotados com @StreamListener e @RabbitHandler. Antes de entrar no método, será verificado o parâmetro sl-version no cabeçalho da mensagem, se ele estiver vazio ou menor que 2, o método que contém @StreamListener será executado, se tiver 2 ou mais, será executado o @RabbitHandler.
Deve-se adicionar a dependência da sdk de migração no pom.xml
<repository> <id>SuiteLogistica</id> <url>https://pkgs.dev.azure.com/totvstfs/SuiteLogistica/_packaging/SuiteLogistica/maven/v1</url> <snapshots> <enabled>true</enabled> </snapshots> </repository> <dependency> <groupId>com.totvs.sl.migracao.amqp.sdk</groupId> <artifactId>migracao.amqp.sdk</artifactId> <version>1.0.7-RELEASE</version> </dependency>
Alterar publicadores
1.1 - Registrar exchanges no application.yml
Obs.: Registrar todos os exchanges existentes no serviço conforme novo modelo.
Exemplo:
tjf: messaging: amqp: exchanges: topicExchanges: #Existentes do tipo topic - name: coleta-entrega-exchange durable: true autoDelete: false - name: coleta-entrega-query-errors durable: true autoDelete: false headersExchanges: #Existentes do tipo headers - name: coleta-entrega-header-exchange durable: true autoDelete: false
1.2 - Alterar dispatchers
Novo formato de plublicação de mensagens.
Exemplo de Antes:
protected <T> void dispatch(T contentToBeDispatched, TransactionInfo transactionInfo) { TOTVSMessageBuilder .withType(contentToBeDispatched.getClass().getSimpleName()) .setTransactionInfo(transactionInfo) .setContent(contentToBeDispatched) .build() .sendTo(getOutput()); }
Exemplo de Depois:
//Injetar RabbitTemplate template na classe do publicador // Além do RabbitTemplate, precisará do nome do exchnage e do routing key caso utilizado para poder usar convertAndSend protected <T> void dispatch(T contentToBeDispatched, TransactionInfo transactionInfo) { var totvsMessage = buildAmqp(contentToBeDispatched, transactionInfo); var processor = MessageProcessor.getInstance(contentToBeDispatched); // usa sdk para identificar se deve ou não adicionar o header sl-version template.convertAndSend(exchange, ROUTING_KEY, totvsMessage, processor); } private <T> AmqpTOTVSMessage<Object> buildAmqp(T contentToBeDispatched, TransactionInfo transactionInfo) { return TOTVSMessageBuilder .withType(contentToBeDispatched.getClass().getSimpleName()) .setContent(contentToBeDispatched) .setTransactionInfo(transactionInfo) .buildAmqp(); }
Importante
Deve-se manter o mesmo formato de mensagem publicada pelo serviço atualmente. Ou seja, se utiliza CloudEvents, deve-se manter. Caso não utilize CloudEvents, deve-se manter o formato legado.
Apesar das modificações acima, o envio de mensagens deve continuar com o mesmo comportamento. Nesta etapa do processo, esta é apenas uma etapa de preparação.
Importante
Somente realizar as próximas etapas depois de realizar O DEPLOY EM PRODUÇÃO DAS ALTERAÇÕES ACIMA!
!!! FAZER DEPLOY DA APLICAÇÃO ANTES DE AVANÇAR !!!
2 - Criar nova exchange, novas filas e subscribers
Novo exchange do tipo HEADER
No caso das exchanges TOPIC, deverá ser criada uma exchange HEADER e será feito o bind entre elas, conforme mencionado anteriormente.
tjf: messaging: amqp: error: #NOVO, ADICIONAR exchange: coleta-entrega-query-errors # Aponta no tjf a exchange de erros - dlq migration: #NOVO, ADICIONAR interceptors: enabled: true # Habilita o uso dos interceptors durante o processo de migração exchanges: topicExchanges: - name: coleta-entrega-exchange durable: true autoDelete: false - name: coleta-entrega-query-errors durable: true autoDelete: false headersExchanges: - name: coleta-entrega-header-exchange durable: true autoDelete: false bindings: #NOVO, ADICIONAR bindingsList: - destination: coleta-entrega-header-exchange #NOVA EXCHANGE DO TIPO HEADER exchange: coleta-entrega-exchange #EXCHANGE EXISTENTE DO TIPO TOPIC routingKey: "#" destinationType: exchange
Novas filas
Neste exemplo novas filas serão criadas:
tjf: messaging: amqp: error: exchange: coleta-entrega-query-errors migration: interceptors: enabled: true exchanges: topicExchanges: - name: coleta-entrega-exchange durable: true autoDelete: false - name: coleta-entrega-query-errors durable: true autoDelete: false headersExchanges: - name: coleta-entrega-header-exchange durable: true autoDelete: false queues: #NOVO, ADICIONAR queuesList: - name: coleta-entrega-header-exchange.sl-coleta-entrega.arquivo # Cria a nova queue do domínio de arquivo - V2 durable: true autoDelete: false - name: coleta-entrega-header-exchange.sl-coleta-entrega.unidade # Cria a nova queue do domínio de unidade - V2 durable: true autoDelete: false - name: coleta-entrega-header-exchange.sl-coleta-entrega.organizacao # Cria a nova queue do domínio de organizacao - V2 durable: true autoDelete: false - name: coleta-entrega-header-exchange.sl-coleta-entrega.video # Cria a nova queue do domínio de video - V2 durable: true autoDelete: false - name: coleta-entrega-query-errors.coleta-entrega-query-errors # Cria a nova queue de erros - dlq durable: true autoDelete: false exclusive: false queue-dlq: coleta-entrega-query-errors.coleta-entrega-query-errors # Define a queue de erros como dlq no tjf bindings: #ATUALIZADO, ALTERAR bindingsList: - destination: coleta-entrega-header-exchange exchange: coleta-entrega-exchange routingKey: "#" destinationType: exchange - destination: coleta-entrega-query-errors.coleta-entrega-query-errors # Novo, faz o binding da fila de erros - dlq exchange: coleta-entrega-query-errors destinationType: queue - destination: coleta-entrega-header-exchange.sl-coleta-entrega.arquivo # Novo, faz o binding dos eventos de arquivo exchange: coleta-entrega-header-exchange destinationType: queue arguments: type: [ArquivoCriadoEvent, ArquivoRemovidoEvent] #UMA DAS POSSIVEIS FORMAS DE DECLARAR UMA LISTA DE EVENTOS - destination: coleta-entrega-header-exchange.sl-coleta-entrega.unidade # Novo, faz o binding do evento de unidade exchange: coleta-entrega-header-exchange destinationType: queue arguments: type: UnidadeCriadaEvent - destination: coleta-entrega-header-exchange.sl-coleta-entrega.organizacao # Novo, faz o binding do evento de organizacao exchange: coleta-entrega-header-exchange destinationType: queue arguments: type: OrganizacaoCriadaEvent - destination: coleta-entrega-header-exchange.sl-coleta-entrega.video # Novo, faz o binding dos eventos de video exchange: coleta-entrega-header-exchange destinationType: queue arguments: type: # OUTRA FORMA DE DECLARAR UMA LISTA DE EVENTOS - VideoHowToCriadoEvent - VideoHowToRemovidoEvent
Novos subscribers
Primeiramente deve-se fazer uma cópia da classe subscriber original e nomear a atual como old.
- Trocar as anotações @StreamListener por @RabbitHandler
- Adicionar @RabbitListener na classe
- Se tiver alguma condição além do tipo do evento no "condition" do StreamListener, deve-se fazer o filtro via binding ou dentro do método handler.
- Alterar assinatura dos métodos conforme abaixo:
Antes:
@AllArgsConstructor @EnableBinding(COLENTChannel.class) public class ArquivoSubscriberOld { private final ArquivoService service; @StreamListener(target = COLENTChannel.INPUT_NAME, condition = ArquivoCriadoEvent.CONDITIONAL_EXPRESSION) public void arquivoCriado(TOTVSMessage<ArquivoCriadoEvent> message) { service.on(message.getContent()); } @StreamListener(target = COLENTChannel.INPUT_NAME, condition = ArquivoRemovidoEvent.CONDITIONAL_EXPRESSION) public void arquivoRemovido(TOTVSMessage<ArquivoRemovidoEvent> message) { service.on(message.getContent()); } }
Depois:
@Component @AllArgsConstructor @RabbitListener(queues = "coleta-entrega-header-exchange.sl-coleta-entrega.arquivo") public class ArquivoSubscriber { private final ArquivoService service; @RabbitHandler public void arquivoCriado(@Payload ArquivoCriadoEvent message) { service.on(message); } @RabbitHandler public void arquivoRemovido(@Payload ArquivoRemovidoEvent message) { service.on(message); } } // TESTES @DisplayName("Arquivo Header - Mensageria") public class ArquivoSubscriberIT extends AdapterConfigIT { @MockBean private ArquivoService service; @Captor private ArgumentCaptor<ArquivoCriadoEvent> createCaptor; @Captor private ArgumentCaptor<ArquivoRemovidoEvent> removeCaptor; @Test @DisplayName("Deve manipular mensagem recebido por mensageria") void deveManipularArquivoCriadoEvent() throws Exception { var subscriber = Mockito.mock(ArquivoHeaderSubscriber.class); var listener = new MockListener(subscriber); var event = mockArquivoCriadoEvent(); var message = TOTVSMessageBuilder .withType(ArquivoCriadoEvent.NAME) .setContent(event) .buildAmqp(); listener.sendMessage(message); verify(subscriber).arquivoCriado(createCaptor.capture()); assertInstanceOf(ArquivoCriadoEvent.class, createCaptor.getValue()); } @Test @DisplayName("Deve manipular mensagem recebido por mensageria") void deveManipularArquivoRemovidoEvent() throws Exception { var subscriber = Mockito.mock(ArquivoHeaderSubscriber.class); var listener = new MockListener(subscriber); var arquivo = ArquivoTestFactory.umArquivo(); var event = ArquivoRemovidoEvent.of(arquivo.getId()); var message = TOTVSMessageBuilder .withType(ArquivoRemovidoEvent.NAME) .setContent(event).buildAmqp(); listener.sendMessage(message); verify(subscriber).arquivoRemovido(removeCaptor.capture()); assertInstanceOf(ArquivoRemovidoEvent.class, removeCaptor.getValue()); } @Test @DisplayName("Deve criar um arquivo recebido por mensageria") void deveCriarUmArquivo() { var subscriber = new ArquivoHeaderSubscriber(service); var event = mockArquivoCriadoEvent(); subscriber.arquivoCriado(event); var expected = ArquivoCriadoEvent.builder() .id(event.getId()) .nome(event.getNome()) .tipoConteudo(event.getTipoConteudo()) .removido(event.getRemovido()) .dataCriacao(event.getDataCriacao()) .tamanho(event.getTamanho()) .prefixo(event.getPrefixo()) .build(); verify(service).on(expected); } @Test @DisplayName("Deve remover um arquivo recebido por mensageria") void deveRemoverUmArquivo() { var arquivo = ArquivoTestFactory.umArquivo(); var subscriber = new ArquivoHeaderSubscriber(service); var event = ArquivoRemovidoEvent.of(arquivo.getId()); subscriber.arquivoRemovido(event); var expected = ArquivoRemovidoEvent.of(event.getId()); verify(service).on(expected); } private ArquivoCriadoEvent mockArquivoCriadoEvent() { return ArquivoCriadoEvent.builder() .id(UUID.randomUUID().toString()) .nome("nome") .tipoConteudo("tipoConteudo") .removido(false) .dataCriacao(ZonedDateTime.now()) .tamanho(new BigDecimal(127)) .prefixo(TestUtils.tenantId + "/") .build(); } }
Novos subscribers para sistemas externos como RAC, TPD e outros devem conter a anotação @IgnoreMessageInterceptor
@RabbitListener(queues = "queue") @IgnoreMessageInterceptor // Isto fará o skip do interceptors de todos os metodos da classe public class AlgumRabbitListenerSubscriber { }
Importante
Somente realizar as próximas etapas depois de realizar O DEPLOY EM PRODUÇÃO DAS ALTERAÇÕES ACIMA!
!!! FAZER DEPLOY DA APLICAÇÃO ANTES DE AVANÇAR !!!
3 - Mudar versão do evento
No serviço que envia as mensagens:
Adicionar a anotação @VersionedEvent nos eventos
Exemplo:
@VersionedEvent @Data(staticConstructor = "of") public final class ArquivoRemovidoEvent implements DomainEvent { private final String id; }
Importante
Somente realizar as próximas etapas depois de realizar O DEPLOY EM PRODUÇÃO DAS ALTERAÇÕES ACIMA!
!!! FAZER DEPLOY DA APLICAÇÃO ANTES DE AVANÇAR !!!
4 - Remover subscriber antigo
Depois que todos os pods publicadores foram substituídos e todas as mensagens da fila original foram drenadas, pode ser removida a classe subscriber antiga juntamente com suas configurações antigas existentes no yaml.
5 - Repetir passos 3-4
Agora é só repetir os passos até fazer a troca de todos os subscribers.
6 - Desabilitar interceptors
Mudar o valor da configuração tjf.messaging.amqp.error.migration.interceptors.enable para false.
tjf: messaging: amqp: error: #----- configs existentes migration: interceptors: enabled: false #DESABILITA O USO DOS INTERCEPTORS exchanges: topicExchanges: #----- configs existentes headersExchanges: #----- configs existentes bindings: bindingsList: #----- configs existentes
Importante
Somente realizar as próximas etapas depois de realizar O DEPLOY EM PRODUÇÃO DAS ALTERAÇÕES ACIMA!
!!! FAZER DEPLOY DA APLICAÇÃO ANTES DE AVANÇAR !!!
7 - Remover Versionamento dos eventos e utilização do SDK de migração
Depois que todos os consumidores das mensagens já foram migrados para @RabbitListener, remover a anotação @VersionedEvent, o código adicionado nos Publishers, a configuração referente ao sdk de migração do yaml (migration.interceptors.enabled) e finalmente a dependência do SDK.
Exemplo do publisher sem a utilização do sdk.
protected <T> void dispatch(T contentToBeDispatched, TransactionInfo transactionInfo) { TOTVSMessageBuilder.withType(contentToBeDispatched.getClass().getSimpleName() .setContent(contentToBeDispatched) .setTransactionInfo(transactionInfo) .buildAmqp() .sendTo(rabbitTemplate, EXCHANGE, ROUTING_KEY); }