拿来就能用!去哪儿网消息中间件 QMQ 详解

Դ未知

ߣ老铁SEO

17

2019-08-11 11:42:51

作者 | 去哪儿网QMQ团队

责编 | 伍杏玲

2012 年,随着业务快速增长,公司内部开始了服务化进程,通过拆分单体服务加快开发部署效率,提高业务迭代速度。服务化情况下不同的服务分开部署,不同服务之间需要通过网络来通信。

当时公司内部选择使用 Dubbo 作为主要的 RPC 框架。但是 RPC 并不适用于所有通信场景,RPC 主要表达同步的直接调用关系,但是实际上还有异步通知类型的通信需求。

分布式消息队列(MQ)是典型的异步通知类型的通信实现,有很多好处:

  • 业务可以专注核心流程,通过消息解耦“通知类”业务,减少核心流程受影响的可能性。
  • 可以方便的实现最终一致性,提高服务可用性和吞吐能力。
  • 通过错峰和流控可以提高服务可靠性,降低下游系统对上游的影响力。

QMQ 就是 Qunar 内部实现的分布式消息队列。

实际上,在 Qunar 内部,QMQ 的服务端实现有新老两个完全不同的版本。目前开源的是最新的版本,本文也会专注于介绍新版本的使用、设计和实现。

最初引入 QMQ 主要用于支付、订单等场景,对服务的可靠性、一致性要求比较高,但是当时并没有满足要求的可靠开源实现,所以公司内部自己造了 QMQ 这个轮子。根据当时考虑支持的主要业务场景,初版 QMQ 利用业务库实现了消息发送的可靠性和一致性,服务端存储则选择了MySQL+Redis。后来也基于 MySQL 开发了延时消息队列。

随着业务的发展和 QMQ 在公司内部的推广,QMQ 支撑的业务种类越来越多,消息量越来越大。这种情况下,初版 QMQ 暴露了一些问题:

  • 吞吐能力不足,逐渐不能满足公司内部的需求,和使用越来越多的Kafka相比差距很大。
  • 堆积能力弱,不能有效的实现错峰。
  • 推模型在消息堆积时会增加消费者的压力。

这些问题迫使我们重新设计了 QMQ 的存储模型。此时直接基于本地顺序文件的 Kafka 和 RocketMQ 等已经成为业内广泛使用的方案,它们提供了非常好的写入和堆积能力,QMQ 重新设计时也参考了他们的存储模型。

但根据公司内部的实际使用场景,我们单独设计了消费管理的模型,这点会在后面详细说明。

基本概念

首先,我们介绍一些使用时需要知道的基本概念。

producer 表示消息生产者,consumer 表示消息消费者,broker 一般用来表示 QMQ 的服务端。

subject 表示消息的主题,可以理解为消息的分类,每条消息发送时都需要指定一个主题,消费者订阅消息时也需要指定需要订阅的主题。

consumer group 表示消费者所在的消费组。在 QMQ 中,所有的消费者都属于一个消费组,消费组内共享消费进度,不同消费组之间的消费进度相互独立。消费时一条消息一般情况下只会派发给消费组中的一个消费者。

比如一个消费组中有 10 个消费者,那么这 10 个消费者就会均分主题的消息,而不是每个消费者都消费全部消息。实际业务场景里,也有单个消费者需要消费一个主题全部消息的情况,这种消费方式在 QMQ 中称为广播消费,广播消费时每个消费者都属于一个单独的消费组。

发送消息

发送消息时首先需要创建一个producer,创建时需要提供两个参数,一个是标识当前项目的 App Code,另一个是QMQ metaserver的地址。

MessageProducerProvider producer = newMessageProducerProvider;

producer.setAppCode("your app");

producer.setMetaServer("http://<meta server address>/meta/address");

producer.init;

发送实时消息时先使用producer创建消息,然后将需要发送的数据放入消息并发送。

//每次都需要使用generateMessage生成一个新消息

Message message = producer.generateMessage("qmq_subject");

//QMQ提供的Message是key/value的形式

message.setProperty("key""value");

//发送消息

producer.sendMessage(message);

发送延时消息的过程和发送实时消息类似,只是需要指定延时时间。

Message message = producer.generateMessage("qmq_delay_subject");

message.setProperty("key""value");

//设置延时时间

message.setDelayTime(15, TimeUnit.MINUTES);

producer.sendMessage(message);

