Em vários cenários o uso de filas pode ser um grande facilitador, que nos permite trazer mais confiabilidade no nosso software assim como também é uma ferramenta versátil que nos permite resolver uma infinidade de problemas.
Porém, ao adicionar um novo componente a um software estamos sempre aumentando a complexidade dele, o que pode ser indesejado, visto que ao mesmo tempo que é uma grande ferramenta, é também um novo ponto de falha, um novo serviço para gerenciar, uma nova fatura no cartão de crédito, mais uma semana na curva de aprendizado de um novo programador, etc.
Pensando nisso hoje estaremos abordando sobre o Amazon SQS (Simple Queue Service) no Spring Boot, esse serviço vem como uma alternativa direta ao ponto para implementação de mensageria no seu sistema, reduzindo bastante o impacto dos pontos listados acima, visto que:
- Você não vai precisar gerenciar nenhum dos aspectos da infraestrutura da sua fila
- Configuração fácil de Autenticação, com AWS IAM
- Free-Tier de 1 milhão de requisições por mês, válidos mesmo após o período grátis da sua conta
- Suporte a features básicas de fila
Ótimo para evitar adicionar uma grande complexidade ao seu software, porém, em cenários em que você precisa de features mais avançadas, talvez valha a pena migrar para outro serviço de fila.
Todo o código desse post pode ser consultado no repositório acima
Configurando o ambiente
Bibliotecas
Para interagir com o SQS temos algumas opções de bibliotecas, aqui usaremos spring-cloud-aws-starter-sqs do pacote awspring.cloud. A seguir um exemplo de pom básico com algumas bibliotecas para montarmos algo com SQS
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>4.0.0</version>
<!-- Aqui teria relativePath -->
</parent>
<groupId>kaiquebt.dev</groupId>
<artifactId>javasqs</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>javasqs</name>
<!-- Aqui teria description, url, licenses, developers e scm -->
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-h2console</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webmvc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webmvc-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Biblioteca para utilizar o SQS -->
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter-sqs</artifactId>
<version>3.1.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Aqui teria maven-compiler-plugin e spring-boot-maven-plugin -->
</plugins>
</build>
</project>Application yml
spring:
cloud:
aws:
region:
static: us-east-1
credentials:
access-key: YOUR_ACCESS_KEY
secret-key: YOUR_SECRET_KEYNota: Você deve configurar sua conta no IAM e verificar se ela tem acesso ao recurso
Desenvolvendo uma feature simples
Agora que temos tudo configurado, vamos começar a trabalhar em uma feature simples para usarmos o SQS.
Filas podem ser usados em diversos cenários, para orquestrar efeitos colaterais, fazer uma comunicação segura entre sistemas, adiar o processamento de uma requisição, etc.
Para esse exemplo vamos fazer um sistema que recebe um pedido e isso causa alguns efeitos colaterais no sistema, como retirada de estoque, envio de email.
Nota: Neste commit nós criamos um CRUD básico para começarmos a trabalhar em cima.
Implementando um service para utilizar o SQS
package kaiquebt.dev.javasqs.service;
import java.util.HashMap;
import java.util.Map;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.json.JsonMapper;
import io.awspring.cloud.sqs.operations.SqsTemplate;
import lombok.RequiredArgsConstructor;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;
@Service
@RequiredArgsConstructor
public class SqsService {
private final SqsTemplate sqsTemplate;
private final SqsAsyncClient sqsAsyncClient;
public void sendObjectMessage(String queueName, Object payload) {
try {
JsonMapper mapper = JsonMapper.builder()
.enable(SerializationFeature.INDENT_OUTPUT)
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
.build();
this.sendMessage(queueName, mapper.writeValueAsString(payload));
} catch (JsonProcessingException e) {
throw new RuntimeException("Erro ao converter objeto para JSON", e);
}
}
public void sendMessage(String queueName, String messageBody) {
sqsTemplate.send(queueName, messageBody);
}
public void sendMessageToUrl(String queueUrl, String messageBody) {
sqsTemplate.send(to -> to.queue(queueUrl).payload(messageBody));
}
public String getQueueUrl(String queueName) {
GetQueueUrlRequest request = GetQueueUrlRequest.builder()
.queueName(queueName)
.build();
return sqsAsyncClient.getQueueUrl(request).join().queueUrl();
}
public String createQueue(String queueName, boolean isFifo) {
Map<QueueAttributeName, String> attributes = new HashMap<>();
if (isFifo) {
attributes.put(QueueAttributeName.FIFO_QUEUE, "true");
attributes.put(QueueAttributeName.CONTENT_BASED_DEDUPLICATION, "true");
if (!queueName.endsWith(".fifo")) {
throw new IllegalArgumentException("Nome da fila de tipo FIFO deve terminar com .fifo");
}
}
CreateQueueRequest request = CreateQueueRequest.builder()
.queueName(queueName)
.attributes(attributes)
.build();
return sqsAsyncClient.createQueue(request).join().queueUrl();
}
public String createQueueIfNotExists(String queueName, boolean isFifo) {
try {
return getQueueUrl(queueName);
} catch (Exception e) {
if (e instanceof QueueDoesNotExistException || e.getCause() instanceof QueueDoesNotExistException) {
System.out.println("Fila não existe, criando...");
return createQueue(queueName, isFifo);
}
return null;
}
}
}Com o service acima teremos mais facilidade em trabalhar com o SQS daqui pra frente.
Criando nossas filas
Nossa aplicação irá criar algumas filas para disparar eventos ao um pedido ser criado, os eventos serão:
- Loggar o pedido no terminal
- Enviar um email para o comprador
- Gerar a nota fiscal
Nota: Não iremos realmente implementar essas funcionalidades, pois isso saíria muito do escopo do post. Também é importante notar que alguns eventos dependem de outros para serem executados, por exemplo, só podemos enviar o email para o comprador se tivermos a nota fiscal gerada, então a mensagem de enviar email deve ser enviada apenas após terminarmos de gerar a nota fiscal (pois geralmente o email contém a nota fiscal).
No ciclo de vida da nossa aplicação, ao inicializarmos o spring-boot iremos criar as filas caso elas não existam (utilizando o método do nosso SqsService). Para isso iremos primeiro definir o nome das filas no nosso application.yml.
Nota: essa parte é somente para deixar mais flexível caso queiramos trocar o nomes das filas, mas isso poderia ser também feito de outras formas, puxando de um registro no banco, hardcoded, etc
sqs:
queues:
pedido_log: pedido_log.fifo
pedido_email: pedido_email.fifo
pedido_nota_fiscal: pedido_nota_fiscal.fifo Dessa forma, criando as filas ao iniciar o spring, garantimos que as filas que estamos consumindo sempre existem, evitando crashs caso as filas não tenho sido criadas manualmente, está prática também garante mais agilidade no deploy da aplicação, pois menos configuração manual das filas é necessária (visto que tudo é criado a nível de aplicação).
Nota: Criar a fila manualmente também não é necessáriamente ruim (e isso pode ser automatizado de outras formas, fora da aplicação), pois criando ela a nível de aplicação exige que nossa aplicação tenha permissão de criar filas, o que pode ser indesejado a depender dos seus critérios de segurança. É um caso de trade-off.
Essa classe é a que gerencia a criação de filas na aplicação
package kaiquebt.dev.javasqs.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import kaiquebt.dev.javasqs.service.SqsService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Component
@RequiredArgsConstructor
public class SqsQueueInitializer {
private final SqsService sqsService;
@Value("${sqs.queues.pedido_log}")
private String pedidoLogQueue;
@Value("${sqs.queues.pedido_email}")
private String pedidoEmailQueue;
@Value("${sqs.queues.pedido_nota_fiscal}")
private String pedidoNotaFiscalQueue;
@EventListener(ApplicationReadyEvent.class)
public void initializeQueues() {
log.info("Inicializando filas SQS...");
createQueue(pedidoLogQueue);
createQueue(pedidoEmailQueue);
createQueue(pedidoNotaFiscalQueue);
log.info("Filas SQS inicializadas com sucesso!");
}
private void createQueue(String queueName) {
try {
String queueUrl = sqsService.createQueueIfNotExists(queueName, true);
log.info("Fila criada/verificada: {} -> {}", queueName, queueUrl);
} catch (Exception e) {
log.error("Erro ao criar fila: {}", queueName, e);
}
}
}Enviando mensagens nas filas
Nós não iremos utilizar diretamente o SqsService, pois ele é extremamente genérico e faria nossa aplicação não ter segurança na tipagem, o que é uma das vantagens do java.
Criando o service que gerencia o envio dos eventos.
package kaiquebt.dev.javasqs.service;
import java.util.UUID;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import kaiquebt.dev.javasqs.model.Pedido;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
@Service
@RequiredArgsConstructor
public class PedidoQueueProducer {
private final SqsService sqsService;
@Value("${sqs.queues.pedido_log}")
private String pedidoLogQueue;
@Value("${sqs.queues.pedido_email}")
private String pedidoEmailQueue;
@Value("${sqs.queues.pedido_nota_fiscal}")
private String pedidoNotaFiscalQueue;
@Data
@NoArgsConstructor
public static class CommonPedidoMessage {
public CommonPedidoMessage(UUID pedidoId) {
if (pedidoId == null) {
throw new IllegalArgumentException("Id do pedido não pode ser nulo na construção de uma mensagem");
}
this.pedidoId = pedidoId;
}
private UUID pedidoId;
}
public void producePedidoLog(Pedido pedido) {
sqsService.sendObjectMessage(pedidoLogQueue, new CommonPedidoMessage(pedido.getId()));
}
public void producePedidoEmail(Pedido pedido) {
sqsService.sendObjectMessage(pedidoEmailQueue, new CommonPedidoMessage(pedido.getId()));
}
public void producePedidoNotaFiscal(Pedido pedido) {
sqsService.sendObjectMessage(pedidoNotaFiscalQueue, new CommonPedidoMessage(pedido.getId()));
}
}Agora vamos usar esse método no service em que cria um pedido.
Nota: Coloquei o envio das mensagens após a transaction para evitar race conditions, mas isso geralmente não deve ser um problema, então não é completamente necessário, seria necessário fazer alguns testes pra ver se essa race condition pode existir mesmo como problema.
public PedidoDTO criar(PedidoRequestDTO dto) {
Pedido pedidoSalvo = createOnDatabase(dto);
pedidoQueueProducer.producePedidoLog(pedidoSalvo);
pedidoQueueProducer.producePedidoNotaFiscal(pedidoSalvo);
return pedidoMapper.toDTO(pedidoSalvo);
}
@Transactional
private Pedido createOnDatabase(PedidoRequestDTO dto) {
Pedido pedido = new Pedido();
pedido.setNomeCliente(dto.getNomeCliente());
pedido.setDataPedido(LocalDateTime.now());
pedido.setStatus(StatusPedido.PENDENTE);
List<ItemPedido> itens = dto.getItens().stream()
.map(itemDto -> criarItemPedido(itemDto, pedido))
.collect(Collectors.toList());
pedido.setItens(itens);
BigDecimal valorTotal = itens.stream()
.map(ItemPedido::getSubtotal)
.reduce(BigDecimal.ZERO, BigDecimal::add);
pedido.setValorTotal(valorTotal);
for (ItemPedido item : itens) {
estoqueService.adicionarQuantidade(item.getProduto().getId(), -(item.getQuantidade() != null ? item.getQuantidade(): 0));
}
Pedido pedidoSalvo = pedidoRepository.save(pedido);
return pedidoSalvo;
}Consumindo mensagens
Para consumir as mensagens iremos criar um PedidoConsumer.
package kaiquebt.dev.javasqs.consumer;
import java.util.UUID;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.json.JsonMapper;
import io.awspring.cloud.sqs.annotation.SqsListener;
import kaiquebt.dev.javasqs.model.Pedido;
import kaiquebt.dev.javasqs.repository.PedidoRepository;
import kaiquebt.dev.javasqs.service.PedidoQueueProducer;
import kaiquebt.dev.javasqs.service.PedidoQueueProducer.CommonPedidoMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Component
@RequiredArgsConstructor
public class PedidoConsumer {
private final PedidoQueueProducer pedidoQueueProducer;
private final PedidoRepository pedidoRepository;
@Value("${sqs.queues.pedido_log}")
private String pedidoLogQueue;
@Value("${sqs.queues.pedido_email}")
private String pedidoEmailQueue;
@Value("${sqs.queues.pedido_nota_fiscal}")
private String pedidoNotaFiscalQueue;
private CommonPedidoMessage read(String raw) {
JsonMapper mapper = JsonMapper.builder()
.enable(SerializationFeature.INDENT_OUTPUT)
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
.build();
try {
return mapper.readValue(raw, CommonPedidoMessage.class);
} catch (JsonProcessingException e) {
return null;
}
}
@SqsListener(queueNames = "${sqs.queues.pedido_log}")
@Transactional
public void consumePedidoLog(String message) {
log.info("Recebendo mensagem da fila {}: {}", pedidoLogQueue, message);
CommonPedidoMessage pedidoMessage = this.read(message);
UUID pedidoId = pedidoMessage.getPedidoId();
Pedido pedido = pedidoRepository.findById(pedidoId)
.orElseThrow(() -> new RuntimeException("Pedido não encontrado: " + pedidoId));
log.info("LOG - Pedido processado: ID={}, Cliente={}, Status={}, Valor={}",
pedido.getId(), pedido.getNomeCliente(), pedido.getStatus(), pedido.getValorTotal());
}
@SqsListener(queueNames = "${sqs.queues.pedido_nota_fiscal}")
@Transactional
public void consumePedidoNotaFiscal(String message) {
try {
log.info("Recebendo mensagem da fila {}: {}", pedidoNotaFiscalQueue, message);
CommonPedidoMessage pedidoMessage = this.read(message);
UUID pedidoId = pedidoMessage.getPedidoId();
Pedido pedido = pedidoRepository.findById(pedidoId)
.orElseThrow(() -> new RuntimeException("Pedido não encontrado: " + pedidoId));
log.info("NOTA FISCAL - Gerando nota fiscal para pedido ID: {}", pedido.getId());
log.info("NOTA FISCAL - Cliente: {}", pedido.getNomeCliente());
log.info("NOTA FISCAL - Data: {}", pedido.getDataPedido());
log.info("NOTA FISCAL - Valor Total: {}", pedido.getValorTotal());
log.info("NOTA FISCAL - Número de itens: {}", pedido.getItens().size());
pedidoQueueProducer.producePedidoEmail(pedido);
} catch (Exception e) {
log.error("Erro ao processar mensagem da fila pedido_nota_fiscal", e);
}
}
@SqsListener(queueNames = "${sqs.queues.pedido_email}")
@Transactional
public void consumePedidoEmail(String message) {
try {
log.info("Recebendo mensagem da fila {}: {}", pedidoEmailQueue, message);
CommonPedidoMessage pedidoMessage = this.read(message);
UUID pedidoId = pedidoMessage.getPedidoId();
Pedido pedido = pedidoRepository.findById(pedidoId)
.orElseThrow(() -> new RuntimeException("Pedido não encontrado: " + pedidoId));
log.info("EMAIL - Enviando email para cliente: {} sobre pedido ID: {}",
pedido.getNomeCliente(), pedido.getId());
log.info("EMAIL - Assunto: Confirmação do Pedido #{}", pedido.getId());
log.info("EMAIL - Valor Total: {}", pedido.getValorTotal());
} catch (Exception e) {
log.error("Erro ao processar mensagem da fila pedido_email", e);
}
}
}Encerramos aqui um exemplo simples de uso do Amazon SQS com Spring Boot. Vimos como filas podem ajudar a desacoplar processos e tornar a aplicação mais confiável, sem adicionar muita complexidade. O código completo está disponível no repositório.