RocketMq知识点理解

  categories:资料  tags:, ,   author:

RocketMq关键点理解

各个角色间的关系:

RocketMq中每个Broker(master和slave)与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server。
Name Server之间不会有任何信息交互,各自独立。
producer和consumer随机从一个name server即可获得全部topic的路由信息。
producer根据得到的路由信息,同master建立长连接。
consumer根据路由信息,同master个salve都建立长连接。然后根据设置的订阅规则,选择从master或者slave订阅消息。

Broker:

分为master和slave两个角色。master提供读写读物,salve只提供读服务。
为了保证可用性,需要部署多套broker,每套broker至少有1个master和1个以上的salve。
在同一套broker中,master和salve都是同样的brokerName,master的brokerId是0,salve的brokerId必须是非0的。

同步刷盘和异步刷盘。

同步刷盘是说,broker在收到每个消息后,都是先要保存到硬盘上,然后再给producer确认。异步刷盘就是先回复确认,然后批量保存到硬盘上。异步刷盘有更好的性能,当然也有更大的丢失消息的风险。

同步复制和异步复制。

是说在master和salve之间复制消息的方式。同步是说在salve也存储了消息后再答复producer。
异步复制是先答复producer,再去向salve复制。
通过同步复制技术可以完全避免单点,同步复制势必会影响性能,适合对消息可靠性要求极高的场合,例如与Money相关的应用。RocketMQ从3.0版本开始支持同步双写。

MQPullConsumer和MQPushConsumer的区别

consumer被分为2类:MQPullConsumer和MQPushConsumer,其实本质都是拉模式(pull),即consumer轮询从broker拉取消息。
区别是:
push方式里,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送(push)过来的。
pull方式里,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。

RocketMQ使用长轮询Pull方式,可保证消息非常实时,消息实时性不低于Push的理解

数据交互有两种模式:Push(推模式)、Pull(拉模式)。
推模式指的是客户端与服务端建立好网络长连接,服务方有相关数据,直接通过长连接通道推送到客户端。其优点是及时,一旦有数据变更,客户端立马能感知到;另外对客户端来说逻辑简单,不需要关心有无数据这些逻辑处理。缺点是不知道客户端的数据消费能力,可能导致数据积压在客户端,来不及处理。
拉模式指的是客户端主动向服务端发出请求,拉取相关数据。其优点是此过程由客户端发起请求,故不存在推模式中数据积压的问题。缺点是可能不够及时,对客户端来说需要考虑数据拉取相关逻辑,何时去拉,拉的频率怎么控制等等。

长轮询:
轮询是说,每隔一定时间,客户端想服务端发起一次请求,服务端有数据就返回数据,没有数据就返回空,然后关闭请求。
长轮询,不同之处是,服务端如果此时没有数据,保持连接。等到有数据返回(相当于一种push),或者超时返回。
所以长轮询Pull的好处就是可以减少无效请求,保证消息的实时性,又不会造成客户端积压。

其他

  1. 对于一个消息中间件来说,持久化部分的性能直接决定了整个消息中间件的性能。RocketMQ充分利用Linux文件系统内存cache来提高性能
  2. RocketMq Broker的buffer不会满。原因是RocketMQ没有内存Buffer概念,RocketMQ的队列都是持久化磁盘,数据定期清除。这是RocketMq和其他消息中间件的重要区别。对RocketMQ来说的内存Buffer抽象成一个无限长度的队列,不管有多少数据进来都能装得下,这个无限是有前提的,Broker会定期删除过期的数据。
阅读全文

RocketMQ初探一:NameServer的作用

  categories:资料  tags:, ,   author:

第一次真正接触Java消息服务是在2013年底,当时是给中国移动做统一支付平台,当时用的就是著名的Apache ActiveMQ,当时觉得很有趣,一个服务队列竟然可以玩出这么多花样来。当时为了尽快的入门,还把《Java Message Service》给看了一遍,这对于初学者的我收获颇多。我说知道的完全实现JMS规范的MOM有ActiveMQ/Apollo和HornetQ,都是采用Java实现。JMS中说明了Java消息服务的两种消息传送模型,即P2P(点对点)和Pub/Sub(发布订阅),在约定了一些消息服务特性的同时,并提供了一套接口API,是否实现了该API,标志着MOM是否支持JMS规范,JMS规范中定义了消息服务诸多特性,这些特性和他所面对的企业级服务场景相关,当然,这也严重限制了消息服务的吞吐量,完全实现JMS规范的MOM,性能总不会太高,而且JMS规范中没有涉及消息服务的分布式特性,导致大多数实现JMS规范的MOM分布式部署功能比较弱,只适合集群部署。

 

