RocketMq知识点理解

  categories:资料  tags:, ,   author:

RocketMq关键点理解

各个角色间的关系:

RocketMq中每个Broker(master和slave)与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server。
Name Server之间不会有任何信息交互,各自独立。
producer和consumer随机从一个name server即可获得全部topic的路由信息。
producer根据得到的路由信息,同master建立长连接。
consumer根据路由信息,同master个salve都建立长连接。然后根据设置的订阅规则,选择从master或者slave订阅消息。

Broker:

分为master和slave两个角色。master提供读写读物,salve只提供读服务。
为了保证可用性,需要部署多套broker,每套broker至少有1个master和1个以上的salve。
在同一套broker中,master和salve都是同样的brokerName,master的brokerId是0,salve的brokerId必须是非0的。

同步刷盘和异步刷盘。

同步刷盘是说,broker在收到每个消息后,都是先要保存到硬盘上,然后再给producer确认。异步刷盘就是先回复确认,然后批量保存到硬盘上。异步刷盘有更好的性能,当然也有更大的丢失消息的风险。

同步复制和异步复制。

是说在master和salve之间复制消息的方式。同步是说在salve也存储了消息后再答复producer。
异步复制是先答复producer,再去向salve复制。
通过同步复制技术可以完全避免单点,同步复制势必会影响性能,适合对消息可靠性要求极高的场合,例如与Money相关的应用。RocketMQ从3.0版本开始支持同步双写。

MQPullConsumer和MQPushConsumer的区别

consumer被分为2类:MQPullConsumer和MQPushConsumer,其实本质都是拉模式(pull),即consumer轮询从broker拉取消息。
区别是:
push方式里,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送(push)过来的。
pull方式里,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。

RocketMQ使用长轮询Pull方式,可保证消息非常实时,消息实时性不低于Push的理解

数据交互有两种模式:Push(推模式)、Pull(拉模式)。
推模式指的是客户端与服务端建立好网络长连接,服务方有相关数据,直接通过长连接通道推送到客户端。其优点是及时,一旦有数据变更,客户端立马能感知到;另外对客户端来说逻辑简单,不需要关心有无数据这些逻辑处理。缺点是不知道客户端的数据消费能力,可能导致数据积压在客户端,来不及处理。
拉模式指的是客户端主动向服务端发出请求,拉取相关数据。其优点是此过程由客户端发起请求,故不存在推模式中数据积压的问题。缺点是可能不够及时,对客户端来说需要考虑数据拉取相关逻辑,何时去拉,拉的频率怎么控制等等。

长轮询:
轮询是说,每隔一定时间,客户端想服务端发起一次请求,服务端有数据就返回数据,没有数据就返回空,然后关闭请求。
长轮询,不同之处是,服务端如果此时没有数据,保持连接。等到有数据返回(相当于一种push),或者超时返回。
所以长轮询Pull的好处就是可以减少无效请求,保证消息的实时性,又不会造成客户端积压。

其他

  1. 对于一个消息中间件来说,持久化部分的性能直接决定了整个消息中间件的性能。RocketMQ充分利用Linux文件系统内存cache来提高性能
  2. RocketMq Broker的buffer不会满。原因是RocketMQ没有内存Buffer概念,RocketMQ的队列都是持久化磁盘,数据定期清除。这是RocketMq和其他消息中间件的重要区别。对RocketMQ来说的内存Buffer抽象成一个无限长度的队列,不管有多少数据进来都能装得下,这个无限是有前提的,Broker会定期删除过期的数据。
阅读全文

Kafka与RabbitMQ的对比

  categories:资料  tags:,   author:

一、前言
消息队列中间件(简称消息中间件)是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下提供应用解耦、弹性伸缩、冗余存储、流量削峰、异步通信、数据同步等等功能,其作为分布式系统架构中的一个重要组件,有着举足轻重的地位。