需要注意的是 sendMessage 实际是个异步操作,如果想要知道一条消息的发送结果,需要额外提供回调。

producer.sendMessage(message, newMessageSendStateListener {

@Override

publicvoidonSuccess(Message message){

//send success

}

@Override

publicvoidonFailed(Message message){

//send failed

}

});

更多说明请参考:

https://github.com/qunarcorp/qmq/blob/master/docs/cn/producer.md

消费消息

消息消息时比较推荐的使用方式是直接在 Spring 中启用 QMQ 消费者相关的注解。

首先是配置 Spring,引入 QMQ 相关的注解支持。

第一种是 Spring 的 XML 配置方式。

<?xml version="1.0" encoding="UTF-8"?>

<beansxmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xmlns:qmq="http://www.qunar.com/schema/qmq"

xmlns:context="http://www.springframework.org/schema/context"

xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd

http://www.qunar.com/schema/qmq http://www.qunar.com/schema/qmq/qmq.xsd">

<qmq:consumerappCode="your app"metaServer="http://meta server/meta/address"/>

<context:annotation-config/>

<context:component-scanbase-package="qunar.tc.qmq.demo.consumer.*"/>

<!-- 处理消息时使用的线程池 -->

<beanid="qmqExecutor"class="org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean">

<propertyname="corePoolSize"value="2"/>

<propertyname="maxPoolSize"value="2"/>

<propertyname="queueCapacity"value="1000"/>

<propertyname="threadNamePrefix"value="qmq-process"/>

</bean>

</beans>

第二种是Spring Boot中的注解配置方式。

@Configuration

@EnableQmq(appCode="your app", metaServer="http://<meta server address>/meta/address")

publicclassConfig{}

之后就可以使用QmqConsumer注解来订阅消息。使用这种方式时,消息会自动被ACK。如果处理时抛出异常,那么这条消息就会被标记为消费失败,延后一段时间后这条消息会被重新派发。

@QmqConsumer(subject = "qmq_subject", consumerGroup = "group", executor = "executor bean name")

publicvoidonMessage(Message message){

//处理消息

String value= message.getStringProperty("key");

}

QmqConsumer注解可以支持最多派发一次、广播消费等模式。

使用consumeMostOnce参数可以开启最多派发一次。

@QmqConsumer(subject = "qmq_subject", consumerGroup = "group", consumeMostOnce = true, executor = "executor bean name")

publicvoidonMessage(Message message){

}

使用isBroadcast参数可以开启广播消费。

@QmqConsumer(subject = "qmq_subject", consumerGroup = "group", isBroadcast = true, executor = "executor bean name")

publicvoidonMessage(Message message){

}

更多说明请参考:

https://github.com/qunarcorp/qmq/blob/master/docs/cn/consumer.md

事务消息

在消息发送方面,QMQ借助数据库事务提供了一种强一致性的事务消息,这是QMQ的一个比较特别的功能。

有些使用场景下一致性是非常关键的需求,比如在很多交易场景中,不能出现业务操作成功但消息未发出或消息已发出但是业务操作失败的情况。举个具体的例子,支付服务使用消息通知出票服务,这时不能出现支付成功消息却没有发出,这会引起用户投诉,也不能出现支付未成功消息却发出导致成功出票,这会导致公司损失。简单来说就是发消息和业务需要有事务保证。公司内部这种强一致性的业务基本都会使用数据库的事务来实现,QMQ根据这种情况演化出了基于数据库事务的事务消息。

在同一个数据库实例上,我们可以在同一个事务中操作多个不同的表。利用这个特性可以透明的将业务操作和消息发送放在同一个事务中。首先在公司里所有MySQL实例里都创建一个message database,这个可以让DBA放到自动化流程中,不需要使用方主动参与。然后我们在producer中添加事务消息的支持,在事务中不直接发送消息,而是先借助事务保存消息到业务实例上,等事务提交之后再开始发送,发送成功再删除消息,事务回滚时就不必发送这条消息。事务提交后消息发送失败时,QMQ有一个watchdog服务会定时检查各个业务库的message database,重新发送其中发送失败的消息。

实例使用方式可以参考下面的代码。首先创建producer时需要指定一个TransactionProvider,目前在Spring中提供支持的是SpringTransactionProvider,创建时指定业务的数据库datasource即可。

<bean id= "transactionProvider"class"qunar.tc.qmq.producer.tx.spring.SpringTransactionProvider">

