标签归档:消息

RocketMq知识点理解

RocketMq关键点理解

各个角色间的关系:

RocketMq中每个Broker(master和slave)与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server。
Name Server之间不会有任何信息交互,各自独立。
producer和consumer随机从一个name server即可获得全部topic的路由信息。
producer根据得到的路由信息,同master建立长连接。
consumer根据路由信息,同master个salve都建立长连接。然后根据设置的订阅规则,选择从master或者slave订阅消息。

Broker:

分为master和slave两个角色。master提供读写读物,salve只提供读服务。
为了保证可用性,需要部署多套broker,每套broker至少有1个master和1个以上的salve。
在同一套broker中,master和salve都是同样的brokerName,master的brokerId是0,salve的brokerId必须是非0的。

同步刷盘和异步刷盘。

同步刷盘是说,broker在收到每个消息后,都是先要保存到硬盘上,然后再给producer确认。异步刷盘就是先回复确认,然后批量保存到硬盘上。异步刷盘有更好的性能,当然也有更大的丢失消息的风险。

同步复制和异步复制。

是说在master和salve之间复制消息的方式。同步是说在salve也存储了消息后再答复producer。
异步复制是先答复producer,再去向salve复制。
通过同步复制技术可以完全避免单点,同步复制势必会影响性能,适合对消息可靠性要求极高的场合,例如与Money相关的应用。RocketMQ从3.0版本开始支持同步双写。

MQPullConsumer和MQPushConsumer的区别

consumer被分为2类:MQPullConsumer和MQPushConsumer,其实本质都是拉模式(pull),即consumer轮询从broker拉取消息。
区别是:
push方式里,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送(push)过来的。
pull方式里,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。

RocketMQ使用长轮询Pull方式,可保证消息非常实时,消息实时性不低于Push的理解

数据交互有两种模式:Push(推模式)、Pull(拉模式)。
推模式指的是客户端与服务端建立好网络长连接,服务方有相关数据,直接通过长连接通道推送到客户端。其优点是及时,一旦有数据变更,客户端立马能感知到;另外对客户端来说逻辑简单,不需要关心有无数据这些逻辑处理。缺点是不知道客户端的数据消费能力,可能导致数据积压在客户端,来不及处理。
拉模式指的是客户端主动向服务端发出请求,拉取相关数据。其优点是此过程由客户端发起请求,故不存在推模式中数据积压的问题。缺点是可能不够及时,对客户端来说需要考虑数据拉取相关逻辑,何时去拉,拉的频率怎么控制等等。

长轮询:
轮询是说,每隔一定时间,客户端想服务端发起一次请求,服务端有数据就返回数据,没有数据就返回空,然后关闭请求。
长轮询,不同之处是,服务端如果此时没有数据,保持连接。等到有数据返回(相当于一种push),或者超时返回。
所以长轮询Pull的好处就是可以减少无效请求,保证消息的实时性,又不会造成客户端积压。

其他

  1. 对于一个消息中间件来说,持久化部分的性能直接决定了整个消息中间件的性能。RocketMQ充分利用Linux文件系统内存cache来提高性能
  2. RocketMq Broker的buffer不会满。原因是RocketMQ没有内存Buffer概念,RocketMQ的队列都是持久化磁盘,数据定期清除。这是RocketMq和其他消息中间件的重要区别。对RocketMQ来说的内存Buffer抽象成一个无限长度的队列,不管有多少数据进来都能装得下,这个无限是有前提的,Broker会定期删除过期的数据。
  3. 消息堆积。消息堆积的能力是评价一个消息中间件的重要方面。因为使用消息中间件有一部分功能是为了为后端系统挡住数据洪峰。在产生消息堆积时,消息中间件对外的服务能力至关重要。
    因为RocketMq的消息都是持久化硬盘的,当消息不能在内存Cache命中时,要不可避免的访问磁盘,会产生大量读IO,读IO的吞吐量直接决定了消息堆积后的访问能力。
  4. 分布式事务。分布式事务涉及到两阶段提交。分为预提交阶段和commit阶段。在commit阶段需要回去改消息的状态。RocketMq在这里没有使用KV存储来做。而是在commit阶段会拿到消息的offset,然后直接去找消息,修改其状态。这样的好处是设计更简单,速度更快。缺点是会产生过多的数据脏页。
  5. producer只与master建立连接,consumer同master和slave都建立连接,向谁订阅可以配置。
  6. Rocketmq的消息的存储是由consume queue和 commitLog 配合完成的
    consume queue 消息的逻辑队列,相当于字典的目录用来指定消息在消息的真正的物理文件commitLog上的位置, 每个topic下的每个queue都有一个对应的consumequeue文件。
    默认文件地址:${user.home} \store\consumequeue${topicName}${queueId}${fileName}
    修改方法:配置文件的
    storePathRootDir=/home/haieradmin/mqstore/rocketmqstore
    storePathCommitLog=/home/haieradmin/mqstore/rocketmqstore/commitlog
    这两个参数。

 

说到高性能消息中间件,第一个想到的肯定是 LinkedIn 开源的 Kafka ,虽然最初 Kafka 是为日志传输而生,但也非常适合互联网公司消息服务的应用场景,他们不要求数据实时的强一致性(事务),更多是希望达到数据的最终一致性。 RocketMQ 是 MetaQ 的 3.0 版本,而 MetaQ 最初的设计又参考了 Kafka 。MetaQ 1.x 和 MetaQ 2.x 是依赖 ZooKeeper 的,但 RocketMQ (即 MetaQ 3.x )却去掉了 ZooKeeper 依赖,转而采用自己的 NameServer (专门为Rocket设计的轻量级名称服务)。接下来看一下NameSever在RocketMQ中的作用想必大家就清楚了。

一:rocketmq为什么使用nameserver?
RocketMQ 的 Broker 有三种集群部署方式: 1. 单台 Master 部署; 2. 多台 Master部署; 3. 多 Master 多 Slave 部署;采用第 3 种部署方式时, Master 和 Slave 可以采用同步复制和异步复制两种方式。下图是第 3 种部署方式的简单图:

图1.多master多slave部署

下面解读一下它们之间的关系:
1.1.NameServer互相独立,彼此没有通信关系,单台NameServer挂掉,不影响其他NameServer。NameServer不去连接别的机器,不会主动推消息。

1.2.单个broker(Master、Slave)与所有NameServer进行定时注册,以便告知NameServer自己还活着。Broker每隔30秒向所有NameServer发送心跳,心跳包含了自身的topic配置信息。NameServer每隔10秒,扫描所有还存活的broker连接,如果某个连接的最后更新时间与当前时间差值超过2分钟,则断开此连接。此外,NameServer也会断开此broker下所有与slave的连接。同时更新topic与队列的对应关系,但不会通知生产者和消费者。
Broker slave 同步或者异步从Broker master 上拷贝数据。

1.3.consumer随机与一个NameServer建立长连接,如果该NameServer断开,则从NameServer列表中查找下一个进行连接。
consumer主要从NameServer中根据topic查询broker的地址,查到就会缓存到客户端,并向提供topic服务的master、slave建立长连接,且定时向master、slave发送心跳。如果broker宕机,则NameServer会将其剔除,而consumer端的定时任务MQClientInstance.this.updateTopicRouteInfoFromNameServer每30秒执行一次,会将topic对应的broker地址拉取下来,此地址已经为slave地址了,故此时consumer会从slave上消费。 消费者与master和slave都建有连接,在不同场景有不同的消费规则。

