Árvore de páginas

Esta é uma POC com um cenário parcial do modelo final.

Exemplo de uso da documentação abaixo no Chat GPT: https://chatgpt.com/share/0060da84-e5eb-48d8-928a-958b6e7a6669

Configuração atual para canal de entrada com Spring Cloud Stream

Configuração da aplicação spring

spring:
	cloud:
		stream:
			bindings:
				<<nomeCanal>>:
					destination: <<nomeExchange>>
					gorup: <<nomeFila>>
					consumer:
						concurrency: <<valorConcurrency>>					

Caso a exchange seja diferente de topic, o tipo da exchange é especificado em spring.cloud.stream.rabbit.bindings.<<nomeCanal>>.consumer.exchangeType conforme abaixo:

spring:
	cloud:
		stream:
			bindings:
				<<nomeCanal>>:
					destination: <<nomeExchange>>
					gorup: <<nomeFila>>
					consumer:
						concurrency: <<valorConcurrency>>		
			rabbit:
				bindings:
					<<nomeCanal>>:
						consumer:
							exchangeType: <<exchangeTypeValue>>

Sendo a exchange do tipo headers, o binding conforme os atributos do header da mensagem é especificado em spring.cloud.stream.rabbit.bindings.<<nomeCanal>>.consumer.queueBindingArguments conforme abaixo.

spring:
	cloud:
		stream:
			bindings:
				<<nomeCanal>>:
					destination: <<nomeExchange>>
					group: <<nomeFila>>
					consumer:
						concurrency: <<valorConcurrency>>		
			rabbit:
				bindings:
					<<nomeCanal>>:
						consumer:
							exchangeType: headers
							queueBindingArguments:
								<<headerKey>>:<<headerValue>>
 								<<headerKey>>:<<headerValue>>


Na aplicação spring, para ler a configuração acima há um código conforme abaixo:

public interface Channel {
	String INPUT= "<<nomeCanal>>";

	@Input(INPUT)
	SubscribableChannel input();
}

Este canal de input pode estar declarado junto a um canal de saída, na mesma interface, exemplo:

public interface Channel {
	String INPUT= "<<nomeCanalEntrada>>";
    String OUPUT= "<<nomeCanalSaida>>";   

	@Input(INPUT)
	SubscribableChannel input();

	@Output(OUPUT)
	MessageChannel output(); 
}

Ainda na aplicação spring, para habilitar a configuração na aplicação e permitir que um método seja executado ao receber uma nova mensagem no canal de entrada há um código conforme abaixo:

@EnableBinding(Channel.class)
public class Subscriber {

	@StreamListener(target = Channel.INPUT, condition = Mensagem.CONDITIONAL_EXPRESSION)
	public void methodName(TOTVSMessage<Mensagem> message) {		
	}
}


Abaixo, exemplos concretos de um canal de entrada que correspondem as estruturas de código acima mencionadas:

Exemplo concreto da configuração, neste caso não há configuração para concurrency pois utiliza-se o valor padrão 1, também não é especificado a configuração referente a exchangeType pois o valor é topic (valor padrão).

spring:
	cloud:
		stream:
			bindings:     
				coleta-entrega-input-events:
					destination: coleta-entrega-exchange
					gorup: ${spring.application.name}

Neste caso, o nome da fila é recebe o próprio nome da aplicação, exemplo ${spring.application.name}, contudo poderia ser qualquer outro nome de uma fila.


Exemplo concreto de uma aplicação spring para ler a configuração acima:

public interface COLENTChannel {
	String INPUT_NAME = "coleta-entrega-input-events";

	@Input(INPUT_NAME)
	SubscribableChannel input();
}


Exemplo concreto de uma aplicação spring que habilita a configuração e configura o método uploadArquivo quando recebe a mensagem UploadArquivoCmd no canal coleta-entrega-input-events. O canal é identificado por COLENTChannel.INPUT_NAME e a condição para recebimento da mensagem é identificado por UploadArquivoCmd.CONDITIONAL_EXPRESSION que é igual a "headers['type']=="UploadArquivoCmd"

@AllArgsConstructor
@EnableBinding(COLENTChannel.class)
public class ArquivoSubscriber {

	private final ArquivoApplicationService service;
	private final VerificarTenantValidoService verificarTenantValidoService;
	private final ArquivoRejeitadoApplicationService serviceRejeitada;

