activemq的优先级

我们可以在发送消息时,指定消息的权重,broker可以建议权重较高的消息将会优先发送给Consumer。在某些场景下,我们通常希望权重较高的消息优先传送;不过因为各种原因,priority并不能决定消息传送的严格顺序(order)。

JMS标准中约定priority可以为0~9的数值,值越大表示权重越高,默认值为4。不过activeMQ中各个存储器对priority的支持并非完全一样。比如JDBC存储器可以支持0~9,因为JDBC存储器可以基于priority对消息进行排序和索引化;但是对于kahadb/levelDB等这种基于日志文件的存储器而言,priority支持相对较弱,只能识别三种优先级(LOW: < 4,NORMAL: =4,HIGH: > 4)。在broker端,默认是不支持priority排序的,我们需要手动开启:

  1. <policyEntry queue=">" prioritizedMessages="true"/>

一旦开启了此属性,此后消息存储时,将会按照prioprity的倒序索引化消息(比如kahadb B+Tree,此后priority将作为索引的一部分)。此后broker从存储器中获取消息时,权重较高的消息将会被优先获取;对于JDBC等其他存储器,可能在获取消息时,按照priority作为排序列来筛选消息。

我们首先简单的描述一下,消息在broker端pending的过程,这涉及到prefetch机制以及消息是否持久化等方面的问题。在broker端,为了优化Consumer消费的效率,通常开启prefetch策略,即从通道中(Queue/Topic)批量加载多条消息,这些消息可能来自内存(非持久化),也可能来自存储文件(持久化消息,或者非持久化消息被swap到临时文件中);borker会为每个Consumer创建一个基于内存的pending buffer,用来保存即将发送给Consumer的消息列表。当buffer中的消息被Consumer消费之后,将会从内存或者文件中继续加载多条消息,然后再根据需要将一定量的消息放入到pending buffer中。由此可见,我们只能保证每次prefetch的消息列表是按照priority排序的,但是有可能在buffer中的消息还没有发送之前,会有更高优先级的消息被写入文件或内存,事实上这已经不能改变消息发送的顺序了;因为我们无法在全局范围内,保证Consumer即将消费的消息是权重最高的!!

不过对于“非持久化”类型的消息(如果没有被swap到临时文件),它们被保存在内存中,它们不存在从文件Paged in到内存的过程,因为可以保证优先级较高的消息,总是在prefetch的时候被优先获取,这也是“非持久化”消息可以担保消息发送顺序的优点。

Broker在收到Producer的消息之后,将会把消息cache到内存,如果消息需要持久化,那么同时也会把消息写入文件;如果通道中Consumer的消费速度足够快(即积压的消息很少,尚未超过内存限制,我们通过上文能够知道,每个通道都可以有一定的内存用来cache消息),那么消息几乎不需要从存储文件中Paged In,直接就能从内存的cache中获取即可,这种情况下,priority可以担保“全局顺序”;不过,如果消费者滞后太多,cache已满,就会触发新接收的消息直接保存在磁盘中,那么此时,priority就没有那么有效了。