1.4.Producer随机与一个NameServer建立长连接,每隔30秒(此处时间可配置)从NameServer获取topic的最新队列情况,这就表示如果某个broker master宕机,producer最多30秒才能感知,在这个期间,发往该broker master的消息将会失败。Producer会向提供topic服务的master建立长连接,且定时向master发送心跳。生产者与所有的master连接,但不能向slave写入。
客户端是先从NameServer寻址的,得到可用Broker的IP和端口信息,然后自己去连接broker。
(ps:我们在设计系统的时候,有一些全局使用的公用信息,可以单独独立一个模块进行专门的管理;各个子模块只需要定时向该模块更新信息即可。)

综上所述,我们可以得出NameServer在RocketMQ中所扮演的角色:1.NameServer 用来保存活跃的 broker 列表,包括 Master 和 Slave 。
2.NameServer 用来保存所有 topic 和该 topic 所有队列的列表。3.NameServer 用来保存所有 broker 的 Filter 列表。
对于RocketMQ 来说,topic 的数据在每个 Master 上是对等的,没有哪个 Master 上有 topic 上的全部数据,所以对于zookeeper的Master 选举功能在Rocket中使用不到。
Broker与slave配对是通过指定相同的brokerName参数来配对,master的brokerId必须为0,slave的brokerId必须大于0,此外一个master下可以挂多个slave,同一个master下的多个slave通过指定不同的brokerId来区分。

二:核心模块
2.1.注册broker

2.2 存放消息
Producer在存放消息时,首先会使用GET_ROUTEINTO_BY_TOPIC获取route信息TopicRouteData。
Producer Group–Producer实例的集合。
Producer实例可以是多机器、但机器多进程、单进程中的多对象。Producer可以发送多个Topic。处理分布式事务时,也需要Producer集群提高可靠性。

2.3 读取消息
Consumer在读取消息时,也会先使用GET_ROUTEINTO_BY_TOPIC获取route信息TopicRouteData。
Consumer Group–Consumer实例的集合。
Consumer 实例可以是多机器、但机器多进程、单进程中的多对象。同一个Group中的实例,在集群模式下,以均摊的方式消费;在广播模式下,每个实例都全部消费。

2.3.1.Push Consumer
Consumer的一种,应用通常向Consumer对象注册一个Listener接口,一旦收到消息,Consumer对象立刻回调Listener接口方法。所以,所谓Push指的是客户端内部的回调机制,并不是与服务端之间的机制。

2.3.2.Pull Consumer
Consumer的一种,应用通常主动调用Consumer从服务端拉消息,consumer 向 broker 发送拉消息请求, PullMessageService 服务通过一个线程将阻塞队列 LinkedBlockingQueue 中的 PullRequest 到 broker 拉取消息,长轮询向broker拉取消息是批量拉取的,默认设置批量的值为pullBatchSize = 32, 可配置。主动权由应用控制,这用的就是短轮询方式了,在不同情况下,与长轮询各有优点。

三:消息
*Message priority
处于一个队列,可用整数来描述每条消息的优先级,在投递前先按优先级排好序,然后令优先级高的消息先投递。RocketMQ没有特意支持消息优先级,它的所有的消息都是持久化,如果按优先级排序,开销会非常大,可以通过变通方式实现类似功能,即不同优先级各对应一个队列,发送消息时根据优先级发送到与之对应的队列中。

*Message order
rocketmq的顺序消息需要满足2点:
1.Producer端保证发送消息有序,且发送到同一个队列。
2.consumer端保证消费同一个队列。
顺序消费的发送的原理:
SendResult sendResult = producer.send(msg, new MessageQueueSelector()
{
@Override
public MessageQueue select(List mqs,Message msg, Object arg)
{
Integer id = (Integer) arg;
int index = id % mqs.size(); // 如果队列数不变,同一个订单号取到的队列是同一个
return mqs.get(index);
}
}, “10001”); // orderID “10001”是传递给回调方法的自定义数据
}
//List mqs 就是从namesrv 获取的所有队列

那么在集群消费时它是如何做到消息的有序性?
1.ConsumeMessageOrderlyService类的start()方法,如果是集群消费,则启动定时任务,定时向broker发送批量锁住当前正在消费的队列集合的消息,具体是consumer端拿到正在消费的队列集合,发送锁住队列的消息至broker,broker端返回锁住成功的队列集合。 consumer收到后,设置是否锁住标志位。

2.ConsumeMessageOrderlyService.ConsumeRequest的run方法是消费消息,这里还有个MessageQueueLock messageQueueLock,维护当前consumer端的本地队列锁。保证当前只有一个线程能够进行消费。

3.拉到消息存入ProcessQueue,然后判断,本地是否获得锁,全局队列是否被锁住,然后从ProcessQueue里取出消息,用MessageListenerOrderly进行消费。 拉到消息后调用ProcessQueue.putMessage(final List msgs) 存入,具体是存入TreeMap msgTreeMap。
然后是调用ProcessQueue.takeMessags(final int batchSize)消费,具体是把msgTreeMap里消费过的消息,转移到TreeMap msgTreeMapTemp。

4.本地消费的事务控制,ConsumeOrderlyStatus.SUCCESS(提交),ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT(挂起一会再消费),在此之前还有一个变量ConsumeOrderlyContext context的setAutoCommit()是否自动提交。

当SUSPEND_CURRENT_QUEUE_A_MOMENT时,autoCommit设置为true或者false没有区别,本质跟消费相反,把消息从msgTreeMapTemp转移回msgTreeMap,等待下次消费。
当SUCCESS时,autoCommit设置为true时比设置为false多做了2个动作,consumeRequest.getProcessQueue().commit()和this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
ProcessQueue.commit() :本质是删除msgTreeMapTemp里的消息,msgTreeMapTemp里的消息在上面消费时从msgTreeMap转移过来的。
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset() :本质是把拉消息的偏移量更新到本地,然后定时更新到broker。

Listener 接口方法:
MessageListenerOrderly ->是有序的
MessageListenerConcurrently ->是无序的

*Message Filter
Broker端过滤-RocketMQ支持根据msgId、key、Header、body进行过滤。根据Message Key查询仅仅返回符合条件的最近64条数据,所以key值尽量唯一,并且有业务区分度。

*发送消息负载均衡
使用Roundbin方式,轮询发送消息,每个队列接收平均的消息量。也可以自定义选择发往哪个队列。

 

RocketMQ初探一:NameServer的作用

第一次真正接触Java消息服务是在2013年底,当时是给中国移动做统一支付平台,当时用的就是著名的Apache ActiveMQ,当时觉得很有趣,一个服务队列竟然可以玩出这么多花样来。当时为了尽快的入门,还把《Java Message Service》给看了一遍,这对于初学者的我收获颇多。我说知道的完全实现JMS规范的MOM有ActiveMQ/Apollo和HornetQ,都是采用Java实现。JMS中说明了Java消息服务的两种消息传送模型,即P2P(点对点)和Pub/Sub(发布订阅),在约定了一些消息服务特性的同时,并提供了一套接口API,是否实现了该API,标志着MOM是否支持JMS规范,JMS规范中定义了消息服务诸多特性,这些特性和他所面对的企业级服务场景相关,当然,这也严重限制了消息服务的吞吐量,完全实现JMS规范的MOM,性能总不会太高,而且JMS规范中没有涉及消息服务的分布式特性,导致大多数实现JMS规范的MOM分布式部署功能比较弱,只适合集群部署。

 

