Go Lang com processamento assíncrono, utilizando Kafka
Sabemos que as soluções atualmente muitas vezes precisam utilizar processamento assíncrono, e existem várias ferramentas e linguagens para resolver isso.
Neste artigo, vou demonstrar de uma maneira muito simples como subiur um ambiente Kafka e usa-lo com a linguagem de programação Go.
Primeiramente, o que é Kafka?
Kafka é uma solução de streaming de eventos distribuída de forma open-source. Ele fornece uma plataforma unificada, de alta taxa de transferência e baixa latência para lidar com fluxos de dados em tempo real. Kafka é usado para construir pipelines de dados em tempo real e aplicativos, sendo conhecido por sua durabilidade, escalabilidade e tolerância a falhas. Ele permite o desacoplamento de fluxos de dados e possibilita que os aplicativos publiquem, assinem, armazenem e processem fluxos de registros em tempo real.
Agora que sabemos o que é Kafka, vamos preparar o ambiente para demonstrar o seu uso.
Para facilitar a visualização da produção e consumo de eventos em ação, vamos iniciar o serviço usando Docker Compose.
Crie uma pasta chamada kafka-docker
mkdir kafka-docker
cd kafka-docker
Dentro desta pasta, crie o arquivo docker-compose.yml abaixo
version: '2'
services:
broker:
image: confluentinc/cp-kafka:7.6.1
hostname: broker
container_name: broker
ports:
- "9092:9092"
- "9101:9101"
environment:
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
# Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
# See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
schema-registry:
image: confluentinc/cp-schema-registry:7.6.1
hostname: schema-registry
container_name: schema-registry
depends_on:
- broker
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
connect:
image: cnfldemos/cp-server-connect-datagen:0.6.4-7.6.0
hostname: connect
container_name: connect
depends_on:
- broker
- schema-registry
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
# CLASSPATH required due to CC-2422
CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.6.1.jar
CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
control-center:
image: confluentinc/cp-enterprise-control-center:7.6.1
hostname: control-center
container_name: control-center
depends_on:
- broker
- schema-registry
- connect
- ksqldb-server
ports:
- "9021:9021"
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
CONTROL_CENTER_CONNECT_HEALTHCHECK_ENDPOINT: '/connectors'
CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
CONTROL_CENTER_REPLICATION_FACTOR: 1
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
CONFLUENT_METRICS_TOPIC_REPLICATION: 1
PORT: 9021
ksqldb-server:
image: confluentinc/cp-ksqldb-server:7.6.1
hostname: ksqldb-server
container_name: ksqldb-server
depends_on:
- broker
- connect
ports:
- "8088:8088"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
KSQL_BOOTSTRAP_SERVERS: "broker:29092"
KSQL_HOST_NAME: ksqldb-server
KSQL_LISTENERS: "http://0.0.0.0:8088"
KSQL_CACHE_MAX_BYTES_BUFFERING: 0
KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
KSQL_KSQL_CONNECT_URL: "http://connect:8083"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'
ksqldb-cli:
image: confluentinc/cp-ksqldb-cli:7.6.1
container_name: ksqldb-cli
depends_on:
- broker
- connect
- ksqldb-server
entrypoint: /bin/sh
tty: true
ksql-datagen:
image: confluentinc/ksqldb-examples:7.6.1
hostname: ksql-datagen
container_name: ksql-datagen
depends_on:
- ksqldb-server
- broker
- schema-registry
- connect
command: "bash -c 'echo Waiting for Kafka to be ready... && \
cub kafka-ready -b broker:29092 1 40 && \
echo Waiting for Confluent Schema Registry to be ready... && \
cub sr-ready schema-registry 8081 40 && \
echo Waiting a few seconds for topic creation to finish... && \
sleep 11 && \
tail -f /dev/null'"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
STREAMS_BOOTSTRAP_SERVERS: broker:29092
STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
STREAMS_SCHEMA_REGISTRY_PORT: 8081
rest-proxy:
image: confluentinc/cp-kafka-rest:7.6.1
depends_on:
- broker
- schema-registry
ports:
- 8082:8082
hostname: rest-proxy
container_name: rest-proxy
environment:
KAFKA_REST_HOST_NAME: rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
Para rodar a solução, utilize o comando abaixo
docker-compose up -d
Criando seu primeiro tópico
Agora que temos um ambiente Kafka em execução, vamos criar o seu primeiro tópico.
Para isso, acesse a URL: http://localhost:9021/
Neste paínel clique em: CONTROLCENTER.CLUSTER -> Topics -> Add topic
Utilizando Kafka em Go Lang
Agora que temos nosso ambiente Kafka configurado, chegamos à parte divertida: vamos produzir e consumir eventos no Kafka.
Produzindo eventos
Para produzir eventos em Kafka, utilizaremos a lib: confluent kafka da Confluent.
Para isso vamos começar nosso módulo Go, acesse a pasta que deseja deixar seu projeto e rode os comandos abaixo:
Inicie o projeto
go mod init github.com/seu_usuário/seurepo
Adicione a lib sarama
go get github.com/confluentinc/confluent-kafka-go/v2
Crie um arquivo chamado kafka.go e vamos implementar a produção e consumo dos eventos, para isso vamos criar a function main conforme abaixo:
package main
import (
"fmt"
"log"
"os"
)
const (
topic = "meu-topico"
groupConsumer = "meu-grupo"
broker = "localhost:9092"
)
func main() {
if len(os.Args) < 2 {
fmt.Println("Uso: go run main.go [producer|consumer]")
return
}
mode := os.Args[1]
switch mode {
case "producer":
runProducer()
case "consumer":
runConsumer()
default:
fmt.Println("Modo não reconhecido. Use 'producer' ou 'consumer'")
}
func runProducer() {
}
func runConsumer() {
}
}
Com esta implementação ao rodar o programa para testarmos, poderemos especificar qual funcionalidade queremos utilizar.
Vamos implementar a produção de eventos no trecho de código abaixo:
func runProducer() {
config := &kafka.ConfigMap{
"bootstrap.servers": broker,
}
producer, err := kafka.NewProducer(config)
if err != nil {
log.Fatalf("Failed to create producer: %s", err)
}
defer producer.Close()
topicName := topic
for _, word := range []string{"Hello", "Kafka", "World"} {
message := &kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topicName, Partition: kafka.PartitionAny},
Value: []byte(word),
}
err := producer.Produce(message, nil)
if err != nil {
fmt.Printf("Failed to produce message: %s\n", err)
continue
}
e := <-producer.Events()
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
} else {
fmt.Printf("Delivered message to %v\n", m.TopicPartition)
}
}
// Flush and wait for all messages to be delivered
producer.Flush(15 * 1000)
}
Desta forma, se ocorrer como o esperado a mensagem de sucesso deve ser impressa no seu terminal ao executar o comando abaixo:
go run kafka.go producer
Vamos agora implementar o consumo dos eventos. Para simplificar este artigo, estamos criando um código considerando uma única partição (explicarei mais abaixo o que é uma partição).
func runConsumer() {
topics := []string{topic}
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": broker,
"broker.address.family": "v4",
"group.id": groupConsumer,
"session.timeout.ms": 6000,
"auto.offset.reset": "earliest",
"enable.auto.commit": true,
"enable.auto.offset.store": true,
})
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
os.Exit(1)
}
fmt.Printf("Created Consumer %v\n", c)
err = c.SubscribeTopics(topics, nil)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to subscribe to topics: %s\n", err)
os.Exit(1)
}
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
run := true
for run {
select {
case sig := <-sigchan:
fmt.Printf("Caught signal %v: terminating\n", sig)
run = false
default:
ev := c.Poll(100)
if ev == nil {
continue
}
switch e := ev.(type) {
case *kafka.Message:
fmt.Printf("%% Message on %s:\n%s\n",
e.TopicPartition, string(e.Value))
if e.Headers != nil {
fmt.Printf("%% Headers: %v\n", e.Headers)
}
case kafka.Error:
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
if e.Code() == kafka.ErrAllBrokersDown {
run = false
}
default:
fmt.Printf("Ignored %v\n", e)
}
}
}
fmt.Printf("Closing consumer\n")
c.Close()
}
Com esse código implementado, ao executar o comando abaixo é para imprimir o evento recebido.
go run kafka.go consumer
Informação importante
Offset
Lembre-se deste nome, pois ele é muito importante para o seu consumidor. No código criado acima, estamos passando o offset = earliest
, o que isso significa? Significa que o seu consumidor vai pegar todos os eventos já publicados no tópico que ainda estejam dentro do TTL de vida dos eventos no mesmo.
Caso deseje começar a ler apenas os novos, pode usar latest
, essa estratégia é útil para casos em que você precisa obter apenas os novos eventos a serem gerados, ignorando os publicados anteriormente à ativação do seu consumidor.
Se você não precisa garantir a ordem e deseja processar vários eventos simultaneamente, a próxima estratégia pode ser mais adequada.
Controlando o último offset consumido pelo seu consumer
Para utilizar esta estratégia, você vai ter de controlar o commit dos seus eventos no consumers groups
do Kafka
Ajustando a config
do seu consumer para algo assim:
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": broker,
"broker.address.family": "v4",
"group.id": groupConsumer,
"session.timeout.ms": 6000,
"auto.offset.reset": "earliest",
"enable.auto.commit": false,
"enable.auto.offset.store": false,
})
Analisando esta nova config, percebe-se que alteramos o enable.auto.commit
e enable.auto.offset.store
para que seja controlado pelo nosso consumer.
Além da alteração na config, necessitamos adicionar esse trecho no processamento do evento:
//Criar um TopicPartition com o offset específico
tp := kafka.TopicPartition{
Topic: e.TopicPartition.Topic,
Partition: e.TopicPartition.Partition,
Offset: e.TopicPartition.Offset + 1, // Offset para ser commitado (próxima mensagem a ser lida)
}
// Commit manual do offset específico
offsets, err := c.CommitOffsets([]kafka.TopicPartition{tp})
if err != nil {
fmt.Printf("Failed to commit offsets: %s\n", err)
} else {
fmt.Printf("Committed offsets: %v\n", offsets)
}
Neste trecho acima, especificamos o offset a ser commitado, permitindo que o consumidor inicie o consumo de eventos a partir do último evento consumido anteriormente.
Com esta abordagem, podemos ajustar a estratégia de processamento de eventos usando Go Routines e apenas commitar o offset do último evento se não ocorrer nenhum erro durante o processamento.
É importante destacar que estamos consumindo eventos de uma única partição. Se desejar aumentar o paralelismo no processamento, é necessário ajustar o código para lidar com cada partição separadamente.
Caso não esteja familiarizado, aqui está uma breve explicação sobre o que é uma partição no Kafka.
Partições
Uma partição em Kafka é uma subdivisão de um tópico, que permite a distribuição de dados e o paralelismo no processamento. Cada tópico pode ter várias partições, e cada partição é um log ordenado e imutável de eventos. As partições são distribuídas entre os brokers no cluster Kafka, permitindo escalabilidade e alta disponibilidade
Os eventos dentro de uma partição são atribuídos a offsets únicos, que identificam a posição do evento no log. Consumidores podem ler eventos de uma ou mais partições, e um grupo de consumidores pode se dividir entre as partições para processar os eventos em paralelo, aumentando a eficiência do processamento.
Código final do artigo:
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)
const (
topic = "meu-topico"
groupConsumer = "meu-grupo"
broker = "localhost:9092"
)
func main() {
if len(os.Args) < 2 {
fmt.Println("Uso: go run main.go [producer|consumer]")
return
}
mode := os.Args[1]
switch mode {
case "producer":
runProducer()
case "consumer":
runConsumer()
default:
fmt.Println("Modo não reconhecido. Use 'producer' ou 'consumer'")
}
}
func runConsumer() {
topics := []string{topic}
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": broker,
"broker.address.family": "v4",
"group.id": groupConsumer,
"session.timeout.ms": 6000,
"auto.offset.reset": "earliest",
"enable.auto.commit": false,
"enable.auto.offset.store": false,
})
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
os.Exit(1)
}
fmt.Printf("Created Consumer %v\n", c)
err = c.SubscribeTopics(topics, nil)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to subscribe to topics: %s\n", err)
os.Exit(1)
}
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
run := true
for run {
select {
case sig := <-sigchan:
fmt.Printf("Caught signal %v: terminating\n", sig)
run = false
default:
ev := c.Poll(100)
if ev == nil {
continue
}
switch e := ev.(type) {
case *kafka.Message:
fmt.Printf("%% Message on %s:\n%s\n",
e.TopicPartition, string(e.Value))
if e.Headers != nil {
fmt.Printf("%% Headers: %v\n", e.Headers)
}
//Criar um TopicPartition com o offset específico
tp := kafka.TopicPartition{
Topic: e.TopicPartition.Topic,
Partition: e.TopicPartition.Partition,
Offset: e.TopicPartition.Offset + 1, // Offset para ser commitado (próxima mensagem a ser lida)
}
// Commit manual do offset específico
offsets, err := c.CommitOffsets([]kafka.TopicPartition{tp})
if err != nil {
fmt.Printf("Failed to commit offsets: %s\n", err)
} else {
fmt.Printf("Committed offsets: %v\n", offsets)
}
case kafka.Error:
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
if e.Code() == kafka.ErrAllBrokersDown {
run = false
}
default:
fmt.Printf("Ignored %v\n", e)
}
}
}
fmt.Printf("Closing consumer\n")
c.Close()
}
func runProducer() {
config := &kafka.ConfigMap{
"bootstrap.servers": broker,
}
producer, err := kafka.NewProducer(config)
if err != nil {
log.Fatalf("Failed to create producer: %s", err)
}
defer producer.Close()
topicName := topic
for _, word := range []string{"Hello", "Kafka", "World"} {
message := &kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topicName, Partition: kafka.PartitionAny},
Value: []byte(word),
}
err := producer.Produce(message, nil)
if err != nil {
fmt.Printf("Failed to produce message: %s\n", err)
continue
}
e := <-producer.Events()
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
} else {
fmt.Printf("Delivered message to %v\n", m.TopicPartition)
}
}
// Flush and wait for all messages to be delivered
producer.Flush(15 * 1000)
}
Espero que este artigo tenha sido útil.