Go Kafka Sarama 包使用

AI 摘要: 该文章讨论了在使用Kafka发送消息时可能遇到的问题,包括包依赖、微服务框架Log丢失、业务复现流程长和sarama消费发送失败等。同时提供了Docker启动服务实例和一些小结。

1. 背景

需要在多个微服务内调用 Kafka 发送消息,看似简单的内容结果阻塞了一天左右,复盘了下主要问题:

  1. 包依赖:新、旧不同框架 import了不同版本的代码包,没有处理好import依赖问题,导致了CGO的代码被多次定义
  2. 微服务框架 Log 丢失:不同框架的 Log 组件日志打印问题,导致协程内的日志无法正常打印,应该和框架自义定handler.Ctx有关,后续再单独小结
  3. 业务复现流程长:一开始测试环境构建**“发布抽奖-等待抽奖-用户抽奖-发送消息”的复现链路较长,后续通过接口重放能够缩短整体 Case 时间,直接处理“抽奖-发送消息”**失败的逻辑
  4. sarama 消费发送失败:不同 kafka 版本、kafka 服务 IP 错误,缺乏 topic

2. Docker启动服务实例

为了方便自测,可以自己用docker-compose快速起一个 kafka 实例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
version: "3"
services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    hostname: zookeeper
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:latest'
    hostname: kafka_dev_node
    ports:
      - '9092:9092'
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    volumes:
      - /data/docker_volumes/kafka-data/single_date:/bitnami
    depends_on:
      - zookeeper
    restart: on-failure
  kafka-manager:
    image: kafkamanager/kafka-manager
    ports:
      - "9000:9000"
    environment:
      - ZK_HOSTS=zookeeper:2181
      - KAFKA_MANAGER_AUTH_ENABLED=true
      - KAFKA_MANAGER_USERNAME=admin
      - KAFKA_MANAGER_PASSWORD=clark
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper

3. Code Block

3.1. 消息发送

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// SyncSendQappMessageToKafka 发送qapp协议类型的消息给kafka
func SyncSendQappMessageToKafka(ctx context.Context, topic string, data []byte) error {
	// get kafka cfg
	brokerUrls, err := GetKafkaCfgFromApollo(`kafka_cfg.room_interact_label`)
	if err != nil {
		return errors.Wrap(err, "kafka cfg got err")
	}

	// kafka config 配置
	cfg := sarama.NewConfig()
	cfg.Producer.Retry.Max = 10
	cfg.Producer.RequiredAcks = sarama.WaitForAll
	cfg.Producer.Return.Successes = true
	cfg.ClientID = "room_interact_client" // 指明客户端请求id
	cfg.Version = sarama.V0_10_2_1        // 注意该值应该和kafka版本匹配

	// new producer
	topic := "classInteraction"
	saramaMsg := &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder(data),
	}
	saramaProducer, err := sarama.NewSyncProducer(brokerUrls, cfg)
	if err != nil {
		return errors.Wrap(err, "get sarama sync producer got err")
	}
	defer saramaProducer.Close()

	// 发送消息到kafka
	partition, offset, err := saramaProducer.SendMessage(saramaMsg)
	if err != nil {
		return errors.Wrapf(err, "kafka send sync message got err, topic(%s)/partition(%d)/offset(%d)", topic, partition, offset)
	}

	log.InfoContextf(ctx, "send message success, topic(%s)/partition(%d)/offset(%d)", topic, partition, offset)

	return nil
}

4. 使用 github.com/Shopify/sarama 遇到的问题

github.com/Shopify/sarama

4.1. kafka 版本因素影响

现象: kafka sarama new sync producer got err: kafka: client has run out of available brokers to talk to: EOF,已确认 原因v1.36 sarama包支持版本控制,默认为V1_XX版本,需要修正下 cfg.Version = sarama.V0_10_2_1

4.2. kafka 配置错误