说到高性能消息中间件,第一个想到的肯定是LinkedIn开源的Kafka,虽然最初Kafka是为日志传输而生,但也非常适合互联网公司消息服务的应用场景,他们不要求数据实时的强一致性(事务),更多是希望达到数据的最终一致性。RocketMQ是MetaQ的3.0版本,而MetaQ最初的设计又参考了Kafka。最初的MetaQ 1.x版本由阿里的原作者庄晓丹开发,后面的MetaQ 2.x版本才进行了开源,这里需要注意一点的事,MetaQ 1.x和MetaQ 2.x是依赖ZooKeeper的,但RocketMQ(即MetaQ 3.x)却去掉了ZooKeeper依赖,转而采用自己的NameServer。

 

ZooKeeper是著名的分布式协作框架,提供了Master选举、分布式锁、数据的发布和订阅等诸多功能,为什么RocketMQ没有选择ZooKeeper,而是自己开发了NameServer,我们来具体看看NameServer在RocketMQ集群中的作用就明了了。

 

RocketMQ的Broker有三种集群部署方式:1.单台Master部署;2.多台Master部署;3.多Master多Slave部署;采用第3种部署方式时,Master和Slave可以采用同步复制和异步复制两种方式。下图是第3种部署方式的简单图:

 


图虽然是网上找的,但也足以说明问题,当采用多Master方式时,Master与Master之间是不需要知道彼此的,这样的设计直接降低了Broker实现的复查性,你可以试想,如果Master与Master之间需要知道彼此的存在,这会需要在Master之中维护一个网络的Master列表,而且必然设计到Master发现和活跃Master数量变更等诸多状态更新问题,所以最简单也最可靠的做法就是Master只做好自己的事情(比如和Slave进行数据同步)即可,这样,在分布式环境中,某台Master宕机或上线,不会对其他Master造成任何影响。

 

那么怎么才能知道网络中有多少台Master和Slave呢?你会很自然想到用ZooKeeper,每个活跃的Master或Slave都去约定的ZooKeeper节点下注册一个状态节点,但RocketMQ没有使用ZooKeeper,所以这件事就交给了NameServer来做了(看上图)。

 

结论一:NameServer用来保存活跃的broker列表,包括Master和Slave。

当然,这个结论百度一查就知道,我们移步到rocketmq-namesrv模块中最重要的一个类:RouteInfoManager,它的主要属性如下:

 

private final ReadWriteLock lock = new ReentrantReadWriteLock();

private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;

private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;

private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;

private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;

private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

 

每个属性通过名字就能清楚的知道是什么意思,之所以能用非线程安全的HashMap,是因为有读写锁lock来对HashMap的修改做保护。我们注意到保存broker的Map有两个,即brokerAddrTable用来保存所有的broker列表和brokerLiveTable用来保存当前活跃的broker列表,而BrokerData用来保存broker的主要新增,而BrokerLiveInfo只用来保存上次更新(心跳)时间,我们可以直接看看RouteInfoManager中扫描非活跃broker的方法:

 

// Broker Channel两分钟过期

private final static long BrokerChannelExpiredTime = 1000 * 60 * 2;

public void scanNotActiveBroker() {

Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();

while (it.hasNext()) {

Entry<String, BrokerLiveInfo> next = it.next();

long last = next.getValue().getLastUpdateTimestamp();

if ((last + BrokerChannelExpiredTime) < System.currentTimeMillis()) {

RemotingUtil.closeChannel(next.getValue().getChannel());

it.remove();

log.warn("The broker channel expired, {} {}ms", next.getKey(), BrokerChannelExpiredTime);

this.onChannelDestroy(next.getKey(), next.getValue().getChannel());

}

}

}

 

可以看出,如果两分钟内都没收到一个broker的心跳数据,则直接将其从brokerLiveTable中移除,注意,这还会导致该broker从brokerAddrTable被删除,当然,如果该broker是Master,则它的所有Slave的broker都将被删除。具体细节可以参看RouteInfoManager的onChannelDestroy方法。

 

结论二:NameServer用来保存所有topic和该topic所有队列的列表。

我们注意到,topicQueueTable的value是QueueData的List,我们看看QueueData中的属性:

 

private String brokerName;  // broker的名称

private int readQueueNums;  // 读队列数量

private int writeQueueNums; // 写队列数量

private int perm;           // 读写权限

private int topicSynFlag;   // 同步复制还是异步复制标记

 

所以,你几乎可以在NameServer这里知道topic相关的所有信息,包括topic有哪些队列,这些队列在那些broker上等。

 

结论三:NameServer用来保存所有broker的Filter列表。

关于这一点,讨论broker的时候再细说。

 

DefaultRequestProcessor是NameServer的默认请求处理器,他处理了定义在rocketmq-common模块中RequestCode定义的部分请求,比如注册broker、注销broker、获取topic路由、删除topic、获取broker的topic权限、获取NameServer的所有topic等。

 

在源代码中,NettyServerConfig类记录NameServer中的一些默认参数,比如端口、服务端线程数等,列出如下:

private int listenPort = 8888;

private int serverWorkerThreads = 8;

private int serverCallbackExecutorThreads = 0;

private int serverSelectorThreads = 3;

private int serverOnewaySemaphoreValue = 256;

private int serverAsyncSemaphoreValue = 64;

private int serverChannelMaxIdleTimeSeconds = 120;

 

这些都可以通过启动时指定配置文件来进行覆盖修改,具体可以参考NameServer的启动类NamesrvStartup的实现(没想到Apache还有对命令行提供支持的commons-cls的包)。

 

 

现在我们再回过头来看看RocketMQ为什么不使用ZooKeeper?ZooKeeper可以提供Master选举功能,比如Kafka用来给每个分区选一个broker作为leader,但对于RocketMQ来说,topic的数据在每个Master上是对等的,没有哪个Master上有topic上的全部数据,所以这里选举leader没有意义;RockeqMQ集群中,需要有构件来处理一些通用数据,比如broker列表,broker刷新时间,虽然ZooKeeper也能存放数据,并有一致性保证,但处理数据之间的一些逻辑关系却比较麻烦,而且数据的逻辑解析操作得交给ZooKeeper客户端来做,如果有多种角色的客户端存在,自己解析多级数据确实是个麻烦事情;既然RocketMQ集群中没有用到ZooKeeper的一些重量级的功能,只是使用ZooKeeper的数据一致性和发布订阅的话,与其依赖重量级的ZooKeeper,还不如写个轻量级的NameServer,NameServer也可以集群部署,NameServer与NameServer之间无任何信息同步,只有一千多行代码的NameServer稳定性肯定高于ZooKeeper,占用的系统资源也可以忽略不计,何乐而不为?当然,这些只是本人的一点理解,具体原因当然得RocketMQ设计和开发者来说。

来源:http://manzhizhen.iteye.com/blog/2317354

 

artemis安装

一.Apache Artemis介绍