在Queue中,prefetch的消息列表默认将会采用“轮询”的方式(roundRobin,注意并不是roundRobinDispatch)[备注:因为Queue不支持任何DispatchPolicy],依次添加到每个consumer的pending buffer中,比如有m1-m2-m3-m4四条消息,有C1-C2两个消费者,那么: m1->C1,m2->C2,m3->C1,m4->C2。这种轮序方式,会对基于权重的消息发送有些额外的影响,假如四条消息的权重都不同,但是(m1,m3)->C1,事实上m2的权重>m3,对于C1而言,它似乎丢失了“顺序性”。

  1. //参见org.apache.activemq.broker.region.Queue
  2. //doActualDispatch方法
  3. //伪代码,描述
  4. //pendingList: 从store中PageIn 的消息列表,亟待发送
  5. Iterator it = pendingList.iterator();
  6. //为了当前Page的消息发送,创建一个消费者列表的copy
  7. //每个Queue都持有目前所有的消费者列表。
  8. //在broker端,Subscription负责和Consumer客户端通讯
  9. List<Subscription> copyConsumers = new ArrayList<Subscription>(this.allSubscriptions);
  10. Set<Subscription> fullConsumers = new HashSet<Subscription>();
  11. while(it.hasNext()){
  12.     Message mx = it.next();
  13.     for(Subscription consumer : copyConsumers){
  14.         if(fullConsumers.contains(consumer)){
  15.             break;//如果当前consumer亟待确认的消息已经超过了prefetch
  16.         }
  17.         //如果当前consumer中的pendingBuffer没有满
  18.         if(!consumer.isFull()){
  19.             it.remove();
  20.             consumer.dispatch(mx);//异步发送消息,addBuffer
  21.             if(!strictOrderDispatch){
  22.                 //如果不是严格顺序发送
  23.                 //对consumers按照权重排序
  24.                 //权重高的consumer,将会被有限填充buffer
  25.                 //当权重相同时,将会按照它们最后发送消息的ID,倒序排列
  26.                 //即消息ID较小的,将会优先获取新消息。
  27.                 Collections.sort(this.allSubscritpions);
  28.                 //重建copy
  29.                 copyConsumers = new ArrayList<Subscription>(this.allSubscriptions);
  30.             }
  31.         }else{
  32.             fullConsumers.add(consumer);
  33.         }
  34. }
  35. if(!pendingList.isEmpty()) {
  36.     //剩余的消息,将会被cache起来,继续轮训上述过程
  37. }

为了让priority的消息更加具备顺序性,我们可以通过如下配置来调整:

  1. //queue or topic
  2. <policyEntry queue=">" strictOrderDispatch="true" />

strictOrderDispatch“严格顺序转发”,这是区别于“轮询”的一种消息转发手段;不过不要误解它为“全局严格顺序”,它只不过是将prefetch的消息依次填满每个consumer的pending buffer。比如上述例子中,如果C1-C2两个消费者的buffer尺寸为3,那么(m1,m2,m3)->C1,(m4)->C2;当C1填充完毕之后,才会填充C2。由此这种策略可以保证buffer中所有的消息都是“权重临近的”、有序的。(需要注意:strictOrderDispatch并非是解决priority消息顺序的问题而生,只是在使用priority时需要关注它)。

对于Queue而言,仅仅使用strictOrderDispatch并不能完全解决顺序问题,它可能是相对高效但是比较粗略的方式;如果需要严格保证有序性,我们需要按照如下方式配置:

  1. <policyEntry queue=">" prioritizedMessages="true" useCache="false" expireMessagesPeriod="0" queuePrefetch="1" />

useCache=false来关闭内存,强制将所有的消息都立即写入文件(索引化,但是会降低消息的转发效率);queuePrefetch=1来约束每个consumer任何时刻只有一个消息正在处理,那些消息消费之后,将会从文件中重新获取,这大大增加了消息文件操作的次数,不过每次读取肯定都是priority最高的消息。[对于broker而言,如果指定了prioritizedMessage将在存储时就根据消息的权重建立索引顺序,在内存中使用PrioritizedPendingList保存消息;否则将使用OrderedPendingList]

“strictOrderDispatch”也适用于Topic,broker可以保证所有Subscriber获得的消息的顺序是一致的。不过Topic支持DispatchPolicy,但是不支持"strictOrderDispatch"属性,这个与Queue有所不同。

  1. <policyEntry topic=">">
  2.     <dispatchPolicy>
  3.         <strictOrderDispatchPolicy />
  4.     </dispatchPolicy>
  5. </policyEntry>

此外,对于Consumer而言,我们需要关注一个参数:messagePrioritySupported;它表示consumer端是否支持权重排序,默认为true,即当Consumer客户端使用了prefetchSize时,将会对这些已经到达Session但还没有转发给Consumer的消息列表,按照权重排序。我们可以通过这种方式开启:

  1. //brokerUrl中
  2. jms.messagePrioritySupported=true

如果你关闭了此选项,那么需要注意,当高权重的消息因为消费异常而重发时,将不能被优先消费。

设置方式

消息的优先级。0-4为正常的优先级,5-9为高优先级。可以通过下面方式设置:
producer.setPriority(9);

---------

http://activemq.apache.org/consumer-priority.html consumer 优先级

http://activemq.apache.org/activemq-message-properties.html 消息优先级

1、设置 consumer 的优先级:

queue = new ActiveMQQueue("TEST.QUEUE?consumer.priority=10");
consumer = session.createConsumer(queue);

priority 的取值从0到127。broker 按照 consumer 的优先级给 queue 的 consumers 排序,首先把消息分发给优先级最高的 consumer。一旦该 consumer 的 prefetch buffer 满了,broker 就把消息分发给优先级次高的,prefetch buffer 不满的 consumer。

// org.apache.activemq.broker.region.Queue
// consumer priority 的比较器
private final Comparator<Subscription> orderedCompare = new Comparator<Subscription>() {

    @Override
    public int compare(Subscription s1, Subscription s2) {
        // We want the list sorted in descending order
        // 倒序,即数值大的优先级高
        int val = s2.getConsumerInfo().getPriority() - s1.getConsumerInfo().getPriority();
        if (val == 0 && messageGroupOwners != null) {
            // then ascending order of assigned message groups to favour less loaded consumers
            // Long.compare in jdk7
            long x = s1.getConsumerInfo().getLastDeliveredSequenceId();
            long y = s2.getConsumerInfo().getLastDeliveredSequenceId();
            val = (x < y) ? -1 : ((x == y) ? 0 : 1);
        }
        return val;
    }
};

// 添加 consumer 的时候,会触发排序
// 在 consumers 列表中,靠前的 consumer,先分发消息
private void addToConsumerList(Subscription sub) {
    if (useConsumerPriority) {
        consumers.add(sub);
        Collections.sort(consumers, orderedCompare);
    } else {
        consumers.add(sub);
    }
}

2、设置 message 的优先级需要在 broker 端和 producer 端配置:

2.1 在 broker 端设置 TEST.BAT 队列为 prioritizedMessages = "true"

<policyEntry queue="TEST.BAT" prioritizedMessages="true" producerFlowControl="true" memoryLimit="1mb">
    <deadLetterStrategy>
        <individualDeadLetterStrategy queuePrefix="TEST"/>
    </deadLetterStrategy>
    <pendingQueuePolicy>
        <storeCursor/>
    </pendingQueuePolicy>
</policyEntry>

2.2 producer 发送消息时,设置 message 的优先级

TextMessage message = session.createTextMessage(text);
producer.send(destination, message, DeliveryMode.NON_PERSISTENT, 1, 0);

设置 message 的优先级,需要调用:

void javax.jms.MessageProducer.send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
throws JMSException

而不能这么写:

TextMessage message = session.createTextMessage(text);
message.setJMSPriority(0);

初步看是 ActiveMQ 的 bug。消息的 priority 值,从0到9。消息配置了优先级之后,消息存放在 PrioritizedPendingList 中。

// 省略部分代码
private class PrioritizedPendingListIterator implements Iterator<MessageReference> {
    private int index = 0;
    private int currentIndex = 0;
    List<PendingNode> list = new ArrayList<PendingNode>(size());

    PrioritizedPendingListIterator() {
        for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
            // priority 值大的优先级高
            OrderedPendingList orderedPendingList = lists[i];
            if (!orderedPendingList.isEmpty()) {
                list.addAll(orderedPendingList.getAsList());
            }
        }
    }
}