说到高性能消息中间件,第一个想到的肯定是LinkedIn开源的Kafka,虽然最初Kafka是为日志传输而生,但也非常适合互联网公司消息服务的应用场景,他们不要求数据实时的强一致性(事务),更多是希望达到数据的最终一致性。RocketMQ是MetaQ的3.0版本,而MetaQ最初的设计又参考了Kafka。最初的MetaQ 1.x版本由阿里的原作者庄晓丹开发,后面的MetaQ 2.x版本才进行了开源,这里需要注意一点的事,MetaQ 1.x和MetaQ 2.x是依赖ZooKeeper的,但RocketMQ(即MetaQ 3.x)却去掉了ZooKeeper依赖,转而采用自己的NameServer。

 

ZooKeeper是著名的分布式协作框架,提供了Master选举、分布式锁、数据的发布和订阅等诸多功能,为什么RocketMQ没有选择ZooKeeper,而是自己开发了NameServer,我们来具体看看NameServer在RocketMQ集群中的作用就明了了。

 

RocketMQ的Broker有三种集群部署方式:1.单台Master部署;2.多台Master部署;3.多Master多Slave部署;采用第3种部署方式时,Master和Slave可以采用同步复制和异步复制两种方式。下图是第3种部署方式的简单图:

 


图虽然是网上找的,但也足以说明问题,当采用多Master方式时,Master与Master之间是不需要知道彼此的,这样的设计直接降低了Broker实现的复查性,你可以试想,如果Master与Master之间需要知道彼此的存在,这会需要在Master之中维护一个网络的Master列表,而且必然设计到Master发现和活跃Master数量变更等诸多状态更新问题,所以最简单也最可靠的做法就是Master只做好自己的事情(比如和Slave进行数据同步)即可,这样,在分布式环境中,某台Master宕机或上线,不会对其他Master造成任何影响。

 

那么怎么才能知道网络中有多少台Master和Slave呢?你会很自然想到用ZooKeeper,每个活跃的Master或Slave都去约定的ZooKeeper节点下注册一个状态节点,但RocketMQ没有使用ZooKeeper,所以这件事就交给了NameServer来做了(看上图)。

 

结论一:NameServer用来保存活跃的broker列表,包括Master和Slave。

当然,这个结论百度一查就知道,我们移步到rocketmq-namesrv模块中最重要的一个类:RouteInfoManager,它的主要属性如下:

 

private final ReadWriteLock lock = new ReentrantReadWriteLock();

private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;