	@StreamListener(target = COLENTChannel.INPUT_NAME, condition = UploadArquivoCmd.CONDITIONAL_EXPRESSION)
	public void uploadArquivo(TOTVSMessage<UploadArquivoCmd> cmd) {
		if (verificarTenantValidoService.tenantValido(cmd)) {
			try {
				var cmd = ArquivoCmdAssembler.toCommand(message.getContent());
				service.handle(cmd);
            } 
            catch (Exception exception) {
				serviceRejeitada.handle(ArquivoCmdAssembler.toCommand(cmd, exception));
			}
		}
	}
}


Configuração atual de canal de saída com Spring Cloud Stream

Configuração da aplicação spring

spring:
	cloud:
		stream:
			bindings:     
				<<nomeCanal>>:
					destination: <<nomeExchange>>
			rabbit:     
				bindings:
				     <<nomeCanal>>:
						producer:
							transacted: true

Caso a exchange seja diferente de topic, o tipo é especificado em spring.cloud.stream.rabbit.bindings.<<nomeCanal>>.consumer.exchangeType conforme abaixo:

spring:
	cloud:
		stream:
			bindings:     
				<<nomeCanal>>:
					destination: <<nomeExchange>>
			rabbit:     
				bindings:
				     <<nomeCanal>>:
						producer:
							exchangeType: <<exchangeTypeValue>>
							transacted: true


Na aplicação spring, para ler a configuração acima há um código conforme abaixo:

public interface Channel {
	String OUTPUT = "<<nomeCanal>>";

	@Ouput(OUPUT) 
	MessageChannel ouput();
}

O canal de saída também pode estar declarado junto a um canal de entrada, na mesma interface, exemplo:

public interface Channel {
	String INPUT= "<<nomeCanalEntrada>>";
    String OUPUT= "<<nomeCanalSaida>>";   

	@Input(INPUT)
	SubscribableChannel input();

	@Output(OUPUT)
	MessageChannel output(); 
}


Ainda na aplicação spring, para habilitar a configuração na aplicação e permitir que uma mensagem seja postada em um canal de saída há um código conforme abaixo:

@AllArgsConstructor
@EnableBinding(Channel.class)
public class Publisher {
	private Channel channel;
    private TransactionContext transactionContext;
	
	public <T> void publish(T message) {
		var messageName = event.getClass().getSimpleName();
		
		var message = new TOTVSMessage<T>( messageName, transactionContext.getTransactionInfo());
		message.setContent(message);
		message.sendTo(channel.output());
	}   
}


Abaixo, exemplos concretos de um canal de saída que correspondem as estruturas de código acima mencionadas:

Exemplo concreto da configuração, neste caso não há configuração para exchangeType pois o valor é topic (valor padrão).

spring:
	cloud:
		stream:
			bindings:     
				coleta-entrega-ouput-events:
					destination: coleta-entrega-exchange
 			rabbit:     
				bindings:
         			coleta-entrega-ouput-events:
						producer:
							transacted: true


Exemplo concreto de uma aplicação spring que lê a configuração acima

public interface COLENTChannel {
	String OUPUT_NAME = "coleta-entrega-ouput-events";

	@OUTPUT(OUPUT_NAME)   
	MessageChannel ouput();
}


Exemplo concreto para habilitar a configuração na aplicação sprint e permitir que uma mensagem seja postada em um canal de saída:

@AllArgsConstructor
@EnableBinding(COLENTChannel.class)
public class Publisher {
	private COLENTPublisher channel;
    private TransactionContext transactionContext;
	
	public <T> void publish(T message) {
		var messageName = event.getClass().getSimpleName();
		
		var message = new TOTVSMessage<T>(messageName, transactionContext.getTransactionInfo());
		message.setContent(message);
		message.sendTo(channel.output());
	}   
}

Configuração atual para canal de tratamento erro com Spring Cloud Stream

cloud:
    stream:
      pollable-source: <<canalDeErro>>
      bindings:
        <<canalDeErro>>-in-0:
          destination: <<exchangeErro>>
          group: <<filaErro>>

         <<canalDeErro>>-out-0:
          destination: <<exchangeErro>>
          group: <<filaErro>>
          producer:
            requiredGroups: <<filaErro>>

Exemplo concreto de configuração de canal para tratamento de erro

cloud:
    stream:
      pollable-source: tjf-messaging-error
      bindings:
		tjf-messaging-error-in-0:
          destination: ${spring.application.name}-errors
          group: ${spring.application.name}-errors

        tjf-messaging-error-out-0:
          destination: ${spring.application.name}-errors
          group: ${spring.application.name}-errors
          producer:
            requiredGroups: ${spring.application.name}-errors