<constructor-arg name= "bizDataSource"ref"dataSource"/>

</bean>

<bean id= "messageProducer"class"qunar.tc.qmq.producer.MessageProducerProvider">

<property name= "appCode"value"your app"/>

<property name= "metaServer"value"http://<metaserver address>/meta/address"/>

<property name= "transactionProvider"ref"transactionProvider"/>

</bean>

利用Spring的Transactional注解引入事务支持。sendMessage只是将消息写入业务库,且写入操作和payDao.append在同一个事务中,同时成功或失败。

@Transactional

publicvoidpay(Order order){

PayTransaction t = buildPayTransaction(order);

payDao.append(t);

producer.sendMessage(buildMessage(t));

}

更多说明请参考:

https://github.com/qunarcorp/qmq/blob/master/docs/cn/transaction.md

整体架构

下面简单介绍下QMQ服务端的整体架构,整体架构图如下所示。

QMQ服务端主要由metaserver、broker、delay这三个核心组件组成。

metaserver主要是负责管理各种元数据。第一种元数据是主题的路由信息,producer和consumer需要通过metaserver获得主题对应的broker、delay的地址,这样才能正确发送、消费消息。第二种元数据是实时和延时集群的组成,且需要维护与broker和delay的心跳,及时下线宕机的服务。metaserver中的各项元数据是保存在数据库中的。

实时集群和延时集群都是由多个group组成的,每个group内有主从2台机器。每个主题都会分配至少2个group,这样才能保证可用性,即挂掉一个group也不会影响消息的发送和消费。

延迟消息首先会发送到延迟集群,等延迟时间到达,延迟集群会将消息发送到实时集群,消费者只从实时集群消费消息,不直接和延迟集群交互。

实时集群和延迟集群中的服务都会和metaserver保持心跳,一旦metaserver检测到心跳过期,就会将对应的group标记为下线状态。

实时队列

下面简单介绍一下QMQ实时队列的主要设计,主要从消费模型和存储设计两方面介绍。消费模型是QMQ根据公司业务场景单独设计的,和Kafka这种常见消息队列有所区别。消费模型也决定了存储设计上的一些取舍。

消费模型

之前在QMQ背景介绍中提到,重新设计存储模型时我们参考了Kafka和RocketMQ的存储设计,那QMQ和这些消息队列在设计上有何不同呢?最大的不同就在消费模型上。QMQ的消费模型也算是它的一个特色,也是我们在过去几年运维消息中间件期间觉得必须提供的和难以舍弃的,尤其是在业务场景下使用时。

Kafka和RocketMQ都是基于partition的存储模型,每个主题分为一个或多个partition,server收到消息后将其分发到某个partition上,consumer消费消息时是与partition关联的。比如,某个主题a分配了3个partition(p1, p2, p3),某个消费组内有3个消费者(c1, c2, c3)消费该主题,则会建立c1 - p1, c2 - p2, c3 - p3这样的消费对应关系。如下图所示。

那么如果我们的consumer个数比partition个数多呢?则有的consumer会是空闲的。

而如果partition个数比consumer个数多呢?则可能存在有的consumer消费的partition个数会比其他的consumer多的情况。

这就导致合理的分配策略最好是partition个数与consumer个数成倍数关系。以上都是基于partition的MQ所带来的负载均衡问题。

这种静态的绑定的关系还会导致consumer扩容缩容麻烦。使用Kafka或者RocketMQ这种基于partition的消息队列时,如果处理速度跟不上,光简单的增加consumer并不能马上提高处理能力,需要对应的增加partition个数,特别在Kafka里partition是一个比较重的资源,增加partition还需要考虑整个集群的处理能力。高峰期过了之后,如果想缩容consumer也比较麻烦,因为partition只能增加,不能减少。

跟扩容相关的另外一个问题是,已经堆积的消息是不能通过扩容consumer快速消费的。比如开始时分配了2个partition,由2个consumer消费,但是突然发送方大量发送消息(这个在日常运维中经常遇到),导致消息快速堆积,这时我们如何能快速扩容消费这些消息呢?这时增加partition和consumer都没有用,因为堆积的那2个partition只能由2个consumer来消费,这时你只能纵向扩展,而不能横向扩展,而我们都知道纵向扩展很多时候是不现实的。