Apache Artemis是apache的一个新的消息系统, 这个消息系统是来源于 redhat的 “异步消息系统 HornetQ

HornetQ的相关资料有如下:

http://wenku.baidu.com/view/2f19b1557fd5360cba1adbd9.html?from=search

1. 关于Apache Artemis

http://activemq.apache.org/artemis/  项目的注意

http://activemq.apache.org/artemis/download.html  下载页面

另外 可以参考 HornetQ的相关资料, 目前同 HornetQ还有很大的相似性

选择他的Artemis的考虑是, 他是中等成熟的 消息系统, 功能够用, 代码少, 阅读方便, 便于理解, 这样有了问题才好 进行相关的处理工作。

而 activemq 代码太庞大了, 功能太多, 想彻底弄清楚要费非常大力气

而apache Apollo  是基于acitvemq的基础上, 做了内核调整, 因此不确定代码情况, 以及成熟稳定情况, 可以后续再研究

2. Apache activemq, apache Apollo, apache Artemis 以及 apache kafka的区别

如下面文章

http://stackoverflow.com/questions/27666943/activemq-vs-apollo-vs-kafka

36 down vote accepted

Apache ActiveMQ is a great workhorse full of features and nice stuff. It's not the fastest MQ software around but fast enough for most use cases. Among features are flexible clustring, fail-over, integrations with different application servers, security etc.

Apache Apollo is an attempt to write a new core for ActiveMQ to cope with a large amount of clients and messages. It does not have all nice and convenient feature of ActiveMQ but scales a lot better. Apache Apollo is a really fast MQ implementation when you give it a large multi-core server and thousands of concurrent connections. It has a nice, simple UI, but is not a "one-size-fits-all" solution.

It seems that there is an attempt ongoing to merge a number of ActiveMQ features with HornetQ under the name ActiveMQ Artemis. HornetQ has JMS2.0 support, so my humble guess is that it's likely to appear in ActiveMQ 6.x.

JIRAGithub

Kafka is a different beast. It's a very simple message broker intended to scale persistent publish subscribe (topics) as fast as possible over multiple servers. For small-medium sized deployments, Kafka is probably not the best option. It also has it's way to do things to achieve the high throughput, so you have to trade a lot in terms of flexibility to get high distributed throughput. If you are new to the area of MQ and brokers, I guess Kafka is overkill. On the other hand - if you have a decent sized server cluster and wonder how to push as many messages as possible through it - give Kafka a spin!

shareimprove this answer

edited May 14 at 16:20

answered Dec 27 '14 at 18:04

Petter Nordlander

13.3k41952

Thanks! I guess that Apache Apollo is the way to go then in my case. – Martin Dec 27 '14 at 20:06

2

ActiveMQ Artemis 1.0.0 has been released: activemq.apache.org/artemis – mjn Jun 29 at 17:54

If I (partially) understand relation ActiveMQ to Apollo (newer implementation based on previous experience etc), what is general vision (goal, motivation) of Artemis? – Jacek Cz Sep 20 at 11:14

1

Apollo failed, in a way, to become the new non-blocking core of AMQ as there was a lack of Scala programmers dedicated to that effort. The vision with Artemis, as I see it - and I am not an official of ActiveMQ - is to do what Apollo failed to do. The chances are probably greater with a battle tested product (HornetQ), written in a main stream language (Java) than writing from scratch. Both aim(ed) for the same target - non blocking ActiveMQ to support new demands from IoT and mobility applications, among other things – Petter Nordlander Sep 21 at 7:27

二.Apache Artemis 安装

1. 下载Apache Artemis

http://activemq.apache.org/artemis/  项目的注意

http://activemq.apache.org/artemis/download.html  下载页面

2. 解压缩

下载后, 解压缩到目录中, 要确保没有目录名称没有空格

解压缩下载好的 windows下的zip文件

3. 打开解压缩目录中的文档 参照文档进行相关操作

Getting Started

Note (Windows users): The broker currently does not support spaces in path names. For this reason the broker should be placed in a directory with no spaces in it's absolute path.

对于windows用户目前这个消息系统不支持在路径上有空格, 因此必须使用没有空格的绝对路径

Note (Windows users): Examples below use the shell script `artemis` for use with linux, Windows users should use the `artemis.cmd` script with the same parameters.

对于本文中要使用的`artemis`命了 在windows下要使用`artemis.cmd`

4. 创建服务器Creating a broker

To create a broker, navigate to the distribution 'bin/' directory and run:

为了创建消息服务器, 请进入到安装目录的bin目录下, 然后输入下面命令

$ ./artemis create $directory
Where $directory is the folder that you'd like the broker to be created. The create process will input for any required property not specified.

这个$directory文件夹是你想把broker创建的目录, 创建过程会要求你输入一些必要的没有指定的系统属性

Example:

--user: is mandatory with this configuration:

Please provide the default username:

admin

例如, --user, 是必须的, 请提供一个默认的用户名

--password: is mandatory with this configuration:

Please provide the default password:

--password 是必须的配置, 请输入默认密码

--allow-anonymous: is mandatory with this configuration:

Allow anonymous access? (Y/N):

y

--allow-anonymous 是必须的, 用来指定是否允许匿名用户存取消息系统。

For a full list of available options for the create process you may use: $ ./artemis help create    查看详细的帮助列表

NAME

        artemis create - creates a new broker instance

SYNOPSIS

        artemis create [--allow-anonymous]

                [--cluster-password <clusterPassword>] [--cluster-user <clusterUser>]

    [--clustered] [--data <data>] [--encoding <encoding>] [--force]

        [--home <home>] [--host <host>] [--java-options <javaOptions>]

            [--password <password>] [--port-offset <portOffset>] [--replicated]

                [--role <role>] [--shared-store] [--silent] [--user <user>] [--]

                    <directory>

     ...

You can now start the broker by executing:

   "D:\mq\broker1\bin\artemis" run

Or you can setup the broker as Windows service and run it in the background:

   "D:\mq\broker1\bin\artemis-service.exe" install

   "D:\mq\broker1\bin\artemis-service.exe" start

   To stop the windows service:

      "D:\mq\broker1\bin\artemis-service.exe" stop

   To uninstall the windows service

      "D:\mq\broker1\bin\artemis-service.exe" uninstall

5. 进入应用程序目录

bin中是程序运行的必要文件

Etc目录是各种配置

Data存储数据

Log日志

6. 启动应用程序

Starting the Broker

Once the broker has been created, use the artemis (or artemis.cmd on windows) script found under the bin directory of the newly created broker to manage the life cycle of the broker. To run the Apache ActiveMQ Artemis broker with the default configuration, run the command shown below from the "bin" directory of the created broker.

新的broker服务器被创建完成后, 就可用新创建的broker目录下的bin目录中的artemis.cmd 命令(linux下artemis) 来启动或者管理broker的各项工作

$ ./artemis run   用默认配置启动服务器

To specify a broker configuration file:

$ ./artemis run --config scheme:location    指定一个配置文件进行启动

e.g.

$ ./artemis run --config xml:/home/artemis/bootstrap.xml

It is possible to configure run time paramters in the artemis.conf (artemis.conf.bat for windows) file under the "bin" directory of the broker directory.

7. 停止服务器

Stopping the Broker