Migrando o canal de entrada para SpringAMQP com formato de configuração customizado com TJF.

Configuração da aplicação spring 

tjf:
	messaging:
		amqp:
			exchanges:
				topicExchanges:           
					- name: <<nomeExchange>>
			          durable: true
            		  autoDelete: false					
				queues:
			        queuesList:
			        	- name: <<nomeExchange>>.<<nomeFila>>
            			  durable: true
			              autoDelete: false
				bindings:
			        bindingsList:
						- destination: <<nomeExchange>>.<<nomeFila>>
			              exchange: <<nomeExchange>>
			              destinationType: queue

Para os casos com concurrency maior que 1, a configuração é realizada diretamente no código do método conforme será visto mais adiante.

Sendo a exchange do tipo headers, a declaração se dá através de headersExchange ao invés de topicExchanges e o binding conforme os atributos do header da mensagem é especificado em tjf.messaging.amqp.bindings.bindingsList.arguments conforme abaixo:

tjf:
	messaging:
		amqp:
			exchanges:
				headersExchanges:           
					- name: <<nomeExchange>>
			          durable: true
            		  autoDelete: false					
				queues:
			        queuesList:
			        	- name: <<nomeExchange>>.<<nomeFila>>
            			  durable: true
			              autoDelete: false
				bindings:
			        bindingsList:
						- destination: <<nomeExchange>>.<<nomeFila>>
			              exchange: <<nomeExchange>>
			              destinationType: queue
						  arguments:						 
 								<<headerKey>>:<<headerValue>>
 								<<headerKey>>:<<headerValue>>

Na aplicação spring, diferente de quando utilizado Spring Cloud Stream, não há mais necessidade da declaração dos canais em métodos anotados com @Input em uma interface que representa o canal.

Para permitir que um método seja executado ao receber uma nova mensagem no canal de entrada há um código conforme abaixo:

@AllArgsConstructor
@Component
@RabbitListener(queues = QUEUE)
public class Subscriber {

	private static final QUEUE = "nomeFila"   

	@RabbitHandler  
    public void methodName(@Payload final Mensagem msg) {
	}
}

Abaixo, exemplos concretos de um canal de entrada que correspondem as estruturas de código acima mencionadas:

Exemplo concreto da configuração quando o tipo da exchange é topic.

tjf:
	messaging:
		amqp:
			exchanges:
				topicExchanges:           
					- name: coleta-entrega-exchange
			          durable: true
            		  autoDelete: false					
				queues:
			        queuesList:
			        	- name: coleta-entrega-exchange.${spring.application.name}
            			  durable: true
			              autoDelete: false
				bindings:
			        bindingsList:
						- destination: coleta-entrega-exchange.${spring.application.name}
			              exchange: coleta-entrega-exchange
			              destinationType: queue     

Neste caso, o nome da fila é recebe o próprio nome da aplicação, exemplo ${spring.application.name}, contudo poderia ser qualquer outro nome de uma fila.


Diferente de quando é Spring Cloud Stream, não há necessidade de declarar uma interface com métodos anotados com @Input.


Exemplo concreto de uma aplicação spring que configura o método uploadArquivo para receber a mensagem UploadArquivoCmd da fila coleta-entrega-exchange.${spring.application.name} . 

@AllArgsConstructor 
@RabbitListener(queues = QUEUE) 
public class ArquivoSubscriber {
	private static final String QUEUE = "coleta-entrega-exchange.${spring.application.name}"

	private final ArquivoApplicationService service;
	private final VerificarTenantValidoService verificarTenantValidoService;
	private final ArquivoRejeitadoApplicationService serviceRejeitada;

	@RabbitHandler  
    public void uploadArquivo(@Payload UploadArquivoCmd cmd) {
		if (verificarTenantValidoService.tenantValido(cmd)) {
			try {
				var appCmd = ArquivoCmdAssembler.toCommand(cmd);
				service.handle(appCmd);
			} catch (Exception exception) {
				serviceRejeitada.handle(ArquivoCmdAssembler.toCommand(cmd, exception));
			}
		}
	}
}


Migrando o canal de saída para SpringAMQP com formato de configuração customizado com TJF.

Configuração da aplicação spring

tjf:
	messaging:
		amqp:
			exchanges:
				topicExchanges:           
					- name: <<nomeExchange>>
			          durable: true
            		  autoDelete: false	