目前开源的消息中间件可谓是琳琅满目,能让大家耳熟能详的就有很多,比如ActiveMQ、RabbitMQ、Kafka、RocketMQ、ZeroMQ等。不管选择其中的哪一款,都会有用的不趁手的地方,毕竟不是为你量身定制的。有些大厂在长期的使用过程中积累了一定的经验,其消息队列的使用场景也相对稳定固化,或者目前市面上的消息中间件无法满足自身需求,并且也具备足够的精力和人力而选择自研来为自己量身打造一款消息中间件。但是绝大多数公司还是不会选择重复造轮子,那么选择一款合适自己的消息中间件显得尤为重要。就算是前者,那么在自研出稳定且可靠的相关产品之前还是会经历这样一个选型过程。

在整体架构中引入消息中间件,势必要考虑很多因素,比如成本及收益问题,怎么样才能达到最优的性价比?虽然消息中间件种类繁多,但是各自都有各自的侧重点,选择合适自己、扬长避短无疑是最好的方式。如果你对此感到无所适从,本文或许可以参考一二。


二、各类消息队列简述

ActiveMQ是Apache出品的、采用Java语言编写的完全基于JMS1.1规范的面向消息的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。不过由于历史原因包袱太重,目前市场份额没有后面三种消息中间件多,其最新架构被命名为Apollo,号称下一代ActiveMQ,有兴趣的同学可行了解。

RabbitMQ是采用Erlang语言实现的AMQP协议的消息中间件,最初起源于金融系统,用于在分布式系统中存储转发消息。RabbitMQ发展到今天,被越来越多的人认可,这和它在可靠性、可用性、扩展性、功能丰富等方面的卓越表现是分不开的。

Kafka起初是由LinkedIn公司采用Scala语言开发的一个分布式、多分区、多副本且基于zookeeper协调的分布式消息系统,现已捐献给Apache基金会。它是一种高吞吐量的分布式发布订阅消息系统,以可水平扩展和高吞吐率而被广泛使用。目前越来越多的开源分布式处理系统如Cloudera、Apache Storm、Spark、Flink等都支持与Kafka集成。

RocketMQ是阿里开源的消息中间件,目前已经捐献个Apache基金会,它是由Java语言开发的,具备高吞吐量、高可用性、适合大规模分布式系统应用等特点,经历过双11的洗礼,实力不容小觑。

ZeroMQ号称史上最快的消息队列,基于C语言开发。ZeroMQ是一个消息处理队列库,可在多线程、多内核和主机之间弹性伸缩,虽然大多数时候我们习惯将其归入消息队列家族之中,但是其和前面的几款有着本质的区别,ZeroMQ本身就不是一个消息队列服务器,更像是一组底层网络通讯库,对原有的Socket API上加上一层封装而已。

目前市面上的消息中间件还有很多,比如腾讯系的PhxQueue、CMQ、CKafka,又比如基于Go语言的NSQ,有时人们也把类似Redis的产品也看做消息中间件的一种,当然它们都很优秀,但是本文篇幅限制无法穷极所有,下面会针对性的挑选RabbitMQ和Kafka两款典型的消息中间件来做分析,力求站在一个公平公正的立场来阐述消息中间件选型中的各个要点。


三、选型要点概述

衡量一款消息中间件是否符合需求需要从多个维度进行考察,首要的就是功能维度,这个直接决定了你能否最大程度上的实现开箱即用,进而缩短项目周期、降低成本等。如果一款消息中间件的功能达不到想要的功能,那么就需要进行二次开发,这样会增加项目的技术难度、复杂度以及增大项目周期等。

1. 功能维度

功能维度又可以划分个多个子维度,大致可以分为以下这些:

  • 优先级队列

优先级队列不同于先进先出队列,优先级高的消息具备优先被消费的特权,这样可以为下游提供不同消息级别的保证。不过这个优先级也是需要有一个前提的:如果消费者的消费速度大于生产者的速度,并且消息中间件服务器(一般简单的称之为Broker)中没有消息堆积,那么对于发送的消息设置优先级也就没有什么实质性的意义了,因为生产者刚发送完一条消息就被消费者消费了,那么就相当于Broker中至多只有一条消息,对于单条消息来说优先级是没有什么意义的。

  • 延迟队列

