kafka-go地址: https://github.com/segmentio/kafka-go

注意:本文解决的问题不是因为kafka配置造成的,是因为代码写得有问题。

今天查看公司某kafka group消费情况的时候,发现该group消费的topic大多数的message写入了同一个partition,先从配置问题查找,后来又去看公司包装的kafka的库的源码查找,看起来都没啥问题。

并且,该问题就出现在一个服务里面,很是让人迷惑。该服务和其他服务最大的不同在于,它每一次都只会发送一个message给kafka。

先看看,kafka-go的demo:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11

w := kafka.NewWriter(kafka.WriterConfig{

    Brokers: []string{"localhost:9092"},

    Topic:   "topic-A",

    Balancer: &kafka.LeastBytes{},

})

第一个看起来没啥问题,我发现公司代码也是照抄的,继续往NewWriter这个函数里面看,会发现这东西居然是起了一个协程,然后再去查看LeastBytes这个Balancer,他的Balance方法是依赖结构体中字段的状态的,每一次发送的时候都去创建一个新的Balancer,你不翻车谁翻车啊。

因为代码是每次都发送一个message,每次都是一个新的Balancer,所以Balancer每次只需Balance的状态都是一致,所以会造成只写入一个partition中。

总结一下,这个问题可大可小,数据量不大,程序抗得住当然没什么,但是总有一天会扛不住。所以奉劝一句,写完代码记得认认真真测试,程序跑得起来不代表正确。