To stop the broker please use the artemis script from the broker's bin directory:

$ ./artemis stop

Kafka+Spark Streaming+Redis实时计算整合实践

基于Spark通用计算平台,可以很好地扩展各种计算类型的应用,尤其是Spark提供了内建的计算库支持,像Spark Streaming、Spark SQL、MLlib、GraphX,这些内建库都提供了高级抽象,可以用非常简洁的代码实现复杂的计算逻辑、这也得益于Scala编程语言的简洁性。这 里,我们基于1.3.0版本的Spark搭建了计算平台,实现基于Spark Streaming的实时计算。
我们的应用场景是分析用户使用手机App的行为,描述如下所示:

  • 手机客户端会收集用户的行为事件(我们以点击事件为例),将数据发送到数据服务器,我们假设这里直接进入到Kafka消息队列
  • 后端的实时服务会从Kafka消费数据,将数据读出来并进行实时分析,这里选择Spark Streaming,因为Spark Streaming提供了与Kafka整合的内置支持
  • 经过Spark Streaming实时计算程序分析,将结果写入Redis,可以实时获取用户的行为数据,并可以导出进行离线综合统计分析

Spark Streaming介绍

Spark Streaming提供了一个叫做DStream(Discretized Stream)的高级抽象,DStream表示一个持续不断输入的数据流,可以基于Kafka、TCP Socket、Flume等输入数据流创建。在内部,一个DStream实际上是由一个RDD序列组成的。Sparking Streaming是基于Spark平台的,也就继承了Spark平台的各种特性,如容错(Fault-tolerant)、可扩展 (Scalable)、高吞吐(High-throughput)等。
在Spark Streaming中,每个DStream包含了一个时间间隔之内的数据项的集合,我们可以理解为指定时间间隔之内的一个batch,每一个batch就 构成一个RDD数据集,所以DStream就是一个个batch的有序序列,时间是连续的,按照时间间隔将数据流分割成一个个离散的RDD数据集,如图所 示(来自官网):
spark-cluster-overview-streaming-dstream
我们都知道,Spark支持两种类型操作:Transformations和Actions。Transformation从一个已知的RDD数据集经过 转换得到一个新的RDD数据集,这些Transformation操作包括map、filter、flatMap、union、join等,而且 Transformation具有lazy的特性,调用这些操作并没有立刻执行对已知RDD数据集的计算操作,而是在调用了另一类型的Action操作才 会真正地执行。Action执行,会真正地对RDD数据集进行操作,返回一个计算结果给Driver程序,或者没有返回结果,如将计算结果数据进行持久 化,Action操作包括reduceByKey、count、foreach、collect等。关于Transformations和Actions 更详细内容,可以查看官网文档。
同样、Spark Streaming提供了类似Spark的两种操作类型,分别为Transformations和Output操作,它们的操作对象是DStream,作 用也和Spark类似:Transformation从一个已知的DStream经过转换得到一个新的DStream,而且Spark Streaming还额外增加了一类针对Window的操作,当然它也是Transformation,但是可以更灵活地控制DStream的大小(时间 间隔大小、数据元素个数),例如window(windowLength, slideInterval)、countByWindow(windowLength, slideInterval)、reduceByWindow(func, windowLength, slideInterval)等。Spark Streaming的Output操作允许我们将DStream数据输出到一个外部的存储系统,如数据库或文件系统等,执行Output操作类似执行 Spark的Action操作,使得该操作之前lazy的Transformation操作序列真正地执行。

Kafka+Spark Streaming+Redis编程实践

下面,我们根据上面提到的应用场景,来编程实现这个实时计算应用。首先,写了一个Kafka Producer模拟程序,用来模拟向Kafka实时写入用户行为的事件数据,数据是JSON格式,示例如下:
{"uid":"068b746ed4620d25e26055a9f804385f","event_time":"1430204612405","os_type":"Android","click_count":6}

一个事件包含4个字段:

  • uid:用户编号
  • event_time:事件发生时间戳
  • os_type:手机App操作系统类型
  • click_count:点击次数

下面是我们实现的代码,如下所示:
package org.shirdrn.spark.streaming.utils

import java.util.Properties
import scala.util.Properties
import org.codehaus.jettison.json.JSONObject
import kafka.javaapi.producer.Producer
import kafka.producer.KeyedMessage
import kafka.producer.KeyedMessage
import kafka.producer.ProducerConfig
import scala.util.Random

object KafkaEventProducer {

private val users = Array(
"4A4D769EB9679C054DE81B973ED5D768", "8dfeb5aaafc027d89349ac9a20b3930f",
"011BBF43B89BFBF266C865DF0397AA71", "f2a8474bf7bd94f0aabbd4cdd2c06dcf",
"068b746ed4620d25e26055a9f804385f", "97edfc08311c70143401745a03a50706",
"d7f141563005d1b5d0d3dd30138f3f62", "c8ee90aade1671a21336c721512b817a",
"6b67c8c700427dee7552f81f3228c927", "a95f22eabc4fd4b580c011a3161a9d9d")

private val random = new Random()

private var pointer = -1

def getUserID() : String = {
pointer = pointer + 1
if(pointer >= users.length) {
pointer = 0
users(pointer)
} else {
users(pointer)
}
}

def click() : Double = {
random.nextInt(10)
}

// bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --create --topic user_events --replication-factor 2 --partitions 2
// bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --list
// bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --describe user_events
// bin/kafka-console-consumer.sh --zookeeper zk1:2181,zk2:2181,zk3:22181/kafka --topic test_json_basis_event --from-beginning
def main(args: Array[String]): Unit = {
val topic = "user_events"
val brokers = "10.10.4.126:9092,10.10.4.127:9092"
val props = new Properties()
props.put("metadata.broker.list", brokers)
props.put("serializer.class", "kafka.serializer.StringEncoder")

val kafkaConfig = new ProducerConfig(props)
val producer = new Producer[String, String](kafkaConfig)

while(true) {
// prepare event data
val event = new JSONObject()
event
.put("uid", getUserID)
.put("event_time", System.currentTimeMillis.toString)
.put("os_type", "Android")
.put("click_count", click)

// produce event message
producer.send(new KeyedMessage[String, String](topic, event.toString))
println("Message sent: " + event)

Thread.sleep(200)
}
}
}

通过控制上面程序最后一行的时间间隔来控制模拟写入速度。下面我们来讨论实现实时统计每个用户的点击次数,它是按照用户分组进行累加次数,逻辑比较简单,关键是在实现过程中要注意一些问题,如对象序列化等。先看实现代码,稍后我们再详细讨论,代码实现如下所示:
object UserClickCountAnalytics {

def main(args: Array[String]): Unit = {
var masterUrl = "local[1]"
if (args.length > 0) {
masterUrl = args(0)
}

// Create a StreamingContext with the given master URL
val conf = new SparkConf().setMaster(masterUrl).setAppName("UserClickCountStat")
val ssc = new StreamingContext(conf, Seconds(5))

// Kafka configurations
val topics = Set("user_events")
val brokers = "10.10.4.126:9092,10.10.4.127:9092"
val kafkaParams = Map[String, String](
"metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")

val dbIndex = 1
val clickHashKey = "app::users::click"

// Create a direct stream
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

val events = kafkaStream.flatMap(line => {
val data = JSONObject.fromObject(line._2)
Some(data)
})

// Compute user click times
val userClicks = events.map(x => (x.getString("uid"), x.getInt("click_count"))).reduceByKey(_ + _)
userClicks.foreachRDD(rdd => {
rdd.foreachPartition(partitionOfRecords => {
partitionOfRecords.foreach(pair => {
val uid = pair._1
val clickCount = pair._2
val jedis = RedisClient.pool.getResource
jedis.select(dbIndex)
jedis.hincrBy(clickHashKey, uid, clickCount)
RedisClient.pool.returnResource(jedis)
})
})
})

ssc.start()
ssc.awaitTermination()

}
}