Sendo a exchange do tipo headers, a declaração se dá através de headersExchange ao invés de topicExchanges. Caso já exista a mesma declaração por conta do canal de entrada, não há necessidade de repetir.

tjf:
	messaging:
		amqp:
			exchanges:
				headersExchanges:           
					- name: <<nomeExchange>>
			          durable: true
            		  autoDelete: false								


Ainda na aplicação spring, para habilitar a configuração na aplicação e permitir que um método seja executado ao receber uma nova mensagem no canal de entrada há um código conforme abaixo:

@AllArgsConstructor
@Component
public class Publisher {
    private static final String EXCHANGE = "nomeExchange"

    private TransactionContext transactionContext;
	private RabbitTemplate rabbitTemplate;

    public <T> void publish(T message) {
		var messageName = event.getClass().getSimpleName();

	 	message = TOTVSMessageBuilder.<T>
					.withDefaultType()
					.setContent(message)
					.setTransactionInfo(transactionInfo)
					.buildAmqp();      

		rabbitTemplate.convertAndSend(EXCHANGE, null,   message , null); 
}

Abaixo, exemplos concretos de um canal de saída que correspondem as estruturas de código acima mencionadas:

Exemplo concreto da configuração quando o tipo da exchange é topic.

tjf:
	messaging:
		amqp:
			exchanges:
				topicExchanges:           
					- name: coleta-entrega-exchange
			          durable: true
            		  autoDelete: false	


Diferente de quando é Spring Cloud Stream, não há necessidade de declarar uma interface com métodos anotados com @Ouput.


Exemplo concreto de uma aplicação spring com um publicador que publica uma mensagem em uma exchange. 

@AllArgsConstructor
@Component
public class Publisher {
    private static final String EXCHANGE = "coleta-entrega-exchange"

    private TransactionContext transactionContext;
	private RabbitTemplate rabbitTemplate;

    public <T> void publish(T message) {
		var messageName = event.getClass().getSimpleName();

	 	message = TOTVSMessageBuilder.<T>
					.withDefaultType()
					.setContent(message)
					.setTransactionInfo(transactionInfo)
					.buildAmqp();      

		rabbitTemplate.convertAndSend(EXCHANGE, null,   message , null); 
}


Migrando o canal de tratamento de erro para SpringAMQP

tjf:
  messaging:
    amqp:
      error:
        exchange: <<exchangeErro>>

      exchanges:
        topicExchanges: 
          - name: <<exchangeErro>>
            durable: true
            autoDelete: false
      queues:
        queuesList:
          - name: <<exchangeErro>>.<<filaErro>>
            durable: true
            autoDelete: false
            exclusive: false
        queue-dlq: <<filaErro>>

      bindings:
        bindingsList:
          - destination: <<filaErro>>
            exchange: <<exchangeErro>>
            destinationType: queue    

Exemplo concreto de uma configuração de exchange e fila para tratamento de erro utilizando Spring AMQP

tjf:
  messaging:    
    amqp:
      error:
        exchange: coleta-entrega-core-errors            
      exchanges:
        topicExchanges:
          - name: coleta-entrega-core-errors
            durable: true
            autoDelete: false
      queues:
        queuesList:
          - name: coleta-entrega-core-errors.coleta-entrega-core-errors
            durable: true
            autoDelete: false
            exclusive: false
        queue-dlq: coleta-entrega-core-errors.coleta-entrega-core-errors
      bindings:
        bindingsList:
          - destination: coleta-entrega-core-errors.coleta-entrega-core-errors
            exchange: coleta-entrega-core-errors
            destinationType: queue


----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- 

DE/PARA FINAL


DE:

spring:
	cloud:
		stream:
			bindings:
				<<nomeCanal>>:
					destination: <<nomeExchange>>
					gorup: <<nomeFila>>
					consumer:
						concurrency: <<valorConcurrency>>					

PARA:

tjf:
	messaging:
		amqp:
			exchanges:
				topicExchanges:           
					- name: <<nomeExchange>>
			          durable: true
            		  autoDelete: false					
				queues:
			        queuesList:
			        	- name: <<nomeExchange>>.<<nomeFila>>
            			  durable: true
			              autoDelete: false
				bindings:
			        bindingsList:
						- destination: <<nomeExchange>>.<<nomeFila>>
			              exchange: <<nomeExchange>>
			              destinationType: queue




DE:

spring:
	cloud:
		stream:
			bindings:
				<<nomeCanal>>:
					destination: <<nomeExchange>>
					group: <<nomeFila>>
					consumer:
						concurrency: <<valorConcurrency>>		
			rabbit:
				bindings:
					<<nomeCanal>>:
						consumer:
							exchangeType: headers
							queueBindingArguments:
								<<headerKey>>:<<headerValue>>
 								<<headerKey>>:<<headerValue>>

PARA:

tjf:
	messaging:
		amqp:
			exchanges:
				headersExchanges:           
					- name: <<nomeExchange>>
			          durable: true
            		  autoDelete: false					
				queues:
			        queuesList:
			        	- name: <<nomeExchange>>.<<nomeFila>>
            			  durable: true
			              autoDelete: false
				bindings:
			        bindingsList:
						- destination: <<nomeExchange>>.<<nomeFila>>
			              exchange: <<nomeExchange>>
			              destinationType: queue
						  arguments:						 
 								<<headerKey>>:<<headerValue>>
 								<<headerKey>>:<<headerValue>>




DE:

public interface Channel {
	String INPUT= "<<nomeCanal>>";