private final HashMap<String/* … 阅读全文

artemis安装

  categories:mq, 资料  tags:,   author:

一.Apache Artemis介绍

Apache Artemis是apache的一个新的消息系统, 这个消息系统是来源于 redhat的 “异步消息系统 HornetQ

HornetQ的相关资料有如下:

http://wenku.baidu.com/view/2f19b1557fd5360cba1adbd9.html?from=search

1. 关于Apache Artemis

http://activemq.apache.org/artemis/  项目的注意

http://activemq.apache.org/artemis/download.html  下载页面

另外 可以参考 HornetQ的相关资料, 目前同 HornetQ还有很大的相似性

选择他的Artemis的考虑是, 他是中等成熟的 消息系统, 功能够用, 代码少, 阅读方便, 便于理解, 这样有了问题才好 进行相关的处理工作。

而 activemq 代码太庞大了, 功能太多, 想彻底弄清楚要费非常大力气

而apache Apollo  

阅读全文

Kafka+Spark Streaming+Redis实时计算整合实践

  categories:mq, 资料  tags:, ,   author:

基于Spark通用计算平台,可以很好地扩展各种计算类型的应用,尤其是Spark提供了内建的计算库支持,像Spark Streaming、Spark SQL、MLlib、GraphX,这些内建库都提供了高级抽象,可以用非常简洁的代码实现复杂的计算逻辑、这也得益于Scala编程语言的简洁性。这 里,我们基于1.3.0版本的Spark搭建了计算平台,实现基于Spark Streaming的实时计算。
我们的应用场景是分析用户使用手机App的行为,描述如下所示:

  • 手机客户端会收集用户的行为事件(我们以点击事件为例),将数据发送到数据服务器,我们假设这里直接进入到Kafka消息队列
  • 后端的实时服务会从Kafka消费数据,将数据读出来并进行实时分析,这里选择Spark Streaming,因为Spark Streaming提供了与Kafka整合的内置支持
  • 经过Spark Streaming实时计算程序分析,将结果写入Redis,可以实时获取用户的行为数据,并可以导出进行离线综合统计分析

Spark Streaming介绍

Spark Streaming提供了一个叫做DStream(Discretized Stream)的高级抽象,DStream表示一个持续不断输入的数据流,可以基于Kafka、TCP Socket、Flume等输入数据流创建。在内部,一个DStream实际上是由一个RDD序列组成的。Sparking Streaming是基于Spark平台的,也就继承了Spark平台的各种特性,如容错(Fault-tolerant)、可扩展 (Scalable)、高吞吐(High-throughput)等。
在Spark Streaming中,每个DStream包含了一个时间间隔之内的数据项的集合,我们可以理解为指定时间间隔之内的一个batch,每一个batch就 构成一个RDD数据集,所以DStream就是一个个batch的有序序列,时间是连续的,按照时间间隔将数据流分割成一个个离散的RDD数据集,如图所 示(来自官网):
spark-cluster-overview-streaming-dstream
我们都知道,Spark支持两种类型操作:Transformations和Actions。Transformation从一个已知的RDD数据集经过 转换得到一个新的RDD数据集,这些Transformation操作包括map、filter、flatMap、union、join等,而且 Transformation具有lazy的特性,调用这些操作并没有立刻执行对已知RDD数据集的计算操作,而是在调用了另一类型的Action操作才 会真正地执行。Action执行,会真正地对RDD数据集进行操作,返回一个计算结果给Driver程序,或者没有返回结果,如将计算结果数据进行持久 化,Action操作包括reduceByKey、count、foreach、collect等。关于Transformations和Actions 更详细内容,可以查看官网文档。
同样、Spark Streaming提供了类似Spark的两种操作类型,分别为Transformations和Output操作,它们的操作对象是DStream,作 用也和Spark类似:Transformation从一个已知的DStream经过转换得到一个新的DStream,而且Spark Streaming还额外增加了一类针对Window的操作,当然它也是Transformation,但是可以更灵活地控制DStream的大小(时间 间隔大小、数据元素个数),例如window(windowLength, slideInterval)、countByWindow(windowLength, slideInterval)、reduceByWindow(func, windowLength, slideInterval)等。Spark Streaming的Output操作允许我们将DStream数据输出到一个外部的存储系统,如数据库或文件系统等,执行Output操作类似执行 … 阅读全文

RocketMQ安装与使用

  categories:mq, 资料  tags:, ,   author:

一、服务端安装部署

我是在虚拟机中的CentOS6.5中进行部署。
1.下载程序
2.tar -xvf alibaba-rocketmq-3.0.7.tar.gz 解压到适当的目录如/opt/目录
3.启动RocketMQ:进入rocketmq/bin 目录 执行
nohup sh mqnamesrv &

4.启动Broker,设置对应的NameServer

 nohup sh mqbroker -n “127.0.0.1:9876″ &

二、编写客户端

可以查看sameple中的quickstart源码 1.Consumer 消息消费者

/**
* Consumer,订阅消息
*/
public class Consumer {

public static void main(String[] args)

阅读全文

RocketMQ入门

  categories:mq  tags:, ,   author:

RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点:能够保证严格的消息顺序

一.RocketMQ网络部署特点

来源:http://www.changeself.net/archives/rocketmq%E5%85%A5%E9%97%A8%EF%BC%881%EF%BC%89.html

    (1)NameServer是一个几乎无状态的节点,可集群部署,节点之间无任何信息同步
阅读全文

kafka java示例