上面代码使用了Jedis客户端来操作Redis,将分组计数结果数据累加写入Redis存储,如果其他系统需要实时获取该数据,直接从Redis实时读取即可。RedisClient实现代码如下所示:
object RedisClient extends Serializable {
val redisHost = "10.10.4.130"
val redisPort = 6379
val redisTimeout = 30000
lazy val pool = new JedisPool(new GenericObjectPoolConfig(), redisHost, redisPort, redisTimeout)

lazy val hook = new Thread {
override def run = {
println("Execute hook thread: " + this)
pool.destroy()
}
}
sys.addShutdownHook(hook.run)
}

上面代码我们分别在local[K]和Spark Standalone集群模式下运行通过。
如果我们是在开发环境进行调试的时候,也就是使用local[K]部署模式,在本地启动K个Worker线程来计算,这K个Worker在同一个JVM实 例里,上面的代码默认情况是,如果没有传参数则是local[K]模式,所以如果使用这种方式在创建Redis连接池或连接的时候,可能非常容易调试通 过,但是在使用Spark Standalone、YARN Client(YARN Cluster)或Mesos集群部署模式的时候,就会报错,主要是由于在处理Redis连接池或连接的时候出错了。我们可以看一下Spark架构,如图 所示(来自官网):
spark-cluster-overview-0001无论是在本地模式、Standalone模式,还是在Mesos或YARN模式下,整个Spark集群的结构都可以用上图抽象表示,只是各个组件的运行环 境不同,导致组件可能是分布式的,或本地的,或单个JVM实例的。如在本地模式,则上图表现为在同一节点上的单个进程之内的多个组件;而在YARN Client模式下,Driver程序是在YARN集群之外的一个节点上提交Spark Application,其他的组件都运行在YARN集群管理的节点上。
在Spark集群环境部署Application后,在进行计算的时候会将作用于RDD数据集上的函数(Functions)发送到集群中Worker上 的Executor上(在Spark Streaming中是作用于DStream的操作),那么这些函数操作所作用的对象(Elements)必须是可序列化的,通过Scala也可以使用 lazy引用来解决,否则这些对象(Elements)在跨节点序列化传输后,无法正确地执行反序列化重构成实际可用的对象。上面代码我们使用lazy引 用(Lazy Reference)来实现的,代码如下所示:
// lazy pool reference
lazy val pool = new JedisPool(new GenericObjectPoolConfig(), redisHost, redisPort, redisTimeout)
...
partitionOfRecords.foreach(pair => {
val uid = pair._1
val clickCount = pair._2
val jedis = RedisClient.pool.getResource
jedis.select(dbIndex)
jedis.hincrBy(clickHashKey, uid, clickCount)
RedisClient.pool.returnResource(jedis)
})

另一种方式,我们将代码修改为,把对Redis连接的管理放在操作DStream的Output操作范围之内,因为我们知道它是在特定的Executor中进行初始化的,使用一个单例的对象来管理,如下所示:
package org.shirdrn.spark.streaming

import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.streaming.kafka.KafkaUtils

import kafka.serializer.StringDecoder
import net.sf.json.JSONObject
import redis.clients.jedis.JedisPool

object UserClickCountAnalytics {

def main(args: Array[String]): Unit = {
var masterUrl = "local[1]"
if (args.length > 0) {
masterUrl = args(0)
}

// Create a StreamingContext with the given master URL
val conf = new SparkConf().setMaster(masterUrl).setAppName("UserClickCountStat")
val ssc = new StreamingContext(conf, Seconds(5))

// Kafka configurations
val topics = Set("user_events")
val brokers = "10.10.4.126:9092,10.10.4.127:9092"
val kafkaParams = Map[String, String](
"metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")

val dbIndex = 1
val clickHashKey = "app::users::click"

// Create a direct stream
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

val events = kafkaStream.flatMap(line => {
val data = JSONObject.fromObject(line._2)
Some(data)
})

// Compute user click times
val userClicks = events.map(x => (x.getString("uid"), x.getInt("click_count"))).reduceByKey(_ + _)
userClicks.foreachRDD(rdd => {
rdd.foreachPartition(partitionOfRecords => {
partitionOfRecords.foreach(pair => {

/**
* Internal Redis client for managing Redis connection {@link Jedis} based on {@link RedisPool}
*/
object InternalRedisClient extends Serializable {

@transient private var pool: JedisPool = null

def makePool(redisHost: String, redisPort: Int, redisTimeout: Int,
maxTotal: Int, maxIdle: Int, minIdle: Int): Unit = {
makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle, true, false, 10000)
}

def makePool(redisHost: String, redisPort: Int, redisTimeout: Int,
maxTotal: Int, maxIdle: Int, minIdle: Int, testOnBorrow: Boolean,
testOnReturn: Boolean, maxWaitMillis: Long): Unit = {
if(pool == null) {
val poolConfig = new GenericObjectPoolConfig()
poolConfig.setMaxTotal(maxTotal)
poolConfig.setMaxIdle(maxIdle)
poolConfig.setMinIdle(minIdle)
poolConfig.setTestOnBorrow(testOnBorrow)
poolConfig.setTestOnReturn(testOnReturn)
poolConfig.setMaxWaitMillis(maxWaitMillis)
pool = new JedisPool(poolConfig, redisHost, redisPort, redisTimeout)

val hook = new Thread{
override def run = pool.destroy()
}
sys.addShutdownHook(hook.run)
}
}

def getPool: JedisPool = {
assert(pool != null)
pool
}
}

// Redis configurations
val maxTotal = 10
val maxIdle = 10
val minIdle = 1
val redisHost = "10.10.4.130"
val redisPort = 6379
val redisTimeout = 30000
val dbIndex = 1
InternalRedisClient.makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle)

val uid = pair._1
val clickCount = pair._2
val jedis =InternalRedisClient.getPool.getResource
jedis.select(dbIndex)
jedis.hincrBy(clickHashKey, uid, clickCount)
InternalRedisClient.getPool.returnResource(jedis)
})
})
})

ssc.start()
ssc.awaitTermination()

}
}

上面代码实现,得益于Scala语言的特性,可以在代码中任何位置进行class或object的定义,我们将用来管理Redis连接的代码放在了 特定操作的内部,就避免了瞬态(Transient)对象跨节点序列化的问题。这样做还要求我们能够了解Spark内部是如何操作RDD数据集的,更多可 以参考RDD或Spark相关文档。
在集群上,以Standalone模式运行,执行如下命令:
1    cd /usr/local/spark
2    ./bin/spark-submit --class org.shirdrn.spark.streaming.UserClickCountAnalytics --master spark://hadoop1:7077 --executor-memory 1G --total-executor-cores 2 ~/spark-0.0.SNAPSHOT.jar spark://hadoop1:7077