现象: kafka sarama new sync producer got err: kafka: invalid configuration (Producer.Return.Successes must be true to be used in a SyncProducer

原因: 同步sarama.NewSyncProducer 依赖 cfg.Producer.Return.Successes = true,需要配置成true

4.3. Kafka 配置问题 - topic 不存在

现象kafka send sync message got err, topic(classInteraction)/partition(-1)/offset(-1): kafka server: Request was for a topic or partition that does not exist on this broker 原因: 因为验证 kafka 实例版本的调试过程中,更换了 kafka 服务实例的 IP,导致实例发送过程,缺失了topic !通过开启日志可以帮助快速定位问题!

1
2
3
4
5
6
// 开启调试日志(非常有用)
func init(){
	// sarama log
	logFile, _ := os.OpenFile("../log/sarama.log", os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644)
	sarama.Logger = stdlog.New(logFile, "[sarama] ", stdlog.LstdFlags)
}

4.3.1. 日志内容

5. sarama kafka console 调试工具

sarama 提供的 console工具,能快速进行消息的生产、消费,以及kafka服务端性能基本检测!

5.1. 安装sarama console工具

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// 编译安装到GOBIN目录或GOPATH/bin
git clone git@github.com:Shopify/sarama.git .
cd sarama
go install ./tools/...

// 包含生产、消费、性能测试
$ ll /private/data/go/bin/kafka-*
-rwxr-xr-x 1 lupingguo staff 8.0M Sep  2 00:14 /private/data/go/bin/kafka-console-consumer
-rwxr-xr-x 1 lupingguo staff 7.8M Sep  2 00:14 /private/data/go/bin/kafka-console-partitionconsumer
-rwxr-xr-x 1 lupingguo staff 8.0M Sep  2 00:14 /private/data/go/bin/kafka-console-producer
-rwxr-xr-x 1 lupingguo staff 8.2M Sep  2 00:14 /private/data/go/bin/kafka-producer-performance

// 消息生产
$ kafka-console-producer -brokers kafka_dev_node:9092 -topic room_interact -value '{"foo":"bar"}' -verbose
2022/09/02 00:19:24 Initializing new client
2022/09/02 00:19:24 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2022/09/02 00:19:24 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2022/09/02 00:19:24 client/metadata fetching metadata for all topics from broker kafka_dev_node:9092
2022/09/02 00:19:26 Connected to broker at kafka_dev_node:9092 (unregistered)
2022/09/02 00:19:26 client/brokers registered new broker #1 at kafka_dev_node:9092
2022/09/02 00:19:26 Successfully initialized new client
2022/09/02 00:19:26 client/metadata fetching metadata for [room_interact] from broker kafka_dev_node:9092
2022/09/02 00:19:26 client/metadata found some partitions to be leaderless
2022/09/02 00:19:26 client/metadata retrying after 250ms... (3 attempts remaining)
2022/09/02 00:19:26 client/metadata fetching metadata for [room_interact] from broker kafka_dev_node:9092
2022/09/02 00:19:26 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2022/09/02 00:19:26 producer/broker/1 starting up
2022/09/02 00:19:26 producer/broker/1 state change to [open] on room_interact/0
2022/09/02 00:19:26 Connected to broker at kafka_dev_node:9092 (registered as #1)
topic=room_interact	partition=0	offset=0
2022/09/02 00:19:26 Producer shutting down.
2022/09/02 00:19:26 Closing Client
2022/09/02 00:19:26 Closed connection to broker kafka_dev_node:9092
2022/09/02 00:19:26 Closed connection to broker kafka_dev_node:9092
2022/09/02 00:19:26 producer/broker/1 input chan closed
2022/09/02 00:19:26 producer/broker/1 shut down

// 消费消费
$ kafka-console-consumer -brokers kafka_dev_node:9092 -topic room_interact -verbose
2022/09/02 00:38:12 Initializing new client
2022/09/02 00:38:12 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2022/09/02 00:38:12 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2022/09/02 00:38:12 client/metadata fetching metadata for all topics from broker kafka_dev_node:9092
2022/09/02 00:38:12 Connected to broker at kafka_dev_node:9092 (unregistered)
2022/09/02 00:38:12 client/brokers registered new broker #1 at kafka_dev_node:9092
2022/09/02 00:38:12 Successfully initialized new client
2022/09/02 00:38:12 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2022/09/02 00:38:12 Connected to broker at kafka_dev_node:9092 (registered as #1)
2022/09/02 00:38:12 consumer/broker/1 accumulated 1 new subscriptions
2022/09/02 00:38:12 consumer/broker/1 added subscription to room_interact/0
Partition:	0
Offset:	39083
Key:
Value:	{"foo":"bar"}

// 性能测试
$ kafka-producer-performance -brokers kafka_dev_node:9092 -client-id kafka-cli -topic room_interact -message-load 1000 -message-size 3000 -verbose
2022/09/02 00:36:20 Initializing new client
2022/09/02 00:36:20 client/metadata fetching metadata for all topics from broker kafka_dev_node:9092
2022/09/02 00:36:20 Connected to broker at kafka_dev_node:9092 (unregistered)
2022/09/02 00:36:20 client/brokers registered new broker #1 at kafka_dev_node:9092
2022/09/02 00:36:20 Successfully initialized new client
2022/09/02 00:36:20 producer/broker/1 starting up
2022/09/02 00:36:20 producer/broker/1 state change to [open] on room_interact/0
2022/09/02 00:36:20 producer/broker/1 maximum request accumulated, waiting for space
2022/09/02 00:36:20 Connected to broker at kafka_dev_node:9092 (registered as #1)
2022/09/02 00:36:20 producer/broker/1 maximum request accumulated, waiting for space
2022/09/02 00:36:20 producer/broker/1 maximum request accumulated, waiting for space
2022/09/02 00:36:21 Producer shutting down.
2022/09/02 00:36:21 Closing Client
2022/09/02 00:36:21 producer/broker/1 input chan closed
2022/09/02 00:36:21 producer/broker/1 shut down
2022/09/02 00:36:21 Closed connection to broker kafka_dev_node:9092
2022/09/02 00:36:21 Closed connection to broker kafka_dev_node:9092
1000 records sent, 4335.2 records/sec (12.40 MiB/sec ingress, 8.32 MiB/sec egress), 210.7 ms avg latency, 86.7 ms stddev, 218.0 ms 50th, 294.5 ms 75th, 329.0 ms 95th, 329.0 ms 99th, 329.0 ms 99.9th, 0 total req. in flight

6. 小结

  1. 注意github.com/Shopify/sarama对于服务器的版本sarama.Config配置匹配问题,避免出现EOF问题;
  2. 在遇到错误日志不是很明确的时候,可以借助sarama的调试日志加速定位问题;
  3. 可以借助sarama console工具简单验证服务端的Topick连通性问题