---------

ActiveMq之消息的优先级-yellowcong

消息发送后,有的时候收发消息的顺序不是我们发送消息前的优先级发送的,我们可以通过配置activemq.xml,设定哪个队列按照优先级别来进行数据的发送。

修改配置文件

修改配置文件,设定需要配置优先级操作的消息队列

vim  apache-activemq-5.11.1/conf/activemq.xml

#大概44行左右
#设定queue="queue1" 设定 queue1 的队列是按优先级发送的
<policyEntry queue="queue1" prioritizedMessages="true" />

#重启服务生效
apache-activemq-5.11.1/bin/activemq-admin start

测试案例

package com.yellowcong.provice;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 创建用户:狂飙的yellowcong<br/>
 * 创建日期:2017年12月9日<br/>
 * 创建时间:上午11:01:51<br/>
 * 机能概要:
 */
public class DemoSendMessage {

    // activemq的服务器地址
    private static final String ACTIVEMQ_HOST = "tcp://192.168.100.10:61616";

    // 用户名
    private static final String USERNAME = "yellowcong";
    // 密码
    private static final String PASSWORD = "yellowcong";

    public static void main(String[] args) throws Exception {
        provider();

        customer();
    }

    /**
     * 创建用户:狂飙的yellowcong<br/>
     * 创建日期:2017年12月9日<br/>
     * 创建时间:上午10:52:05<br/>
     * 机能概要:消费者确认消息
     * 
     * @throws Exception
     */
    public static void customer() throws Exception {
        // 获取连接
        Connection conn = null;

        try {
            conn = getConnection();
            // 获取session
            Session session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
            // 需要和activemq.xml配置文件的队列对上
            Destination destination = session.createQueue("queue1");

            // 获取消费者
            MessageConsumer cus = session.createConsumer(destination);
            System.out.println("-------------接收并确认消息----------------");
            while (true) {
                // 接收消息
                TextMessage msg = (TextMessage) cus.receive();
                if (msg == null) {
                    break;
                }
                // 确认接收, 又开启一个线程,去发送给服务器,按收到消息了
                msg.acknowledge();
                System.out.println(msg.getText());

            }

        } finally {
            // 关闭连接
            conn.close();
        }
    }