当你在网上购物的时候是否会遇到这样的提示:“三十分钟之内未付款,订单自动取消”?这个是延迟队列的一种典型应用场景。延迟队列存储的是对应的延迟消息,所谓“延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。延迟队列一般分为两种:基于消息的延迟和基于队列的延迟。基于消息的延迟是指为每条消息设置不同的延迟时间,那么每当队列中有新消息进入的时候就会重新根据延迟时间排序,当然这也会对性能造成极大的影响。实际应用中大多采用基于队列的延迟,设置不同延迟级别的队列,比如5s、10s、30s、1min、5mins、10mins等,每个队列中消息的延迟时间都是相同的,这样免去了延迟排序所要承受的性能之苦,通过一定的扫描策略(比如定时)即可投递超时的消息。

  • 死信队列

由于某些原因消息无法被正确的投递,为了确保消息不会被无故的丢弃,一般将其置于一个特殊角色的队列,这个队列一般称之为死信队列。与此对应的还有一个“回退队列”的概念,试想如果消费者在消费时发生了异常,那么就不会对这一次消费进行确认(Ack),进而发生回滚消息的操作之后消息始终会放在队列的顶部,然后不断被处理和回滚,导致队列陷入死循环。为了解决这个问题,可以为每个队列设置一个回退队列,它和死信队列都是为异常的处理提供的一种机制保障。实际情况下,回退队列的角色可以由死信队列和重试队列来扮演。

  • 重试队列

重试队列其实可以看成是一种回退队列,具体指消费端消费消息失败时,为防止消息无故丢失而重新将消息回滚到Broker中。与回退队列不同的是重试队列一般分成多个重试等级,每个重试等级一般也会设置重新投递延时,重试次数越多投递延时就越大。举个例子:消息第一次消费失败入重试队列Q1,Q1的重新投递延迟为5s,在5s过后重新投递该消息;如果消息再次消费失败则入重试队列Q2,Q2的重新投递延迟为10s,在10s过后再次投递该消息。以此类推,重试越多次重新投递的时间就越久,为此需要设置一个上限,超过投递次数就入死信队列。重试队列与延迟队列有相同的地方,都是需要设置延迟级别,它们彼此的区别是:延迟队列动作由内部触发,重试队列动作由外部消费端触发;延迟队列作用一次,而重试队列的作用范围会向后传递。

  • 消费模式

消费模式分为推(push)模式和拉(pull)模式。推模式是指由Broker主动推送消息至消费端,实时性较好,不过需要一定的流制机制来确保服务端推送过来的消息不会压垮消费端。而拉模式是指消费端主动向Broker端请求拉取(一般是定时或者定量)消息,实时性较推模式差,但是可以根据自身的处理能力而控制拉取的消息量。

  • 广播消费

消息一般有两种传递模式:点对点(P2P,Point-to-Point)模式和发布/订阅(Pub/Sub)模式。对于点对点的模式而言,消息被消费以后,队列中不会再存储,所以消息消费者不可能消费到已经被消费的消息。虽然队列可以支持多个消费者,但是一条消息只会被一个消费者消费。发布订阅模式定义了如何向一个内容节点发布和订阅消息,这个内容节点称为主题(topic),主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题,而消息订阅者则从主题中订阅消息。主题使得消息的订阅者与消息的发布者互相保持独立,不需要进行接触即可保证消息的传递,发布/订阅模式在消息的一对多广播时采用。RabbitMQ是一种典型的点对点模式,而Kafka是一种典型的发布订阅模式。但是RabbitMQ中可以通过设置交换器类型来实现发布订阅模式而达到广播消费的效果,Kafka中也能以点对点的形式消费,你完全可以把其消费组(consumer group)的概念看成是队列的概念。不过对比来说,Kafka中因为有了消息回溯功能的存在,对于广播消费的力度支持比RabbitMQ的要强。

  • 消息回溯

