1. Background
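I needed to call Kafka from several microservices to send messages. What looked like a simple task ended up blocking me for about a day. Looking back, the main problems were:
- Package dependencies: the new and old frameworks `import`ed different versions of the same packages. Because those `import` dependencies were not sorted out, CGO code ended up being defined more than once.
- Lost logs in the microservice framework: the Log components of the different frameworks conflicted, so logs printed inside goroutines never showed up. This is probably related to the framework's custom `handler.Ctx`; I will write it up separately.
- Long business reproduction path: at first the test environment had to run the whole **"publish lottery - wait for the draw - user draws - send message"** chain. Replaying the API request later cut the time per case, because it exercised the failing **"draw - send message"** logic directly.
- sarama send failures: mismatched Kafka versions, a wrong Kafka broker IP, and a missing topic.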
2. Starting a service instance with Docker
For convenient local testing, you can quickly bring up a Kafka instance with docker-compose:
```yaml
version: "3"
services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    hostname: zookeeper
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    # Consider pinning a tag: Kafka 4.x no longer supports ZooKeeper mode,
    # so 'latest' may not work with KAFKA_CFG_ZOOKEEPER_CONNECT below.
    image: 'bitnami/kafka:latest'
    hostname: kafka_dev_node
    ports:
      - '9092:9092'
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      # Empty host: the broker advertises its own hostname (kafka_dev_node),
      # so client machines must be able to resolve that name.
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    volumes:
      - /data/docker_volumes/kafka-data/single_date:/bitnami
    depends_on:
      - zookeeper
    restart: on-failure
  kafka-manager:
    image: kafkamanager/kafka-manager
    ports:
      - "9000:9000"
    environment:
      - ZK_HOSTS=zookeeper:2181
      - KAFKA_MANAGER_AUTH_ENABLED=true
      - KAFKA_MANAGER_USERNAME=admin
      - KAFKA_MANAGER_PASSWORD=clark
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
```
3. Code Block
3.1. Sending messages
```go
import (
	"context"

	"github.com/Shopify/sarama"
	"github.com/pkg/errors"
	// plus the framework's log package that provides log.InfoContextf
)

// SyncSendQappMessageToKafka sends a qapp-protocol message to Kafka.
// GetKafkaCfgFromApollo is a project helper that reads the broker list from Apollo.
func SyncSendQappMessageToKafka(ctx context.Context, topic string, data []byte) error {
	// Broker addresses from the Apollo config
	brokerUrls, err := GetKafkaCfgFromApollo(`kafka_cfg.room_interact_label`)
	if err != nil {
		return errors.Wrap(err, "kafka cfg got err")
	}
	// Producer configuration
	cfg := sarama.NewConfig()
	cfg.Producer.Retry.Max = 10
	cfg.Producer.RequiredAcks = sarama.WaitForAll
	cfg.Producer.Return.Successes = true  // required by SyncProducer (see 4.2)
	cfg.ClientID = "room_interact_client" // client ID sent with every request
	cfg.Version = sarama.V0_10_2_1        // must match the Kafka broker version (see 4.1)

	// The topic (e.g. "classInteraction") comes from the caller; redeclaring
	// it here with `topic := ...` would not compile.
	saramaMsg := &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder(data),
	}
	// A producer built per call means a new client, new connections and a
	// metadata fetch for every message.
	saramaProducer, err := sarama.NewSyncProducer(brokerUrls, cfg)
	if err != nil {
		return errors.Wrap(err, "get sarama sync producer got err")
	}
	defer saramaProducer.Close()
	// Send the message and wait for the broker's acknowledgement
	partition, offset, err := saramaProducer.SendMessage(saramaMsg)
	if err != nil {
		return errors.Wrapf(err, "kafka send sync message got err, topic(%s)/partition(%d)/offset(%d)", topic, partition, offset)
	}
	log.InfoContextf(ctx, "send message success, topic(%s)/partition(%d)/offset(%d)", topic, partition, offset)
	return nil
}
```
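The function above builds and closes a SyncProducer on every call. A common alternative is to create the producer once and reuse it for later messages. The sketch below shows that pattern with only the standard library, under stated assumptions: `syncSender`, `senderFuncs` and `sharedProducer` are hypothetical names, the real sarama producer would be plugged in through `newFn`, and the underlying producer is assumed to be safe for concurrent use.

```go
package main

import "sync"

// syncSender is a hypothetical, narrowed view of a synchronous producer.
// sarama's SyncProducer.SendMessage takes a *sarama.ProducerMessage instead,
// so it would be wrapped with senderFuncs.
type syncSender interface {
	Send(topic string, value []byte) (partition int32, offset int64, err error)
	Close() error
}

// senderFuncs adapts two plain functions to syncSender.
type senderFuncs struct {
	send  func(topic string, value []byte) (int32, int64, error)
	close func() error
}

func (f senderFuncs) Send(topic string, value []byte) (int32, int64, error) {
	return f.send(topic, value)
}

func (f senderFuncs) Close() error { return f.close() }

// sharedProducer creates the underlying producer on first use and reuses it.
// A failed creation is not cached, so the next call retries.
type sharedProducer struct {
	mu     sync.Mutex
	newFn  func() (syncSender, error) // e.g. wraps sarama.NewSyncProducer(brokers, cfg)
	sender syncSender
}

func (p *sharedProducer) get() (syncSender, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.sender == nil {
		s, err := p.newFn()
		if err != nil {
			return nil, err
		}
		p.sender = s
	}
	return p.sender, nil
}

// Send assumes the underlying producer is safe for concurrent use.
func (p *sharedProducer) Send(topic string, value []byte) (int32, int64, error) {
	s, err := p.get()
	if err != nil {
		return -1, -1, err
	}
	return s.Send(topic, value)
}

// Close releases the producer, typically once on service shutdown.
func (p *sharedProducer) Close() error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.sender == nil {
		return nil
	}
	err := p.sender.Close()
	p.sender = nil
	return err
}
```

In a service, `newFn` would call `sarama.NewSyncProducer(brokerUrls, cfg)` and adapt its `SendMessage` through `senderFuncs`, and `Close` would be hooked into the shutdown path. The trade-off is that config changes (such as a new broker list from Apollo) no longer take effect per message; they need a `Close` so the next send rebuilds the producer.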
4. Problems encountered with `github.com/Shopify/sarama`
4.1. Kafka version mismatch
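Symptom: `kafka sarama new sync producer got err: kafka: client has run out of available brokers to talk to: EOF` (confirmed)
Cause: sarama v1.36 chooses protocol request versions based on `cfg.Version`, which defaults to a V1_XX version, newer than our broker. Setting it to match the broker fixes the problem: `cfg.Version = sarama.V0_10_2_1`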
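When the configured version is newer than what the broker supports, an old broker typically just drops the connection, which the client reports as the EOF above. sarama can parse a version string itself (`sarama.ParseKafkaVersion("0.10.2.1")`), so the version can live in config next to the broker list instead of being hard-coded. The stdlib-only sketch below adds a startup guard that compares the configured client version with a known broker version; `parseVersion` and `clientNewerThanBroker` are hypothetical names, and the broker version is assumed to come from your own config.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseVersion turns "0.10.2.1" into [4]int{0, 10, 2, 1}; missing parts are 0.
func parseVersion(s string) ([4]int, error) {
	var v [4]int
	parts := strings.Split(strings.TrimSpace(s), ".")
	if len(parts) > 4 {
		return v, fmt.Errorf("invalid kafka version %q", s)
	}
	for i, p := range parts {
		n, err := strconv.Atoi(p)
		if err != nil || n < 0 {
			return v, fmt.Errorf("invalid kafka version %q", s)
		}
		v[i] = n
	}
	return v, nil
}

// clientNewerThanBroker reports whether the client protocol version is newer
// than the broker version, i.e. the mismatch described above.
func clientNewerThanBroker(client, broker string) (bool, error) {
	c, err := parseVersion(client)
	if err != nil {
		return false, err
	}
	b, err := parseVersion(broker)
	if err != nil {
		return false, err
	}
	for i := 0; i < 4; i++ {
		if c[i] != b[i] {
			return c[i] > b[i], nil // numeric compare, so 0.10 > 0.9
		}
	}
	return false, nil
}
```

A service could refuse to start (or log a loud warning) when `clientNewerThanBroker` returns true, which turns the opaque EOF into an explicit message.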
4.2. Kafka configuration error
Symptom: `kafka sarama new sync producer got err: kafka: invalid configuration (Producer.Return.Successes must be true to be used in a SyncProducer)`
Cause: `sarama.NewSyncProducer` requires `cfg.Producer.Return.Successes = true`, and the default is false, so it has to be set explicitly.
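The rule follows from how the sync producer is built: sarama's SyncProducer wraps an AsyncProducer and blocks each send until that message's outcome comes back, and success notifications are only delivered when `Return.Successes` is true (sarama likewise requires `Return.Errors` to be true). Without them a successful send would never be reported, so the constructor rejects the configuration up front. The toy model below is not sarama's code; all names are hypothetical. It only shows the same sync-over-async idea with channels.

```go
package main

import "errors"

// outcome is what the asynchronous side reports back for one message.
type outcome struct {
	offset int64
	err    error
}

// request pairs a message with the channel its outcome is reported on.
type request struct {
	value []byte
	done  chan outcome
}

// toyAsync is a toy asynchronous producer (NOT sarama's implementation):
// failures are always reported, successes only if returnSuccesses is true.
type toyAsync struct {
	input           chan request
	returnSuccesses bool
}

func newToyAsync(returnSuccesses bool) *toyAsync {
	a := &toyAsync{input: make(chan request), returnSuccesses: returnSuccesses}
	go a.run()
	return a
}

// run is the single worker goroutine, so nextOffset needs no locking.
func (a *toyAsync) run() {
	var nextOffset int64
	for req := range a.input {
		if len(req.value) == 0 {
			req.done <- outcome{offset: -1, err: errors.New("empty message")}
			continue
		}
		off := nextOffset
		nextOffset++
		if a.returnSuccesses {
			req.done <- outcome{offset: off}
		}
	}
}

func (a *toyAsync) shutdown() { close(a.input) }

var errNeedSuccesses = errors.New("sync producer requires Return.Successes = true")

// syncProducer blocks each Send until the async side reports the outcome.
type syncProducer struct{ async *toyAsync }

func newSyncProducer(a *toyAsync) (*syncProducer, error) {
	// Without success reports, Send would wait forever on every successful
	// message, so the configuration is rejected up front.
	if !a.returnSuccesses {
		return nil, errNeedSuccesses
	}
	return &syncProducer{async: a}, nil
}

func (s *syncProducer) Send(value []byte) (int64, error) {
	done := make(chan outcome, 1) // buffered so the worker never blocks
	s.async.input <- request{value: value, done: done}
	o := <-done
	return o.offset, o.err
}
```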
4.3. Kafka configuration issue - topic does not exist
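Symptom: `kafka send sync message got err, topic(classInteraction)/partition(-1)/offset(-1): kafka server: Request was for a topic or partition that does not exist on this broker`
Cause: while verifying the Kafka version during debugging, I switched the IP of the Kafka instance, and the new instance did not have the topic. The topic has to be created on the new instance before sending.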
Enabling sarama's debug log helps locate problems like this quickly:
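```go
import (
	stdlog "log"
	"os"
	"github.com/Shopify/sarama"
)
func init() { // enable sarama's debug log (very useful)
	logFile, err := os.OpenFile("../log/sarama.log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		logFile = os.Stderr // fall back to stderr instead of losing the log
	}
	sarama.Logger = stdlog.New(logFile, "[sarama] ", stdlog.LstdFlags)
}
```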
4.3.1. Log output
5. sarama Kafka console debugging tools
sarama ships console tools that can quickly produce and consume messages and run a basic performance check against the Kafka server.
5.1. Installing the sarama console tools
```shell
# Build and install into GOBIN (or GOPATH/bin)
$ git clone git@github.com:Shopify/sarama.git   # the repository now lives at github.com/IBM/sarama
$ cd sarama
$ go install ./tools/...
# Producer, consumer and performance-test tools
$ ls -lh /private/data/go/bin/kafka-*
-rwxr-xr-x 1 lupingguo staff 8.0M Sep 2 00:14 /private/data/go/bin/kafka-console-consumer
-rwxr-xr-x 1 lupingguo staff 7.8M Sep 2 00:14 /private/data/go/bin/kafka-console-partitionconsumer
-rwxr-xr-x 1 lupingguo staff 8.0M Sep 2 00:14 /private/data/go/bin/kafka-console-producer
-rwxr-xr-x 1 lupingguo staff 8.2M Sep 2 00:14 /private/data/go/bin/kafka-producer-performance
# Produce a message
$ kafka-console-producer -brokers kafka_dev_node:9092 -topic room_interact -value '{"foo":"bar"}' -verbose
2022/09/02 00:19:24 Initializing new client
2022/09/02 00:19:24 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2022/09/02 00:19:24 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2022/09/02 00:19:24 client/metadata fetching metadata for all topics from broker kafka_dev_node:9092
2022/09/02 00:19:26 Connected to broker at kafka_dev_node:9092 (unregistered)
2022/09/02 00:19:26 client/brokers registered new broker #1 at kafka_dev_node:9092
2022/09/02 00:19:26 Successfully initialized new client
2022/09/02 00:19:26 client/metadata fetching metadata for [room_interact] from broker kafka_dev_node:9092
2022/09/02 00:19:26 client/metadata found some partitions to be leaderless
2022/09/02 00:19:26 client/metadata retrying after 250ms... (3 attempts remaining)
2022/09/02 00:19:26 client/metadata fetching metadata for [room_interact] from broker kafka_dev_node:9092
2022/09/02 00:19:26 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2022/09/02 00:19:26 producer/broker/1 starting up
2022/09/02 00:19:26 producer/broker/1 state change to [open] on room_interact/0
2022/09/02 00:19:26 Connected to broker at kafka_dev_node:9092 (registered as #1)
topic=room_interact partition=0 offset=0
2022/09/02 00:19:26 Producer shutting down.
2022/09/02 00:19:26 Closing Client
2022/09/02 00:19:26 Closed connection to broker kafka_dev_node:9092
2022/09/02 00:19:26 Closed connection to broker kafka_dev_node:9092
2022/09/02 00:19:26 producer/broker/1 input chan closed
2022/09/02 00:19:26 producer/broker/1 shut down
# Consume messages
$ kafka-console-consumer -brokers kafka_dev_node:9092 -topic room_interact -verbose
2022/09/02 00:38:12 Initializing new client
2022/09/02 00:38:12 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2022/09/02 00:38:12 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2022/09/02 00:38:12 client/metadata fetching metadata for all topics from broker kafka_dev_node:9092
2022/09/02 00:38:12 Connected to broker at kafka_dev_node:9092 (unregistered)
2022/09/02 00:38:12 client/brokers registered new broker #1 at kafka_dev_node:9092
2022/09/02 00:38:12 Successfully initialized new client
2022/09/02 00:38:12 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2022/09/02 00:38:12 Connected to broker at kafka_dev_node:9092 (registered as #1)
2022/09/02 00:38:12 consumer/broker/1 accumulated 1 new subscriptions
2022/09/02 00:38:12 consumer/broker/1 added subscription to room_interact/0
Partition: 0
Offset: 39083
Key:
Value: {"foo":"bar"}
# Performance test
$ kafka-producer-performance -brokers kafka_dev_node:9092 -client-id kafka-cli -topic room_interact -message-load 1000 -message-size 3000 -verbose
2022/09/02 00:36:20 Initializing new client
2022/09/02 00:36:20 client/metadata fetching metadata for all topics from broker kafka_dev_node:9092
2022/09/02 00:36:20 Connected to broker at kafka_dev_node:9092 (unregistered)
2022/09/02 00:36:20 client/brokers registered new broker #1 at kafka_dev_node:9092
2022/09/02 00:36:20 Successfully initialized new client
2022/09/02 00:36:20 producer/broker/1 starting up
2022/09/02 00:36:20 producer/broker/1 state change to [open] on room_interact/0
2022/09/02 00:36:20 producer/broker/1 maximum request accumulated, waiting for space
2022/09/02 00:36:20 Connected to broker at kafka_dev_node:9092 (registered as #1)
2022/09/02 00:36:20 producer/broker/1 maximum request accumulated, waiting for space
2022/09/02 00:36:20 producer/broker/1 maximum request accumulated, waiting for space
2022/09/02 00:36:21 Producer shutting down.
2022/09/02 00:36:21 Closing Client
2022/09/02 00:36:21 producer/broker/1 input chan closed
2022/09/02 00:36:21 producer/broker/1 shut down
2022/09/02 00:36:21 Closed connection to broker kafka_dev_node:9092
2022/09/02 00:36:21 Closed connection to broker kafka_dev_node:9092
1000 records sent, 4335.2 records/sec (12.40 MiB/sec ingress, 8.32 MiB/sec egress), 210.7 ms avg latency, 86.7 ms stddev, 218.0 ms 50th, 294.5 ms 75th, 329.0 ms 95th, 329.0 ms 99th, 329.0 ms 99.9th, 0 total req. in flight
```
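The ingress figure in the `kafka-producer-performance` summary is consistent with records/sec × message size: 4335.2 × 3000 B = 13,005,600 B/s, or about 12.40 MiB/s. A tiny helper for that conversion (the name `mibPerSec` is hypothetical) makes it easy to compare runs with different `-message-size` values:

```go
package main

// mibPerSec converts a record rate and a fixed record size in bytes
// into MiB per second (1 MiB = 1024*1024 bytes).
func mibPerSec(recordsPerSec float64, recordSizeBytes int) float64 {
	return recordsPerSec * float64(recordSizeBytes) / (1024 * 1024)
}
```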
6. Summary
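- Make sure the version in the `sarama.Config` used with `github.com/Shopify/sarama` matches the server's Kafka version, to avoid the EOF error;
- When the error log is not explicit, sarama's debug log can speed up locating the problem;
- The sarama console tools are a quick way to verify that a topic on the server is reachable.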