在传统同步调用模式中,服务 A 调用服务 B 必须等待 B 返回结果,这带来了三个问题:耦合性强(A 依赖 B)、扩展困难(高流量时 B 可能成为瓶颈)、阻塞等待(A 的线程长时间占用)。消息队列作为一个中间层,将消息的生产和消费解耦,彻底改变了交互模式。
削峰填谷: 当请求峰值到来时,消息队列可以暂存请求,让下游消费者按照自己的处理能力匀速消费,防止系统被突发流量冲垮。
解耦: 生产者和消费者不再需要直接交互,只需要约定消息格式。生产者可以随时增加,消费者也可以独立扩展。
异步处理: 生产者无需等待消费者处理完成,可以立即返回,提高系统的响应能力和吞吐量。
# 同步调用 (阻塞等待)
def sync_process(order):
# 调用支付服务,等待响应
result = pay_service(order)
if result.success:
send_email(order.user)
return True
# 异步消息队列 (非阻塞)
def async_process(order):
# 将订单消息放入队列,立即返回
queue.send( "order_created", order )
# 消费者从队列拉取消息,异步处理支付和邮件
return True
传统消息队列在吞吐量上存在瓶颈,难以支撑实时数据流(如点击流、日志收集)。Kafka 的设计目标从一开始就是高吞吐、低延迟。它通过分区 (Partition) 机制实现横向扩展,通过顺序读写和零拷贝技术实现高性能。
核心概念:
# 创建 Topic 并指定分区数
kafka-topics --create --topic orders --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092
# 查看 Topic 分区信息
kafka-topics --describe --topic orders --bootstrap-server localhost:9092
# 生产者发送消息时指定分区
# 如果未指定分区,Kafka 使用 round-robin 或 key 哈希
producer.send( "orders", key=order_id, value=message )
# 消费者消费指定分区
consumer.assign( [TopicPartition("orders", 0)] )
分布式系统中,消息可能因为网络故障、Broker 崩溃、消费者宕机等原因丢失。要实现消息可靠性投递,需要在生产者、Broker、消费者三个环节都建立保障机制。Kafka 通过 ack 机制、副本同步、消费位移提交等机制,提供了从生产到消费的端到端可靠性保证。
生产者确认 (acks):
消费者确认 (Offset commit): 消费者处理完消息后提交 Offset,如果未提交,重启后可以重新消费。
# 生产者配置 (高可靠性)
props.put("acks", "all");
props.put("enable.idempotence", true);
props.put("retries", 10);
props.put("max.in.flight.requests.per.connection", 5);
# Broker 配置 (副本同步)
min.insync.replicas=2 # 至少 2 个副本同步
default.replication.factor=3 # 副本因子 3
# 消费者配置 (手动提交 Offset)
props.put("enable.auto.commit", "false");
# 处理完消息后手动提交
consumer.commitSync();
在消息队列中,消息的生产顺序和消费顺序可能不同。如果业务要求严格的顺序(如订单状态变更),则必须保证消息按序消费。Kafka 提供了分区级顺序保证:同一分区内的消息按写入顺序存储,消费者也按相同顺序消费。
分区顺序: Kafka 保证同一分区内的消息按发送顺序存储,消费者按该顺序消费。但不同分区之间的顺序无法保证。
全局顺序: 如果业务要求全局有序,所有消息必须发送到同一个分区,但这会限制吞吐量。
并行消费: 每个分区可以被一个消费者消费,多个分区可以被多个消费者并行消费,提高处理速度。
# 生产者: 将相关消息发送到同一个分区
void sendOrderEvents(String orderId) {
// 使用 orderId 作为 key,确保同一订单的消息进入同一分区
producer.send( "orders", orderId, "OrderCreated" );
producer.send( "orders", orderId, "OrderPaid" );
producer.send( "orders", orderId, "OrderShipped" );
}
# 消费者: 单线程消费一个分区,保证顺序
while (true) {
// 拉取一批消息
ConsumerRecords<String, String> records = consumer.poll( 100 );
for (ConsumerRecord record : records) {
// 顺序处理消息
processOrder(record);
}
// 处理完后提交 Offset
consumer.commitSync();
}
在分布式系统中,消息可能由于网络重试、消费者重平衡、Offset 提交失败等原因被重复消费。如果业务逻辑不是幂等的(如银行转账、库存扣减),重复消费会导致严重的数据错误。因此,幂等性和去重机制是消息队列应用中的关键设计。
生产者幂等: Kafka 的幂等生产者 (idempotent producer) 通过为每条消息分配 ProducerId 和 SequenceNumber,Broker 会自动去重,防止生产重试导致重复消息。
消费者去重: 消费者在处理消息前,检查消息 ID 是否已在去重表中存在。如果存在,则跳过处理;否则处理并记录。
# 消费者去重 (使用 Redis)
def consume_message(msg):
msg_id = msg.headers["msg_id"]
# 检查是否已经处理过
if redis.sismember("processed_msgs", msg_id):
print("Duplicate message, skipping")
return
# 处理消息
process_business_logic(msg)
# 将消息 ID 标记为已处理
redis.sadd("processed_msgs", msg_id)
# 设置过期时间,防止去重表无限增长
redis.expire("processed_msgs", 86400) # 24 小时后清理
当消息的生产速度持续大于消费速度,或者消费者出现故障时,消息会在队列中堆积,形成积压 (Backlog)。严重的积压会导致消息延迟增加、消费者内存压力增大,甚至磁盘空间耗尽。
消费者组 (Consumer Group): 消费者组允许多个消费者共同消费一个 Topic。Kafka 将分区平均分配给组内的消费者,每个分区只能被一个消费者消费。
分区重平衡 (Rebalance): 当消费者加入、退出或故障时,会触发重平衡,重新分配分区。
积压监控指标:
# 查看消费者组积压
kafka-consumer-groups --bootstrap-server localhost:9092 --group my-group --describe
# 输出示例:
# PARTITION LAG TOPIC
# 0 1500 orders
# 1 3200 orders
# 增加消费者 (需保证分区数 >= 消费者数)
# 启动新的消费者实例,加入同一 consumer group
# 重置 Offset (丢弃积压消息)
kafka-consumer-groups --bootstrap-server localhost:9092 --group my-group --topic orders --reset-offsets --to-latest --execute
# 自动扩容脚本示例
if lag > 5000:
increase_consumers(2)