一般消息在消费完成之后就被处理了,之后再也不能消费到该条消息。消息回溯正好相反,是指消息在消费完成之后,还能消费到之前被消费掉的消息。对于消息而言,经常面临的问题是“消息丢失”,至于是真正由于消息中间件的缺陷丢失还是由于使用方的误用而丢失一般很难追查,如果消息中间件本身具备消息回溯功能的话,可以通过回溯消费复现“丢失的”消息进而查出问题的源头之所在。消息回溯的作用远不止与此,比如还有索引恢复、本地缓存重建,有些业务补偿方案也可以采用回溯的方式来实现。

  • 消息堆积+持久化

流量削峰是消息中间件的一个非常重要的功能,而这个功能其实得益于其消息堆积能力。从某种意义上来讲,如果一个消息中间件不具备消息堆积的能力,那么就不能把它看做是一个合格的消息中间件。消息堆积分内存式堆积和磁盘式堆积。RabbitMQ是典型的内存式堆积,但这并非绝对,在某些条件触发后会有换页动作来将内存中的消息换页到磁盘(换页动作会影响吞吐),或者直接使用惰性队列来将消息直接持久化至磁盘中。Kafka是一种典型的磁盘式堆积,所有的消息都存储在磁盘中。一般来说,磁盘的容量会比内存的容量要大得多,对于磁盘式的堆积其堆积能力就是整个磁盘的大小。从另外一个角度讲,消息堆积也为消息中间件提供了冗余存储的功能。援引纽约时报的案例,其直接将Kafka用作存储系统。… 阅读全文

RabbitMQ之优先级消息队列

  categories:资料  tags:  author:

RabbitMQ优先级队列注意点:

1、只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效

2、RabbitMQ3.5以后才支持优先级队列

代码在博客:RabbitMQ学习笔记三:Java实现RabbitMQ之与Spring集成 最后面有下载地址,只是做了少许改变,改变的代码如下:

消费者 spring-config.xml(还需要增加一个QueueListener监听器,代码就不复制到这里了,可以参考项目中的其他监听器)

<!-- ========================================RabbitMQ========================================= -->
    <!-- 连接工厂 -->
    <rabbit:connection-factory id="connectionFactory" host="localhost" publisher-confirms="true" virtual-host="/" username="guest" password="guest" />
    <!-- 监听器 -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <!-- queues是队列名称,可填多个,用逗号隔开, method是ref指定的Bean调用Invoke方法执行的方法名称 -->
        <rabbit:listener queues="red" method="onMessage" ref="redQueueListener" />
        <rabbit:listener queues="blue" method="onMessage" ref="blueQueueListener" />
        
阅读全文

RocketMQ初探一:NameServer的作用

  categories:资料  tags:, ,   author:

第一次真正接触Java消息服务是在2013年底,当时是给中国移动做统一支付平台,当时用的就是著名的Apache ActiveMQ,当时觉得很有趣,一个服务队列竟然可以玩出这么多花样来。当时为了尽快的入门,还把《Java Message Service》给看了一遍,这对于初学者的我收获颇多。我说知道的完全实现JMS规范的MOM有ActiveMQ/Apollo和HornetQ,都是采用Java实现。JMS中说明了Java消息服务的两种消息传送模型,即P2P(点对点)和Pub/Sub(发布订阅),在约定了一些消息服务特性的同时,并提供了一套接口API,是否实现了该API,标志着MOM是否支持JMS规范,JMS规范中定义了消息服务诸多特性,这些特性和他所面对的企业级服务场景相关,当然,这也严重限制了消息服务的吞吐量,完全实现JMS规范的MOM,性能总不会太高,而且JMS规范中没有涉及消息服务的分布式特性,导致大多数实现JMS规范的MOM分布式部署功能比较弱,只适合集群部署。

 