可以查看集群中各个Worker节点执行计算任务的状态,也可以非常方便地通过Web页面查看。
下面,看一下我们存储到Redis中的计算结果,如下所示:
127.0.0.1:6379[1]> HGETALL app::users::click
1) "4A4D769EB9679C054DE81B973ED5D768"
2) "7037"
3) "8dfeb5aaafc027d89349ac9a20b3930f"
4) "6992"
5) "011BBF43B89BFBF266C865DF0397AA71"
6) "7021"
7) "97edfc08311c70143401745a03a50706"
8) "6874"
9) "d7f141563005d1b5d0d3dd30138f3f62"
10) "7057"
11) "a95f22eabc4fd4b580c011a3161a9d9d"
12) "7092"
13) "6b67c8c700427dee7552f81f3228c927"
14) "7266"
15) "f2a8474bf7bd94f0aabbd4cdd2c06dcf"
16) "7188"
17) "c8ee90aade1671a21336c721512b817a"
18) "6950"
19) "068b746ed4620d25e26055a9f804385f"

有关更多关于Spark Streaming的详细内容,可以参考官方文档。

附录

这里,附上前面开发的应用所对应的依赖,以及打包Spark Streaming应用程序的Maven配置,以供参考。如果使用maven-shade-plugin插件,配置有问题的话,打包后在Spark集群上 提交Application时候可能会报错Invalid signature file digest for Manifest main attributes。参考的Maven配置,如下所示:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.shirdrn.spark</groupId>
<artifactId>spark</artifactId>
<version>0.0.1-SNAPSHOT</version>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<version>2.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.2</version>
</dependency>
</dependencies>

<build>
<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
<resources>
<resource>
<directory>${basedir}/src/main/resources</directory>
</resource>
</resources>
<testResources>
<testResource>
<directory>${basedir}/src/test/resources</directory>
</testResource>
</testResources>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.2</version>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<includes>
<include>*:*</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
<resource>log4j.properties</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

来源:http://shiyanjun.cn/archives/1097.html

RocketMQ安装与使用

一、服务端安装部署

我是在虚拟机中的CentOS6.5中进行部署。
2.tar -xvf alibaba-rocketmq-3.0.7.tar.gz 解压到适当的目录如/opt/目录
3.启动RocketMQ:进入rocketmq/bin 目录 执行
nohup sh mqnamesrv &

4.启动Broker,设置对应的NameServer

 nohup sh mqbroker -n "127.0.0.1:9876" &

二、编写客户端

可以查看sameple中的quickstart源码 1.Consumer 消息消费者

/**
* Consumer,订阅消息
*/
public class Consumer {

public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("QuickStartConsumer");

consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setInstanceName("QuickStartConsumer");
consumer.subscribe("QuickStart", "*");

consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

consumer.start();

System.out.println("Consumer Started.");
}
}
2.Producer消息生产者

/**
* Producer,发送消息
*
*/
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("QuickStartProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setInstanceName("QuickStartProducer");
producer.start();

for (int i = 0; i < 1000; i++) {
try {
Message msg = new Message("QuickStart",// topic
"TagA",// tag
("Hello RocketMQ ,QuickStart" + i).getBytes()// body
);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}

producer.shutdown();
}
}

 

3.首先运行Consumer程序,一直在运行状态接收服务器端推送过来的消息

23:18:07.587 [main] DEBUG i.n.c.MultithreadEventLoopGroup - -Dio.netty.eventLoopThreads: 16
23:18:07.591 [main] DEBUG i.n.util.internal.PlatformDependent - Platform: Windows
23:18:07.592 [main] DEBUG i.n.util.internal.PlatformDependent - Java version: 7
23:18:07.592 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.noUnsafe: false
23:18:07.593 [main] DEBUG i.n.util.internal.PlatformDependent0 - java.nio.ByteBuffer.cleaner: available
23:18:07.593 [main] DEBUG i.n.util.internal.PlatformDependent0 - java.nio.Buffer.address: available
23:18:07.593 [main] DEBUG i.n.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available
23:18:07.593 [main] DEBUG i.n.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available
23:18:07.593 [main] DEBUG i.n.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: true
23:18:07.594 [main] DEBUG i.n.util.internal.PlatformDependent - sun.misc.Unsafe: available
23:18:07.594 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.noJavassist: false
23:18:07.594 [main] DEBUG i.n.util.internal.PlatformDependent - Javassist: unavailable
23:18:07.594 [main] DEBUG i.n.util.internal.PlatformDependent - You don't have Javassist in your class path or you don't have enough permission to load dynamically generated classes. Please check the configuration for better performance.
23:18:07.595 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.noPreferDirect: false
23:18:07.611 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.noKeySetOptimization: false
23:18:07.611 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.selectorAutoRebuildThreshold: 512
23:18:08.355 [main] DEBUG i.n.util.internal.ThreadLocalRandom - -Dio.netty.initialSeedUniquifier: 0x8c0d4793e5820c31
23:18:08.446 [NettyClientWorkerThread_1] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.noResourceLeakDetection: false
Consumer Started.

4.再次运行Producer程序,生成消息并发送到Broker,Producer的日志冲没了,但是可以看到Broker推送到Consumer的一条消息
1

ConsumeMessageThread-QuickStartConsumer-3 Receive New Messages: [MessageExt [queueId=0, storeSize=150, queueOffset=244, sysFlag=0, bornTimestamp=1400772029972, bornHost=/10.162.0.7:54234, storeTimestamp=1400772016017, storeHost=/127.0.0.1:10911, msgId=0A0A0A5900002A9F0000000000063257, commitLogOffset=406103, bodyCRC=112549959, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=QuickStart, flag=0, properties={TAGS=TagA, WAIT=true, MAX_OFFSET=245, MIN_OFFSET=0}, body=29]]]

三、Consumer最佳实践

1.消费过程要做到幂等(即消费端去重)

RocketMQ无法做到消息重复,所以如果业务对消息重复非常敏感,务必要在业务层面去重,有以下一些方式:

(1).将消息的唯一键,可以是MsgId,也可以是消息内容中的唯一标识字段,例如订单ID,消费之前判断是否在DB或Tair(全局KV存储) 中存在,如果不存在则插入,并消费,否则跳过。(实践过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过) msgid一定是全局唯一的标识符,但是可能会存在同样的消息有两个不同的msgid的情况(有多种原因),这种情况可能会使业务上重复,建议最好使用消 息体中的唯一标识字段去重

(2).使业务层面的状态机去重

2.批量方式消费

如果业务流程支持批量方式消费,则可以很大程度上的提高吞吐量,可以通过设置Consumer的consumerMessageBatchMaxSize参数,默认是1,即一次消费一条参数

3.跳过非重要的消息

发生消息堆积时,如果消费速度一直跟不上发送速度,可以选择丢弃不重要的消息

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);

long offset=msgs.get(0).getQueueOffset();