	@Input(INPUT)
	SubscribableChannel input();
}

PARA:

Remover, não se aplica mais...




DE:

@EnableBinding(Channel.class)
public class Subscriber {

	@StreamListener(target = Channel.INPUT, condition = Mensagem.CONDITIONAL_EXPRESSION)
	public void methodName(TOTVSMessage<Mensagem> message) {		
	}
}

PARA:

@AllArgsConstructor
@Component
@RabbitListener(queues = QUEUE)
public class Subscriber {

	private static final QUEUE = "nomeFila"   

	@RabbitHandler  
    public void methodName(@Payload final Mensagem msg) {
	}
}




DE: 

spring:
	cloud:
		stream:
			bindings:     
				coleta-entrega-ouput-events:
					destination: coleta-entrega-exchange
 			rabbit:     
				bindings:
         			coleta-entrega-ouput-events:
						producer:
							transacted: true

PARA:

tjf:
	messaging:
		amqp:
			exchanges:
				topicExchanges:           
					- name: coleta-entrega-exchange
			          durable: true
            		  autoDelete: false	




DE: 

public interface COLENTChannel {
	String OUPUT_NAME = "coleta-entrega-ouput-events";

	@OUTPUT(OUPUT_NAME)   
	MessageChannel ouput();
}

PARA:

Remover, não se aplica mais...




DE:

@AllArgsConstructor
@EnableBinding(Channel.class)
public class Publisher {
	private Channel channel;
    private TransactionContext transactionContext;
	
	public <T> void publish(T message) {
		var messageName = event.getClass().getSimpleName();
		
		var message = new TOTVSMessage<T>( messageName, transactionContext.getTransactionInfo());
		message.setContent(message);
		message.sendTo(channel.output());
	}   
}

PARA:

@AllArgsConstructor
@Component
public class Publisher {
    private static final String EXCHANGE = "nomeExchange"

    private TransactionContext transactionContext;
	private RabbitTemplate rabbitTemplate;

    public <T> void publish(T message) {
		var messageName = event.getClass().getSimpleName();

	 	message = TOTVSMessageBuilder.<T>
					.withDefaultType()
					.setContent(message)
					.setTransactionInfo(transactionInfo)
					.buildAmqp();      

		rabbitTemplate.convertAndSend(EXCHANGE, null,   message , null); 
}




DE:

cloud:
    stream:
      pollable-source: <<canalDeErro>>
      bindings:
        <<canalDeErro>>-in-0:
          destination: <<exchangeErro>>
          group: <<filaErro>>

         <<canalDeErro>>-out-0:
          destination: <<exchangeErro>>
          group: <<filaErro>>
          producer:
            requiredGroups: <<filaErro>>

PARA:

tjf:
  messaging:
    amqp:
      error:
        exchange: <<exchangeErro>>

      exchanges:
        topicExchanges: 
          - name: <<exchangeErro>>
            durable: true
            autoDelete: false
      queues:
        queuesList:
          - name: <<exchangeErro>>.<<filaErro>>
            durable: true
            autoDelete: false
            exclusive: false
        queue-dlq: <<filaErro>>

      bindings:
        bindingsList:
          - destination: <<filaErro>>
            exchange: <<exchangeErro>>
            destinationType: queue    





  • Sem rótulos