说到高性能消息中间件,第一个想到的肯定是LinkedIn开源的Kafka,虽然最初Kafka是为日志传输而生,但也非常适合互联网公司消息服务的应用场景,他们不要求数据实时的强一致性(事务),更多是希望达到数据的最终一致性。RocketMQ是MetaQ的3.0版本,而MetaQ最初的设计又参考了Kafka。最初的MetaQ 1.x版本由阿里的原作者庄晓丹开发,后面的MetaQ 2.x版本才进行了开源,这里需要注意一点的事,MetaQ 1.x和MetaQ 2.x是依赖ZooKeeper的,但RocketMQ(即MetaQ 3.x)却去掉了ZooKeeper依赖,转而采用自己的NameServer。

 

ZooKeeper是著名的分布式协作框架,提供了Master选举、分布式锁、数据的发布和订阅等诸多功能,为什么RocketMQ没有选择ZooKeeper,而是自己开发了NameServer,我们来具体看看NameServer在RocketMQ集群中的作用就明了了。

 

RocketMQ的Broker有三种集群部署方式:1.单台Master部署;2.多台Master部署;3.多Master多Slave部署;采用第3种部署方式时,Master和Slave可以采用同步复制和异步复制两种方式。下图是第3种部署方式的简单图:

 


图虽然是网上找的,但也足以说明问题,当采用多Master方式时,Master与Master之间是不需要知道彼此的,这样的设计直接降低了Broker实现的复查性,你可以试想,如果Master与Master之间需要知道彼此的存在,这会需要在Master之中维护一个网络的Master列表,而且必然设计到Master发现和活跃Master数量变更等诸多状态更新问题,所以最简单也最可靠的做法就是Master只做好自己的事情(比如和Slave进行数据同步)即可,这样,在分布式环境中,某台Master宕机或上线,不会对其他Master造成任何影响。

 

那么怎么才能知道网络中有多少台Master和Slave呢?你会很自然想到用ZooKeeper,每个活跃的Master或Slave都去约定的ZooKeeper节点下注册一个状态节点,但RocketMQ没有使用ZooKeeper,所以这件事就交给了NameServer来做了(看上图)。

 

结论一:NameServer用来保存活跃的broker列表,包括Master和Slave。

当然,这个结论百度一查就知道,我们移步到rocketmq-namesrv模块中最重要的一个类:RouteInfoManager,它的主要属性如下:

 

private final ReadWriteLock lock = new ReentrantReadWriteLock();

private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;

