1. 背景问题
基于 sarama 包使用消费者组做 kafka 消费时候,如果有 2 个消费者组(GroupA、GroupB)同时消费同一个 Topic(SSEMsgTopic),这个 Topic 有 5 个分区。
如果 GroupA 消费了 Topic 中的消息后(基于 ConsumeClaim 方法),将消息标记为已消费(使用 MarkMessage 方法),那么 GroupB 是否还是可以消费到 GroupA 标记已消费的消息?
不同消费者组消费的消息是否都是完全隔离的?
1.1. 图示 Kakfa 核心概念
- Topic -> Partitions: 一个 Topic 由一个或多个分区组成,消息被写入到这些分区中。
- Partitions -> Brokers: 分区分布在 Kafka 集群的 Broker 上,每个 Broker 存储一部分分区的数据。
- Producer -> Topic: 生产者将消息发送到特定的 Topic。
- Consumer Group -> Topic: 消费者组订阅一个或多个 Topic。
- Consumer Group -> Partitions: 消费者组内的消费者实例被分配到 Topic 的分区,每个分区在组内只被一个消费者消费。
- Consumer Group -> Offset: 每个消费者组独立地维护其在每个分区上的消费偏移量,以跟踪自己的消费进度。
- Isolation: 不同消费者组之间是完全隔离的,它们可以独立地消费同一个 Topic 的所有消息,互不影响。

1.2. Kafka 消费者组的工作原理
- Offset(偏移量):Kafka 为每个消费者组在每个分区上维护一个独立的偏移量。这个偏移量记录了该消费者组在该分区上已经消费到的最新消息的位置。
- 提交偏移量:当消费者组中的消费者成功处理一条消息后,它会向 Kafka 提交(commit)该消息的偏移量。这个提交是针对当前消费者组的。
- 独立性:
- 每个消费者组有自己的偏移量:GroupA 有它自己的
SSEMsgTopic
分区 0 的偏移量,GroupB 也有它自己的 SSEMsgTopic
分区 0 的偏移量。这两个偏移量是完全独立的。 - 消费进度互不影响:GroupA 提交了分区 0 的偏移量,表示 GroupA 已经消费到这里了,下次 GroupA 从这个位置继续消费。但这并不会影响 GroupB 的偏移量。GroupB 会从它自己记录的偏移量位置开始消费。
- 消息的持久性:消息一旦写入 Kafka,就会根据其配置的保留策略(retention policy)在一段时间内保持不变。消费者组只是读取这些消息,并不会删除它们。
1.3. 示例说明
假设 SSEMsgTopic
的分区 0 有消息 M1, M2, M3, M4, M5。
- GroupA 消费:
- GroupA 的消费者从分区 0 的起始位置开始消费。
- 它消费了 M1, M2, M3。
- 在
ConsumeClaim
方法中,GroupA 调用 MarkMessage(M3)
。 - GroupA 提交了偏移量,表示它已经消费到 M3。下次 GroupA 会从 M4 开始消费。
- GroupB 消费:
- 同时或稍后,GroupB 的消费者也从分区 0 的起始位置开始消费(或者从它自己上次提交的偏移量开始)。
- GroupB 完全不知道 GroupA 已经消费了 M1, M2, M3。
- GroupB 会从它自己的偏移量位置开始,独立地消费 M1, M2, M3, M4, M5。
- GroupB 消费了 M1, M2 后,调用
MarkMessage(M2)
,并提交偏移量。下次 GroupB 会从 M3 开始消费。
1.4. 小结
- 隔离性:不同消费者组之间是完全隔离的。一个消费者组的消费进度(偏移量)不会影响另一个消费者组的消费进度。
- 消息的重复消费(对于不同组):这是 Kafka 的一个强大特性,允许不同的应用或服务独立地处理同一份数据流,而互不干扰。例如,一个组可能用于实时分析,另一个组用于数据归档。
MarkMessage
的作用:MarkMessage
只是告诉 sarama
库,这条消息已经被处理了,可以安全地在下次提交偏移量时将该消息的偏移量作为新的提交点。这个操作是针对当前消费者组的。
2. Go Sarama 消费者组处理程序接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
| // --------------------------------------------------------------------
// ConsumerGroupHandler instances are used to handle individual topic/partition claims.
// It also provides hooks for your consumer group session life-cycle and allow you to
// trigger logic before or after the consume loop(s).
//
// PLEASE NOTE that handlers are likely be called from several goroutines concurrently,
// ensure that all state is safely protected against race conditions.
type ConsumerGroupHandler interface {
// Setup is run at the beginning of a new session, before ConsumeClaim.
Setup(ConsumerGroupSession) error
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
// but before the offsets are committed for the very last time.
Cleanup(ConsumerGroupSession) error
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// Once the Messages() channel is closed, the Handler must finish its processing
// loop and exit.
ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}
// ConsumerGroupClaim processes Kafka messages from a given topic and partition within a consumer group.
type ConsumerGroupClaim interface {
// Topic returns the consumed topic name.
Topic() string
// Partition returns the consumed partition.
Partition() int32
// InitialOffset returns the initial offset that was used as a starting point for this claim.
InitialOffset() int64
// HighWaterMarkOffset returns the high watermark offset of the partition,
// i.e. the offset that will be used for the next message that will be produced.
// You can use this to determine how far behind the processing is.
HighWaterMarkOffset() int64
// Messages returns the read channel for the messages that are returned by
// the broker. The messages channel will be closed when a new rebalance cycle
// is due. You must finish processing and mark offsets within
// Config.Consumer.Group.Session.Timeout before the topic/partition is eventually
// re-assigned to another group member.
Messages() <-chan *ConsumerMessage
}
|
2.1. 代码说明

2.2. 补充一个 IBM/kafka 包极简使用教程
1
| go get -u github.com/sapaude/go-shims/infra/kafkax
|
- 解析 kafka 配置
- 初始化消费者组 app 应用
- 初始化消费者处理方法 handler,实现
sarama.ConsumerGroupHandler
接口
完整代码
https://github.com/sapaude/go-shims/blob/main/infra/kafkax/example/consumer_example.go