ActiveMQ 中消息并行存储转发

  categories:mq, 资料  author:

英文文章地址:http://fusesource.com/docs/broker/5.4/persistence/KahaDB-Concurrent.html

Concurrent store and dispatch is a strategy that facilitates high rates of message throughput, provided the consumers are able to keep up with the flow of messages from the broker. By allowing the storing of messages to proceed concurrently with the dispatch of those messages to consumers, it can happen that the consumers return acknowledgments before the messages are ever written to disk. In this case, the message writes can be optimized away, because the dispatch has already completed.

并行存储转发是一种高吞吐量策略,可以让消息消费者跟上消息流的速度。在并行存储和转发的过程中,可能会在消息未持久化到硬盘时,接收到了消息消费的确认 信息,所以在这种情况下就不需要再持久化到硬盘了。值得注意得是,如果使用了jms事务,就不能使用并行储存转发。因为并行存储和转发是不保证消息的一致 性的。

Queue默认是使用并行存储转发的。当然也是可配置的。主要可配置项如下:
concurrentStoreAndDispatchQueues
concurrentStoreAndDispatchTopics
concurrentStoreAndDispatchTransactions

下面解析快慢两种消费者的情况下,并行存储转发的过程;

慢消费者:

1、producer发送一个消息M到destination
2、broker发送消息M到持久层。持久层中是一些负责写入消息到日志中的线程。
3、同时存储和转发。消息可能被转发给一个或者多个消费者。因为消费者较慢,在收到消息消费的确认信息之前,消息将被持久化到日志文件中。
4、收到消息消费的确认信息。
5、broker通知持久层删除日志文件中的消息。
(KahaDB的可回滚日志中,所以消息不会被直接删除,而是会记录一条信息到日子,表示这条消息已消费完成。等到一个日志文件中所有的消息都被消费完成后,才会删除或者归档这个日志文件)

快消费者:

1、producer发送一个消息M到destination
2、broker发送消息M到持久层。持久层中是一些负责写入消息到日志中的线程。
3、同时进行存储和转发。
4、消费者快速返回了消息消费完成的确认信息。
5、但受到所有消费者返回的确认信息后,便会通知持久层删除该消息。此时前面准备去持久化的消息还被阻塞着,这个时候便不再需要写入到持久层中。直接从内存中删除。

如果你要使KahaDB串行存储和转发,必须明确禁用并行存储转发。禁用Queue、topic、Transactions

Xml代码

<broker brokerName=”broker” persistent=”true” useShutdownHook=”false”>

<persistenceAdapter>

<kahaDB directory=”activemq-data”

journalMaxFileLength=”32mb”

concurrentStoreAndDispatchQueues=”false”

concurrentStoreAndDispatchTopics=”false”

concurrentStoreAndDispatchTransactions=”false”

/>

</persistenceAdapter>

</broker>

下图展示串行存储和转发:

1、producer发送一个消息M到broker上的某个destination
2、broker发送消息到持久层。因为并行存储转发已关闭,消息将会被立即写入到日志文件中。
3、消息被分发到一个或多个消费者
4、消费者发送消息消费确认信息到broker
5、当broker收到所有消费者的确认信息后,broker通知持久层删除该消息。

为了避免丢失消息,JMS规范要求broker在发送给producer确认接收信息前把消息持久化。如果使用Jms事务中时,会将事务的相关信息也持久化。默认情况下,KahaDB并未如此配置。如果应用需要避免丢失信息,就需要对KahaDB的配置进行修改:
1、配置并行存储转发为false,使用串行存储转发
2、持久化时使用同步写入。配置enableJournalDiskSyncs为true。

另外,使用事务可以提升持久层的效率。因为事务是批量处理消息的。在事务提交之前,所有的消息都不会写入到message store中的。这样就减少了IO,每次事务提交进行一次IO,而不是每条消息进行一次IO。



快乐成长 每天进步一点点