private final HashMap<String/* … 阅读全文

Kafka中zookeeper的作用

  categories:资料  tags:  author:

ZooKeeper与Kafka

有多个服务器的分布式系统,每台服务器都负责保存数据,在数据上执行操作。这样的潜在例子包括分布式搜索引擎、分布式构建系统或者已知的系统如Apache Hadoop。所有这些分布式系统的一个常见问题是,你如何在任一时间点确定哪些服务器活着并且在工作中。最重要的是,当面对这些分布式计算的难题,例如网络失败、带宽限制、可变延迟连接、安全问题以及任何网络环境,甚至跨多个数据中心时可能发生的错误时,你如何可靠地做这些事。这些正是Apache ZooKeeper所关注的问题,它是一个快速、高可用、容错、分布式的协调服务。你可以使用ZooKeeper构建可靠的、分布式的数据结构,用于群组成员、领导人选举、协同工作流和配置服务,以及广义的分布式数据结构如锁、队列、屏障(Barrier)和锁存器(Latch)。许多知名且成功的项目依赖于ZooKeeper,其中包括HBase、Hadoop 2.0、Solr Cloud、Neo4J、Apache Blur(Incubating)和Accumulo。

ZooKeeper是一个分布式的、分层级的文件系统,能促进客户端间的松耦合,并提供最终一致的,类似于传统文件系统中文件和目录的Znode视图。它提供了基本的操作,例如创建、删除和检查Znode是否存在。它提供了事件驱动模型,客户端能观察特定Znode的变化,例如现有Znode增加了一个新的子节点。ZooKeeper运行多个ZooKeeper服务器,称为Ensemble,以获得高可用性。每个服务器都持有分布式文件系统的内存复本,为客户端的读取请求提供服务。

Kafka将元数据信息保存在Zookeeper中,但是发送给Topic本身的数据是不会发到Zk上的,否则Zk就疯了。kafka使用zookeeper来实现动态的集群扩展,不需要更改客户端(producer和consumer)的配置。broker会在zookeeper注册并保持相关的元数据(topic,partition信息等)更新。而客户端会在zookeeper上注册相关的watcher。一旦zookeeper发生变化,客户端能及时感知并作出相应调整。这样就保证了添加或去除broker时,各broker间仍能自动实现负载均衡。这里的客户端指的是Kafka的消息生产端(Producer)和消息消费端(Consumer)Producer端使用zookeeper用来”发现”broker列表,以及和Topic下每个partition的leader建立socket连接并发送消息。也就是说每个Topic的partition是由Lead角色的Broker端使用zookeeper来注册broker信息,以及监测partition leader存活性.Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader建立socket连接,并获取消息.

leader 选举 和 follower 信息同步

如上图所示,kafaka集群的 broker,和 Consumer 都需要连接 Zookeeper。
Producer 直接连接 Broker。

Producer 把数据上传到 Broker,Producer可以指定数据有几个分区、几个备份。上面的图中,数据有两个分区 0、1,每个分区都有自己的副本:0’、 1’。

黄色的分区为 leader,白色的为 follower。

leader 处理 partition 的所有读写请求,与此同时,follower会被动定期地去复制leader上的数据。 如下图所示,红色的为 leader,绿色的为

阅读全文

scribe、chukwa、kafka、flume日志系统对比

  categories:资料  tags:,   author:

1. 背景介绍许多公司的平台每天会产生大量的日志(一般为流式数据,如,搜索引擎的pv,查询等),处理这些日志需要特定的日志系统,一般而言,这些系统需要具有以下特征:(1) 构建应用系统和分析系统的桥梁,并将它们之间的关联解耦;(2) 支持近实时的在线分析系统和类似于Hadoop之类的离线分析系统;(3) 具有高可扩展性。即:当数据量增加时,可以通过增加节点进行水平扩展。

本文从设计架构,负载均衡,可扩展性和容错性等方面对比了当今开源的日志系统,包括facebook的scribe,apache的chukwa,linkedin的kafka和cloudera的flume等。

2. FaceBook的Scribe

Scribe是facebook开源的日志收集系统,在facebook内部已经得到大量的应用。它能够从各种日志源上收集日志,存储到一个中央存储系统 (可以是NFS,分布式文件系统等)上,以便于进行集中统计分析处理。它为日志的“分布式收集,统一处理”提供了一个可扩展的,高容错的方案。

它最重要的特点是容错性好。当后端的存储系统crash时,scribe会将数据写到本地磁盘上,当存储系统恢复正常后,scribe将日志重新加载到存储系统中。

chukwa

架构

scribe的架构比较简单,主要包括三部分,分别为scribe agent, scribe和存储系统。

(1) scribe agent

scribe agent实际上是一个thrift client。 向scribe发送数据的唯一方法是使用thrift client, scribe内部定义了一个thrift接口,用户使用该接口将数据发送给server。

(2) scribe

scribe接收到thrift client发送过来的数据,根据配置文件,将不同topic的数据发送给不同的对象。scribe提供了各种各样的store,如 file, HDFS等,scribe可将数据加载到这些store中。

(3) 存储系统

存储系统实际上就是scribe中的store,当前scribe支持非常多的store,包括file(文件),buffer(双层存储,一个主储存, 一个副存储),network(另一个scribe服务器),bucket(包含多个 store,通过hash的将数据存到不同store中),null(忽略数据),thriftfile(写到一个Thrift TFileTransport文件中)和multi(把数据同时存放到不同store中)。

3. Apache的Chukwa

chukwa是一个非常新的开源项目,由于其属于hadoop系列产品,因而使用了很多hadoop的组件(用HDFS存储,用mapreduce处理数据),它提供了很多模块以支持hadoop集群日志分析。… 阅读全文

kafka原理解析

  categories:资料  tags:  author:

介绍

分布式消息系统kafka的提供了一个生产者、缓冲区、消费者的模型

  • broker:中间的kafka cluster,存储消息,是由多个server组成的集群
  • topic:kafka给消息提供的分类方式。broker用来存储不同topic的消息数据
  • producer:往broker中某个topic里面生产数据
  • consumer:往broker中某个topic获取数据

设计思想

topic与消息

kafka将所有消息组织成多个topic的形式存储,而每个topic又可以拆分成多个partition,每个partition又由一个一个消息组成。每个消息都被标识了一个递增序列号代表其进来的先后顺序,并按顺序存储在partition中。

这样,消息就以一个个id的方式,组织起来。

  • producer选择一个topic,生产消息,消息会通过分配策略append到某个partition末尾
  • consumer选择一个topic,通过id指定从哪个位置开始消费消息。消费完成之后保留id,下次可以从这个位置开始继续消费,也可以从其他任意位置开始消费

这个id,在kafka中被称为offset

这种组织和处理策略提供了如下好处:

  • 消费者可以根据需求,灵活指定offset消费
  • 保证了消息不变性,为并发消费提供了线程安全的保证。每个consumer都保留自己的offset,互相之间不干扰,不存在线程安全问题
  • 消息访问的并行高效性。每个topic中的消息被组织成多个partition,partition均匀分配到集群server中。生产、消费消息的时候,会被路由到指定partition,减少竞争,增加了程序的并行能力
  • 增加消息系统的可伸缩性。每个topic中保留的消息可能非常庞大,通过partition将消息切分成多个子消息,并通过负责均衡策略将partition分配到不同server。这样当机器负载满的时候,通过扩容可以将消息重新均匀分配
  • 保证消息可靠性。消息消费完成之后不会删除,可以通过重置offset重新消费,保证了消息不会丢失
  • 灵活的持久化策略。可以通过指定时间段(如最近一天)来保存消息,节省broker存储空间

备份

消息以partition为单位分配到多个server,并以partition为单位进行备份。备份策略为:1个leader和N个followers,leader接受读写请求,followers被动复制leader。leader和followers会在集群中打散,保证partition高可用

producer

producer生产消息需要如下参数:

  • topic:往哪个topic生产消息
  • partition:往哪个partition生产消息
  • key:根据该key将消息分区到不同partition
  • message:消息

根据kafka源码,可以根据不同参数灵活调整生产、分区策略

if topic is None
    throw 
阅读全文

ZeroMQ简介研究与应用分析

  categories:资料  tags:  author:

一、什么是ZeroMQ

1、ZMQ是一个简单好用的传输层,像框架一样的socket library.

2、ZMQ是一个消息处理队列。

3、与RabbitMQ相比,ZMQ并不像一个传统意义上的消息服务器,更像是一个底层的网络通讯库,在socket API之上做了一层封装,将网络通讯,进程通讯和线程通讯抽象成统一的API接口

以下是ZMQ的官方简介

ZeroMQ (also known as ØMQ, 0MQ, or zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry atomic messages across various transports like in-process,

阅读全文

分布式开放消息系统(RocketMQ)的原理与实践

  categories:资料  tags:,   author:

分布式消息系统作为实现分布式系统可扩展、可伸缩性的关键组件,需要具有高吞吐量、高可用等特点。而谈到消息系统的设计,就回避不了两个问题:

  1. 消息的顺序问题
  2. 消息的重复问题

RocketMQ作为阿里开源的一款高性能、高吞吐量的消息中间件,它是怎样来解决这两个问题的?RocketMQ 有哪些关键特性?其实现原理是怎样的?

关键特性以及其实现原理

一、顺序消息

消息有序指的是可以按照消息的发送顺序来消费。例如:一笔订单产生了 3 条消息,分别是订单创建、订单付款、订单完成。消费时,要按照顺序依次消费才有意义。与此同时多笔订单之间又是可以并行消费的。首先来看如下示例:

假如生产者产生了2条消息:M1、M2,要保证这两条消息的顺序,应该怎样做?你脑中想到的可能是这样:

你可能会采用这种方式保证消息顺序

假定M1发送到S1,M2发送到S2,如果要保证M1先于M2被消费,那么需要M1到达消费端被消费后,通知S2,然后S2再将M2发送到消费端。

这个模型存在的问题是,如果M1和M2分别发送到两台Server上,就不能保证M1先达到MQ集群,也不能保证M1被先消费。换个角度看,如果M2先于M1达到MQ集群,甚至M2被消费后,M1才达到消费端,这时消息也就乱序了,说明以上模型是不能保证消息的顺序的。如何才能在MQ集群保证消息的顺序?一种简单的方式就是将M1、M2发送到同一个Server上:

保证消息顺序,你改进后的方法

这样可以保证M1先于M2到达MQServer(生产者等待M1发送成功后再发送M2),根据先达到先被消费的原则,M1会先于M2被消费,这样就保证了消息的顺序。

这个模型也仅仅是理论上可以保证消息的顺序,在实际场景中可能会遇到下面的问题:

网络延迟问题

只要将消息从一台服务器发往另一台服务器,就会存在网络延迟问题。如上图所示,如果发送M1耗时大于发送M2的耗时,那么M2就仍将被先消费,仍然不能保证消息的顺序。即使M1和M2同时到达消费端,由于不清楚消费端1和消费端2的负载情况,仍然有可能出现M2先于M1被消费的情况。

那如何解决这个问题?将M1和M2发往同一个消费者,且发送M1后,需要消费端响应成功后才能发送M2。

聪明的你可能已经想到另外的问题:如果M1被发送到消费端后,消费端1没有响应,那是继续发送M2呢,还是重新发送M1?一般为了保证消息一定被消费,肯定会选择重发M1到另外一个消费端2,就如下图所示。

保证消息顺序的正确姿势

这样的模型就严格保证消息的顺序,细心的你仍然会发现问题,消费端1没有响应Server时有两种情况,一种是M1确实没有到达(数据在网络传送中丢失),另外一种消费端已经消费M1且已经发送响应消息,只是MQ Server端没有收到。如果是第二种情况,重发M1,就会造成M1被重复消费。也就引入了我们要说的第二个问题,消息重复问题,这个后文会详细讲解。

回过头来看消息顺序问题,严格的顺序消息非常容易理解,也可以通过文中所描述的方式来简单处理。总结起来,要实现严格的顺序消息,简单且可行的办法就是:

保证生产者 - MQServer - 消费者是一对一对一的关系

这样的设计虽然简单易行,但也会存在一些很严重的问题,比如:

  1. 并行度就会成为消息系统的瓶颈(吞吐量不够)
  2. 更多的异常处理,比如:只要消费端出现问题,就会导致整个处理流程阻塞,我们不得不花费更多的精力来解决阻塞的问题。

但我们的最终目标是要集群的高容错性和高吞吐量。这似乎是一对不可调和的矛盾,那么阿里是如何解决的?

世界上解决一个计算机问题最简单的方法:“恰好”不需要解决它!—— 沈询

有些问题,看起来很重要,但实际上我们可以通过合理的设计阅读全文

ELK+Kafka集群日志分析系统

  categories:mq, 资料  tags:,   author:

一、 系统介绍 2

二、 版本说明 3

三、 服务部署 3

1) JDK部署 3

2) Elasticsearch集群部署及优化 3

3) Elasticsearch健康插件安装 13

4) Shield之elasticsearch安全插件 15

5)Zookeeper集群搭建 15

6)Kafka集群搭建 17

7)测试Kafka和Zookeeper集群连通性 19

8) Logstash部署 20

9) Kibana部署 20

四、 系统使用示例 22

1) Logstash 作为kafka生产者示例 22… 阅读全文




快乐成长 每天进步一点点      京ICP备18032580号-1