基于这些考虑我们单独设计了QMQ的消费模型,并制定了对应的存储模型。我们的设计考虑的是消费和存储模型是完全解耦,consumer可以很容易的扩容缩容,从现在来看这个选择也是正确的。现在去哪儿网的系统架构基本上呈现为基于消息驱动的架构,在我们内部系统之间的交互大部分都是以消息这种异步的方式来进行。比如我们酒店的订单变更消息就有接近70个不同的消费组订阅(可以将消费组理解为不同的应用),整个交易流程都是靠消息来驱动,那么从上面对基于partition模型的描述来看,要在70个不同应用之间协调partition和consumer的均衡几乎是不可能的。

存储模型

上面我们已经描述了QMQ没有采用基于partition的存储模型,但是Kafka和RocketMQ的存储实现方式后是有很多地方是值得借鉴的:

  • 顺序append文件,提供很好的写入性能
  • 顺序消费文件,使用offset表示消费进度,成本极低
  • 将所有subject的消息合并在一起,减少partition数量,可以提供更多的subject(RocketMQ)

在演化QMQ的存储模型时,觉得这几点是非常重要的。那如何在不使用partition的情况下能得到这些特性呢?

我们通过添加一层拉取的log(pull log)来动态映射consumer与partition的逻辑关系,这样不仅解决了consumer的动态扩容缩容问题,还可以继续使用一个offset表示消费进度。

下图是QMQ的存储模型:

先解释一下上图中的数字的意义。上图中方框上方的数字,表示该方框在自己log中的偏移,而方框内的数字是该项的内容。比如message log方框上方的数字:3,6,9几表示这几条消息在message log中的偏移。而consume log中方框内的数字3,6,9,20正对应着message log的偏移,表示这几个位置上的消息都是subject1的消息,consume log方框上方的1,2,3,4表示这几个方框在consume log中的逻辑偏移。下面的pull log方框内的内容对应着consume log的逻辑偏移,而pull log方框外的数字表示pull log的逻辑偏移。

在实时broker存储模型中有三种重要的log:

  • message log,所有subject的消息进入该log,消息的主存储
  • consume log,存储的是单个主题在message log的索引信息
  • pull log,每个consumer拉取消息的时候会产生pull log,pull log记录的是拉取的消息在consumer log中的sequence,这样消费者就可以使用pull log上的sequence来表示消费进度。

延时队列

除了实时消息,QMQ还支持任意时间的延时消息,当时在开源版本的RocektMQ里提供了多种固定延迟level的延时消息支持,也就是可以发送几种固定延时时间的延时消息,比如延时10s, 30s…,但是基于我们现有的业务特征,这种不同延时level的延时消息并不能满足我们的需求,我们需要任意时间延时。在OTA场景中,客人经常是预订未来某个时刻的酒店或者机票,这个时间是不固定的,我们无法使用几个固定的延时level来实现这个场景。

我们的延时消息是使用两层hash wheel timer来实现的。第一层位于磁盘上,每个小时(默认一个小时,可配置)为一个刻度,每个刻度会生成一个数据日志文件,根据业务特征,我们觉得支持两年(默认两年,可配置)内任意时间延时就够了,那么最多会生成2 * 366 * 24 = 17568个文件。第二层在内存中,当消息的投递时间即将到来的时候,会将这个小时的消息索引 (偏移量,投递时间等) 从磁盘文件加载到内存中的 hash wheel timer上。

在延时消息里也存在三种 log:

  • message log,和实时消息里的message log类似,收到消息后append到该 log,append成功后立即返回。
  • schedule log,按照投递时间组织,每小时一个。该log是回放message log后根据延时时间放置对应的log上,这是上面描述的两层hash wheel timer的第一层,位于磁盘上。该log包含完整消息内容,所以message log里回放了之前的都可以删除,可以大大的节约磁盘空间。
  • dispatch log,延时消息投递后写入,主要用于在应用重启后能确定哪些消息已经投递。

工程地址:https://github.com/qunarcorp/qmq

作者简介:去哪儿网QMQ团队隶属于去哪儿网基础研发部 - 基础架构部,主要负责开发和维护QMQ消息中间件,满足业务团队需求,为业务团队提供良好的消息使用体验,同时保证系统的平稳运行。

负责人王克礼:2015年加入去哪儿网,资深Java开发工程师,具备多年企业中间件的开发实践经验,完整参与了QMQ新版本的设计与实现,希望能够持续提升QMQ。

佭ϴý Ѷ Media8ý

在线客服

外链咨询

扫码加我微信

微信:juxia_com

返回顶部