activemq的消息存储机制

  categories:mq  tags:  author:

一.KahaDB简介

ActiveMQ 5.3以后,出现了KahaDB。她是一个基于文件支持事务的消息存储器,是一个可靠,高性能,可扩展的消息存储器。
她的设计初衷就是使用简单并尽可能的快。KahaDB的索引使用一个transaction log,并且所有的destination只使用一个index,有人测试表明:如果用于生产环境,支持1万个active connection,每个connection有一个独立的queue。该表现已经足矣应付大部分的需求。
KahaDB内部分为:data logs, 按照Message ID高度优化的索引,memory message cache。

data logs扮演一个message journal,存储消息和命令。当大小超过了规定,将会新建一个data log
.所有在data log里的消息是引用计数的,所以当一个log里的消息不在需要了,这里在添加一句, 经过测试是这个文件中全部消息都可以不再需要了, 才进行后续操作, 假若有一条消息还需要保留, 则整个文件都是被保留的, 因此若是, 您万一多个或者全部日志文件里面都有少量消息需要保存, 这样会造成日志的无法删除,在最坏的情况下, 若是配置了相关的kahadb的相关流控(其实也是磁盘容量,内存容量等的限制),若是日志文件长期无法删除, 会造成生产者无法再发消息(由于磁盘空间占用等达到限额, 并且日志又不能删除,没有多余磁盘空间,因此在配置正确情况下, 客户端会接受到相关的异常信息, 配置不好的, 用户会被挂起,并且没有异常消息的。),可以被删除或者放入archived文件夹。每次消息的写入都是在log的末尾增加记录,所以存储速度很快。
缓存则是临时持有那些有对应消费者在线的消息。如果消费者反馈消息已经成功接收,那么这些消息就不用写入磁盘。
BTree索引,保存消息的引用,并按照message ID排序。Redo log是用来保证MQ broker未干净关闭情况下,用于Btree index的重建。
KahaDB的目录会在你启动MQ后自动创建(使用了KAhaDB作为存储器),

db log files:以db-递增数字.log命名。
archive directory: 当配置支持archiving(默认不支持)并且存在,该文件夹才会创建。用于存储不再需要的data logs。
db.data:存储btree索引
db.redo:用于hard-stop broker后,btree索引的重建

二.KahaDB的异常处理机制

KahaDB 支持多种机制在系统异常关闭后重启并恢复。包括检测数据文件丢失和还原损坏的metadata。这些特性并不能完全保证系统异常关闭不造成消息丢失。如果需要保证系统的高可靠性,建议部署到容灾系统上。例如RAID磁盘阵列中。

当broker正常关闭时, KahaDB message store会将所有的缓存数据刷到文件系统中。尤其是这些数据:
1、所有未处理的日志数据
2、所有缓存的metadata
最后meta store中的信息与journal数据文件中的数据保持一致性。

正常情况下,在系统恢复时优先读取journal中的数据。因为metacache中的索引信息是周期性的更新到meta store中的。当系统异常关闭时,可能journal中有的数据meta store中并没有不存在索引。但是KahaDB在恢复时会先读取meta store中的数据,然后再读取journal有但是meta store不存在的数据(因为KahaDB根据meta store中的索引信息快速定位到metastore没有但是journal文件中包含的数据,然后根据这些数据重新在meta store中建立索引信息)

KahaDB会在更新metadata store之前,保存更新操作的概要信息到重做日志(db.redo)中。减少系统异常关闭时的风险。因为重做日志非常小,所以在系统异常关闭时能快速写入。当系统恢复时会判断重做日志中的信息是否需要更新到metadata中。

如果 metadata store 已被不可挽回的损坏了,可以删除metadata store文件(db.data)来强制恢复;只不过这个时候,broker会读取所有的journal文件来重建metadata store,需要一段比较长的时间。

KahaDB可以检测是否有journal文件丢失,如果有丢失,默认将会抛出一个异常然后关闭。便于管理员调查丢失的journal文件,并手 动还原。可以通过设置ignoreMissingJournalfiles为true,让broker在启动时忽略这些丢失的journal文件。

KahaDB同样可以检测journal文件的完整性。不过这些特性需要明确的配置来启用。

<persistenceAdapter>

<kahaDB directory=”activemq-data”

journalMaxFileLength=”32mb”

checksumJournalFiles=”true”

checkForCorruptJournalFiles=”true”

/>

</persistenceAdapter>

三.AMQ Message Store 是什么?

具体参考:http://activemq.apache.org/kahadb.html

默认的activemq消息存储是通过一个所谓的AMQ Message Store来完成。
AMQ Message Store是一个高效的可嵌入支持事务的消息存储解决方案。
在此方案下消息(Message)本身以日志的形式实现持久化,存放在Data Log里。并且还对日志里的消息做了引用索引,方便快速取回Message。

一般情况下消息索引存放于内存(Cache)中,MQ Server定期将索引内容持久化,存放到Reference Store。
Message Data Log文件是有容量限制的,默认是32MB,可自行配置容量。当该Data Log文件里所有消息都被消费完的时候,Data Log文件就会被加上一个标记,通知下一次消息清理时可以被处理掉(处理方式可以是delete或是转移到Achieve目录)。

AMQ Message Store方案中 Cache 、Data Log、Reference Store 协作图如下:

AMQ Message Store的属性是可以配置的,
你可以在conf/activemq.xml配置文件里添加上如下配置:

Xml代码

<persistenceAdapter>

<amqPersistenceAdapter directory=”activemq-data” maxFileLength=”32mb”/>

</persistenceAdapter>

四.kahadb属性说明

property name

default value

Comments

directory

activemq-data

存储消息文件和日志的目录

useNIO

true

使用 NIO 特性

syncOnWrite

false

同步写文件到磁盘

maxFileLength

32mb

Message Data日志文件的最大 Size

persistentIndex

true

持久化日志索引,如果设为 false ,则在内存中保存

maxCheckpointMessageAddSize

4kb

在自动提交前在事务中能保持的最大消息数

cleanupInterval

30000

每隔多少时间清理不再使用的消息日志(毫秒)

indexBinSize

1024

这个值是用来提升索引的性能的,值越大,索引相对性能越好

indexKeySize

96

index key的size,index key基于message id

indexPageSize

16kb

索引页的size

directoryArchive

archive

消费完的Data Log存放的目录

archiveDataLogs

false

设置为true的话,消费完的Data Log就放到Archive目录,而不是删除。

AMQ Message Store体系中 目录结构参照下图 :

顶层目录broker name 用broker name命名,默认目录名是localhost,broker name在activemq的配置文件里指定,以下是它的子目录:

archive 丢弃的Data Log就放到这里,当archiveDataLogs 属性配置为true时才会存在

journal message data log的所在

kr-store reference store 目录

data 引用索引所在目录

state 记录store的状态

tmp-storage 用来存储一些事物性的消息以减轻内存的负担例如等待正常但是速度很慢的消费端来消费非持久化的Topic.

其他持久化方式 activemq同样支持JDBC持久化Message,我们只需要把配置从

<persistenceAdapter>

<amqPersistenceAdapter directory=”activemq-data” maxFileLength=”32mb”/>

</persistenceAdapter>

改成AMQ Message Store and JDBC(推荐,同时使用两者可以同时保证效率和可靠性):

<persistenceAdapter>

<journaledJDBC dataDirectory=”${activemq.base}/data” dataSource=”#oracle-ds”/>

</persistenceAdapter>

<persistenceAdapter>

<jdbcPersistenceAdapter dataSource=”#oracle-ds”/>

</persistenceAdapter>



快乐成长 每天进步一点点