    /**
     * 创建用户:狂飙的yellowcong<br/>
     * 创建日期:2017年12月9日<br/>
     * 创建时间:上午10:48:03<br/>
     * 机能概要: 生产消息
     * 
     * @throws Exception
     */
    public static void provider() throws Exception {
        // 获取连接
        Connection conn = null;
        try {
            conn = getConnection();
            // 获取session
            Session session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
            // 创建队列
            // 需要和activemq.xml配置文件的队列对上
            Destination destination = session.createQueue("queue1");
            // 获取生产者
            MessageProducer pro = session.createProducer(destination);

            // 创建消息
            TextMessage msg = session.createTextMessage();

            // 发送消息
            // 第一个参数 消息的目的地 Destination
            // 第二个参数 发送的消息
            // 第三个参数 消息的模式 DeliveryMode
            // 第四个参数 消息的优先级
            // 第五个参数 消息存活时间 存活时间单位是ms
            for (int i = 0; i < 10; i++) {
                msg.setText("请确认消息,消息添加顺序"+i+",优先级" + i);
                pro.send(destination, msg, DeliveryMode.NON_PERSISTENT, i, 1000 * 10);
            }

            // 提交事物
            session.commit();

            System.out.println("-----------------发送消息----------------");
        } finally {
            // 关闭连接
            conn.close();
        }

    }

    /**
     * 创建用户:狂飙的yellowcong<br/>
     * 创建日期:2017年12月9日<br/>
     * 创建时间:上午10:42:48<br/>
     * 机能概要: 获取连接
     * 
     * @return
     * @throws Exception
     */
    public static Connection getConnection() throws Exception {
        // 1.获取工厂连接类
        ConnectionFactory fc = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_HOST);

        // 2.获取连接
        Connection conn = fc.createConnection();
        conn.start();

        System.out.println("-----------------获取连接----------------");

        return conn;

    }
}

运行结果

搭建可以发现,优先级别高的先消费。