String maxOffset=msgs.get(0).getProperty(MessageConst.PROPERTY_MAX_OFFSET);
long diff=Long.parseLong(maxOffset)-offset;
if(diff>100000){
//处理消息堆积情况
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

如以上代码所示,当某个队列的消息数堆积到 100000 条以上,则尝试丢弃部分或全部消息,这样就可以快速追上发送消息的速度

4.优化没条消息消费过程

举例如下,某条消息的消费过程如下

1. 根据消息从 DB 查询数据 1

2. 根据消息从 DB 查询数据2

3. 复杂的业务计算

4. 向 DB 插入数据3

5. 向 DB 插入数据 4

这条消息的消费过程与 DB 交互了 4 次,如果按照每次 5ms 计算,那么总共耗时 20ms,假设业务计算耗时 5ms,那么总过耗时 25ms,如果能把 4 次 DB 交互优化为 2 次,那么总耗时就可以优化到 15ms,也就是说总体性能提高了 40%。

对于 Mysql 等 DB,如果部署在磁盘,那么与 DB 进行交互,如果数据没有命中 cache,每次交互的 RT 会直线上升, 如果采用 SSD,则 RT 上升趋势要明显好于磁盘。

个别应用可能会遇到这种情况:在线下压测消费过程中,db 表现非常好,每次 RT 都很短,但是上线运行一段时间,RT 就会变长,消费吞吐量直线下降

主要原因是线下压测时间过短,线上运行一段时间后,cache 命中率下降,那么 RT 就会增加。建议在线下压测时,要测试足够长时间,尽可能模拟线上环境,压测过程中,数据的分布也很重要,数据不同,可能 cache 的命中率也会完全不同

四、Producer最佳实践

1.发送消息注意事项

(1) 一个应用尽可能用一个 Topic,消息子类型用 tags 来标识,tags 可以由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才可以利用 tags 在 broker 做消息过滤。

(2)每个消息在业务层面的唯一标识码,要设置到 keys 字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以 通过 topic,key 来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。

(3)消息发送成功或者失败,要打印消息日志,务必要打印 sendresult 和 key 字段

(4)send 消息方法,只要不抛异常,就代表发送成功。但是发送成功会有多个状态,在 sendResult 里定义

  • SEND_OK:消息发送成功
  • FLUSH_DISK_TIMEOUT:消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
  • ​FLUSH_SLAVE_TIMEOUT:消息发送成功,但是服务器同步到 Slave 时超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
  • SLAVE_NOT_AVAILABLE:消息发送成功,但是此时 slave 不可用,消息已经进入服务器队列,只有此时服务器宕机,消息才会 丢失。对于精确发送顺序消息的应用,由于顺序消息的局限性,可能会涉及到主备自动切换问题,所以如果sendresult 中的 status 字段不等 于 SEND_OK,就应该尝试重试。对于其他应用,则没有必要这样

(5)对于消息不可丢失应用,务必要有消息重发机制

2.消息发送失败处理

Producer 的 send 方法本身支持内部重试,重试逻辑如下:

(1) 至多重试 3 次

(2) 如果发送失败,则轮转到下一个 Broker

(3) 这个方法的总耗时时间不超过 sendMsgTimeout 设置的值,默认 10s所以,如果本身向 broker 发送消息产生超时异常,就不会再做重试

如果调用 send 同步方法发送失败,则尝试将消息存储到 db,由后台线程定时重试,保证消息一定到达 Broker。

上述 db 重试方式为什么没有集成到 MQ 客户端内部做,而是要求应用自己去完成,基于以下几点考虑:

(1)MQ 的客户端设计为无状态模式,方便任意的水平扩展,且对机器资源的消耗仅仅是 cpu、内存、网络

(2)如果 MQ 客户端内部集成一个 KV 存储模块,那么数据只有同步落盘才能较可靠,而同步落盘本身性能开销较大,所以通常会采用异步落盘, 又由于应用关闭过程不受 MQ 运维人员控制,可能经常会发生 kill -9 这样暴力方式关闭,造成数据没有及时落盘而丢失

(3)Producer 所在机器的可靠性较低,一般为虚拟机,不适合存储重要数据。 综上,建议重试过程交由应用来控制。

3.选择 oneway 形式发送

一个 RPC 调用,通常是这样一个过程

(1)客户端发送请求到服务器

(2)服务器处理该请求

(3)服务器向客户端返回应答

所以一个 RPC 的耗时时间是上述三个步骤的总和,而某些场景要求耗时非常短,但是对可靠性要求并不高,例如日志收集类应用,此类应用可以采 用 oneway 形式调用,oneway 形式只发送请求不等待应答,而发送请求在客户端实现层面仅仅是一个 os 系统调用的开销,即将数据写入客户 端的 socket 缓冲区,此过程耗时通常在微秒级

 

RocketMQ不止可以直接推送消息,在消费端注册监听器进行监听,还可以由消费端决定自己去拉取数据
/**
* PullConsumer,订阅消息
*/
public class PullConsumer {
//Java缓存
private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();

public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("PullConsumerGroup");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
//拉取订阅主题的队列,默认队列大小是4
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTestMapBody");
for (MessageQueue mq : mqs) {
System.out.println("Consume from the queue: " + mq);
SINGLE_MQ:while(true){
try {

PullResult pullResult =
consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
List<MessageExt> list=pullResult.getMsgFoundList();
if(list!=null&&list.size()<100){
for(MessageExt msg:list){
System.out.println(SerializableInterface.deserialize(msg.getBody()));
}
}
System.out.println(pullResult.getNextBeginOffset());
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());

switch (pullResult.getPullStatus()) {
case FOUND:
// TODO
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
}
catch (Exception e) {
e.printStackTrace();
}
}
}

consumer.shutdown();
}

private static void putMessageQueueOffset(MessageQueue mq, long offset) {
offseTable.put(mq, offset);
}

private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = offseTable.get(mq);
if (offset != null){
System.out.println(offset);
return offset;
}
return 0;
}

/**
* PullConsumer,订阅消息
*/
public class PullConsumer {
//Java缓存
private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();

public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("PullConsumerGroup");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
//拉取订阅主题的队列,默认队列大小是4
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTestMapBody");
for (MessageQueue mq : mqs) {
System.out.println("Consume from the queue: " + mq);
SINGLE_MQ:while(true){
try {

PullResult pullResult =
consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
List<MessageExt> list=pullResult.getMsgFoundList();
if(list!=null&&list.size()<100){
for(MessageExt msg:list){
System.out.println(SerializableInterface.deserialize(msg.getBody()));
}
}
System.out.println(pullResult.getNextBeginOffset());
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());

switch (pullResult.getPullStatus()) {
case FOUND:
// TODO
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
}
catch (Exception e) {
e.printStackTrace();
}
}
}

consumer.shutdown();
}

private static void putMessageQueueOffset(MessageQueue mq, long offset) {
offseTable.put(mq, offset);
}

private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = offseTable.get(mq);
if (offset != null){
System.out.println(offset);
return offset;
}
return 0;
}

刚开始的没有细看PullResult对象,以为拉取到的结果没有MessageExt对象还跑到群里面问别人,犯2了

特别要注意  静态变量offsetTable的作用,拉取的是按照从offset(理解为下标)位置开始拉取,拉取N条,offsetTable记录下次拉取的offset位置

 

来源:http://www.changeself.net/archives/rocketmq%E5%85%A5%E9%97%A8%EF%BC%882%EF%BC%89%E6%9C%80%E4%BD%B3%E5%AE%9E%E8%B7%B5.html