  categories:mq, 资料  tags:, ,   author:

研究kafka中, 集群批准好了, 用kafka的 控制台例子收发消息都成功了, 想用java进行一下测试,但是怎么弄都没好用, 在网络搜索了例子测试一下。

kafka官方给的示例并不是很完整,以下代码是经过我补充的并且编译后能运行的。
http://kafka.apache.org/documentation.html#highlevelconsumerapi 等

Producer Code

import java.util.*;
import kafka.message.Message;
import kafka.producer.ProducerConfig;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;

public class ProducerSample {

public static void main(String[] args) {
ProducerSample ps = new ProducerSample();

Properties props = new … 阅读全文

漫游kafka实战篇之搭建Kafka开发环境

  categories:mq, 资料  tags:, ,   author:
上篇文章中我们搭建了kafka的服务器,并可以使用Kafka的命令行工具创建topic,发送和接收消息。下面我们来搭建kafka的开发环境。
添加依赖

搭建开发环境需要引入kafka的jar包,一种方式是将Kafka安装包中lib下的jar包加入到项目的classpath中,这种比较简单了。不过我们使用另一种更加流行的方式:使用maven管理jar包依赖。
创建好maven项目后,在pom.xml中添加以下依赖:
<dependency>
<groupId> org.apache.kafka</groupId >
<artifactId> kafka_2.10</artifactId >
<version> 0.8.0</ version>
</dependency>
添加依赖后你会发现有两个jar包的依赖找不到。没关系我都帮你想好了,点击这里下载这两个jar包,解压后你有两种选择,第一种是使用mvn的install命令将jar包安装到本地仓库,另一种是直接将解压后的文件夹拷贝到mvn本地仓库的com文件夹下,比如我的本地仓库是d:\mvn,完成后我的目录结构是这样的:
目录结构
配置程序

首先是一个充当配置文件作用的接口,配置了Kafka的各种连接参数:package com.sohu.kafkademon;public interface KafkaProperties
{
final static String zkConnect = “10.22.10.139:2181″;
final static String groupId = “group1″;
final static String topic = “topic1″;
阅读全文

MQTT简介

  categories:mq  tags:,   author:

1. MQTT简介

MQTT(Message Queue Telemetry Transport),遥测传输协议,提供订阅/发布模式,更为简约、轻量,易于使用,针对受限环境(带宽低、网络延迟高、网络通信不稳定),可以简单概括为物联网打造,官方总结特点如下:

1.使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
2. 对负载内容屏蔽的消息传输。
3. 使用 TCP/IP 提供网络连接。
4. 有三种消息发布服务质量:
    “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
    “至少一次”,确保消息到达,但消息重复可能会发生。
    “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
5. 小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量。
6. 使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品 和外部连接起来,被用来当做传感器和致动器(比如通过Twitter让房屋联网)的通信协议。

早在1999年,IBM的Andy Stanford-Clark博士以及Arcom公司ArlenNipper博士发明了MQTT(Message Queuing Telemetry … 阅读全文

史上最快的消息队列ZeroMQ简介

  categories:mq, 资料  tags:,   author:

—– ZMQ的学习和研究

一、ZeroMQ的背景介绍

引用官方的说法: “ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。ZMQ的明确目标是 “成为标准网络协议栈的一部分,之后进入Linux内核”。现在还未看到它们的成功。但是,它无疑是极具前景的、并且是人们更加需要的“传统”BSD套接 字之上的一 层封装。ZMQ让编写高性能网络应用程序极为简单和有趣。”

近几年有关”Message Queue”的项目层出不穷,知名的就有十几种,这主要是因为后摩尔定律时代,分布式处理逐渐成为主流,业界需要一套标准来解决分布式计算环境中节点之间 的消息通信。几年的竞争下来,Apache基金会旗下的符合AMQP/1.0标准的RabbitMQ已经得到了广泛的认可,成为领先的MQ项目。

与RabbitMQ相比,ZMQ并不像是一个传统意义上的消息队列服务器,事实上,它也根本不是一个服务器,它更像是一个底层的网络通讯库,在Socket API之上做了一层封装,将网络通讯、进程通讯和线程通讯抽象为统一的API接口。

二、ZMQ是什么?

阅读了ZMQ的Guide文档后,我的理解 是,这是个类似于Socket的一系列接口,他跟Socket的区别是:普通的socket是端到端的(1:1的关系),而ZMQ却是可以N:M 的关系,人们对BSD套接字的了解较多的是点对点的连接,点对点连接需要显式地建立连接、销毁连接、选择协议(TCP/UDP)和处理错误等,而ZMQ屏 蔽了这些细节,让你的网络编程更为简单。ZMQ用于node与node间的通信,node可以是主机或者是进程。

三、本文的目的

在集群对外提供服务的过程中,我们有很多的配置,需要根据需要随时更新,那么这个信息如果推动到各个节点?并且保证信息的一致性和可靠性?本文在介 绍ZMQ基本理论的基础上,试图使用ZMQ实现一个配置分发中心。从一个节点,将信息无误的分发到各个服务器节点上,并保证信息正确性和一致性。

四、ZMQ的三个基本模型

ZMQ提供了三个基本的通信模型,分别是“Request-Reply “,”Publisher-Subscriber“,”Parallel Pipeline”,我们从这三种模式一窥ZMQ的究竟

ZMQ的hello world!

由Client发起请求,并等待Server回应请求。请求端发送一个简单的hello,服务端则回应一个world。请求端和服务端都可以是 1:N 的模型。通常把 1 认为是 Server ,N 认为是Client … 阅读全文



快乐成长 每天进步一点点      京ICP备18032580号-1