标签归档:kafka

Kafka与RabbitMQ的对比

一、前言
消息队列中间件(简称消息中间件)是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下提供应用解耦、弹性伸缩、冗余存储、流量削峰、异步通信、数据同步等等功能,其作为分布式系统架构中的一个重要组件,有着举足轻重的地位。

目前开源的消息中间件可谓是琳琅满目,能让大家耳熟能详的就有很多,比如ActiveMQ、RabbitMQ、Kafka、RocketMQ、ZeroMQ等。不管选择其中的哪一款,都会有用的不趁手的地方,毕竟不是为你量身定制的。有些大厂在长期的使用过程中积累了一定的经验,其消息队列的使用场景也相对稳定固化,或者目前市面上的消息中间件无法满足自身需求,并且也具备足够的精力和人力而选择自研来为自己量身打造一款消息中间件。但是绝大多数公司还是不会选择重复造轮子,那么选择一款合适自己的消息中间件显得尤为重要。就算是前者,那么在自研出稳定且可靠的相关产品之前还是会经历这样一个选型过程。

在整体架构中引入消息中间件,势必要考虑很多因素,比如成本及收益问题,怎么样才能达到最优的性价比?虽然消息中间件种类繁多,但是各自都有各自的侧重点,选择合适自己、扬长避短无疑是最好的方式。如果你对此感到无所适从,本文或许可以参考一二。


二、各类消息队列简述

ActiveMQ是Apache出品的、采用Java语言编写的完全基于JMS1.1规范的面向消息的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。不过由于历史原因包袱太重,目前市场份额没有后面三种消息中间件多,其最新架构被命名为Apollo,号称下一代ActiveMQ,有兴趣的同学可行了解。

RabbitMQ是采用Erlang语言实现的AMQP协议的消息中间件,最初起源于金融系统,用于在分布式系统中存储转发消息。RabbitMQ发展到今天,被越来越多的人认可,这和它在可靠性、可用性、扩展性、功能丰富等方面的卓越表现是分不开的。

Kafka起初是由LinkedIn公司采用Scala语言开发的一个分布式、多分区、多副本且基于zookeeper协调的分布式消息系统,现已捐献给Apache基金会。它是一种高吞吐量的分布式发布订阅消息系统,以可水平扩展和高吞吐率而被广泛使用。目前越来越多的开源分布式处理系统如Cloudera、Apache Storm、Spark、Flink等都支持与Kafka集成。

RocketMQ是阿里开源的消息中间件,目前已经捐献个Apache基金会,它是由Java语言开发的,具备高吞吐量、高可用性、适合大规模分布式系统应用等特点,经历过双11的洗礼,实力不容小觑。

ZeroMQ号称史上最快的消息队列,基于C语言开发。ZeroMQ是一个消息处理队列库,可在多线程、多内核和主机之间弹性伸缩,虽然大多数时候我们习惯将其归入消息队列家族之中,但是其和前面的几款有着本质的区别,ZeroMQ本身就不是一个消息队列服务器,更像是一组底层网络通讯库,对原有的Socket API上加上一层封装而已。

目前市面上的消息中间件还有很多,比如腾讯系的PhxQueue、CMQ、CKafka,又比如基于Go语言的NSQ,有时人们也把类似Redis的产品也看做消息中间件的一种,当然它们都很优秀,但是本文篇幅限制无法穷极所有,下面会针对性的挑选RabbitMQ和Kafka两款典型的消息中间件来做分析,力求站在一个公平公正的立场来阐述消息中间件选型中的各个要点。


三、选型要点概述

衡量一款消息中间件是否符合需求需要从多个维度进行考察,首要的就是功能维度,这个直接决定了你能否最大程度上的实现开箱即用,进而缩短项目周期、降低成本等。如果一款消息中间件的功能达不到想要的功能,那么就需要进行二次开发,这样会增加项目的技术难度、复杂度以及增大项目周期等。

1. 功能维度

功能维度又可以划分个多个子维度,大致可以分为以下这些:

  • 优先级队列

优先级队列不同于先进先出队列,优先级高的消息具备优先被消费的特权,这样可以为下游提供不同消息级别的保证。不过这个优先级也是需要有一个前提的:如果消费者的消费速度大于生产者的速度,并且消息中间件服务器(一般简单的称之为Broker)中没有消息堆积,那么对于发送的消息设置优先级也就没有什么实质性的意义了,因为生产者刚发送完一条消息就被消费者消费了,那么就相当于Broker中至多只有一条消息,对于单条消息来说优先级是没有什么意义的。

  • 延迟队列

当你在网上购物的时候是否会遇到这样的提示:“三十分钟之内未付款,订单自动取消”?这个是延迟队列的一种典型应用场景。延迟队列存储的是对应的延迟消息,所谓“延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。延迟队列一般分为两种:基于消息的延迟和基于队列的延迟。基于消息的延迟是指为每条消息设置不同的延迟时间,那么每当队列中有新消息进入的时候就会重新根据延迟时间排序,当然这也会对性能造成极大的影响。实际应用中大多采用基于队列的延迟,设置不同延迟级别的队列,比如5s、10s、30s、1min、5mins、10mins等,每个队列中消息的延迟时间都是相同的,这样免去了延迟排序所要承受的性能之苦,通过一定的扫描策略(比如定时)即可投递超时的消息。

  • 死信队列

由于某些原因消息无法被正确的投递,为了确保消息不会被无故的丢弃,一般将其置于一个特殊角色的队列,这个队列一般称之为死信队列。与此对应的还有一个“回退队列”的概念,试想如果消费者在消费时发生了异常,那么就不会对这一次消费进行确认(Ack),进而发生回滚消息的操作之后消息始终会放在队列的顶部,然后不断被处理和回滚,导致队列陷入死循环。为了解决这个问题,可以为每个队列设置一个回退队列,它和死信队列都是为异常的处理提供的一种机制保障。实际情况下,回退队列的角色可以由死信队列和重试队列来扮演。

  • 重试队列

重试队列其实可以看成是一种回退队列,具体指消费端消费消息失败时,为防止消息无故丢失而重新将消息回滚到Broker中。与回退队列不同的是重试队列一般分成多个重试等级,每个重试等级一般也会设置重新投递延时,重试次数越多投递延时就越大。举个例子:消息第一次消费失败入重试队列Q1,Q1的重新投递延迟为5s,在5s过后重新投递该消息;如果消息再次消费失败则入重试队列Q2,Q2的重新投递延迟为10s,在10s过后再次投递该消息。以此类推,重试越多次重新投递的时间就越久,为此需要设置一个上限,超过投递次数就入死信队列。重试队列与延迟队列有相同的地方,都是需要设置延迟级别,它们彼此的区别是:延迟队列动作由内部触发,重试队列动作由外部消费端触发;延迟队列作用一次,而重试队列的作用范围会向后传递。

  • 消费模式

消费模式分为推(push)模式和拉(pull)模式。推模式是指由Broker主动推送消息至消费端,实时性较好,不过需要一定的流制机制来确保服务端推送过来的消息不会压垮消费端。而拉模式是指消费端主动向Broker端请求拉取(一般是定时或者定量)消息,实时性较推模式差,但是可以根据自身的处理能力而控制拉取的消息量。

  • 广播消费

消息一般有两种传递模式:点对点(P2P,Point-to-Point)模式和发布/订阅(Pub/Sub)模式。对于点对点的模式而言,消息被消费以后,队列中不会再存储,所以消息消费者不可能消费到已经被消费的消息。虽然队列可以支持多个消费者,但是一条消息只会被一个消费者消费。发布订阅模式定义了如何向一个内容节点发布和订阅消息,这个内容节点称为主题(topic),主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题,而消息订阅者则从主题中订阅消息。主题使得消息的订阅者与消息的发布者互相保持独立,不需要进行接触即可保证消息的传递,发布/订阅模式在消息的一对多广播时采用。RabbitMQ是一种典型的点对点模式,而Kafka是一种典型的发布订阅模式。但是RabbitMQ中可以通过设置交换器类型来实现发布订阅模式而达到广播消费的效果,Kafka中也能以点对点的形式消费,你完全可以把其消费组(consumer group)的概念看成是队列的概念。不过对比来说,Kafka中因为有了消息回溯功能的存在,对于广播消费的力度支持比RabbitMQ的要强。

  • 消息回溯

一般消息在消费完成之后就被处理了,之后再也不能消费到该条消息。消息回溯正好相反,是指消息在消费完成之后,还能消费到之前被消费掉的消息。对于消息而言,经常面临的问题是“消息丢失”,至于是真正由于消息中间件的缺陷丢失还是由于使用方的误用而丢失一般很难追查,如果消息中间件本身具备消息回溯功能的话,可以通过回溯消费复现“丢失的”消息进而查出问题的源头之所在。消息回溯的作用远不止与此,比如还有索引恢复、本地缓存重建,有些业务补偿方案也可以采用回溯的方式来实现。

  • 消息堆积+持久化

流量削峰是消息中间件的一个非常重要的功能,而这个功能其实得益于其消息堆积能力。从某种意义上来讲,如果一个消息中间件不具备消息堆积的能力,那么就不能把它看做是一个合格的消息中间件。消息堆积分内存式堆积和磁盘式堆积。RabbitMQ是典型的内存式堆积,但这并非绝对,在某些条件触发后会有换页动作来将内存中的消息换页到磁盘(换页动作会影响吞吐),或者直接使用惰性队列来将消息直接持久化至磁盘中。Kafka是一种典型的磁盘式堆积,所有的消息都存储在磁盘中。一般来说,磁盘的容量会比内存的容量要大得多,对于磁盘式的堆积其堆积能力就是整个磁盘的大小。从另外一个角度讲,消息堆积也为消息中间件提供了冗余存储的功能。援引纽约时报的案例,其直接将Kafka用作存储系统。

  • 消息追踪

对于分布式架构系统中的链路追踪(trace)而言,大家一定不会陌生。对于消息中间件而言,消息的链路追踪(以下简称消息追踪)同样重要。对于消息追踪最通俗的理解就是要知道消息从哪来,存在哪里以及发往哪里去。基于此功能下,我们可以对发送或者消费完的消息进行链路追踪服务,进而可以进行问题的快速定位与排查。

  • 消息过滤

消息过滤是指按照既定的过滤规则为下游用户提供指定类别的消息。就以kafka而言,完全可以将不同类别的消息发送至不同的topic中,由此可以实现某种意义的消息过滤,或者Kafka还可以根据分区对同一个topic中的消息进行分类。不过更加严格意义上的消息过滤应该是对既定的消息采取一定的方式按照一定的过滤规则进行过滤。同样以Kafka为例,可以通过客户端提供的ConsumerInterceptor接口或者Kafka Stream的filter功能进行消息过滤。

  • 多租户

也可以称为多重租赁技术,是一种软件架构技术,主要用来实现多用户的环境下公用相同的系统或程序组件,并且仍可以确保各用户间数据的隔离性。RabbitMQ就能够支持多租户技术,每一个租户表示为一个vhost,其本质上是一个独立的小型RabbitMQ服务器,又有自己独立的队列、交换器及绑定关系等,并且它拥有自己独立的权限。vhost就像是物理机中的虚拟机一样,它们在各个实例间提供逻辑上的分离,为不同程序安全保密地允许数据,它既能将同一个RabbitMQ中的众多客户区分开,又可以避免队列和交换器等命名冲突。

  • 多协议支持

消息是信息的载体,为了让生产者和消费者都能理解所承载的信息(生产者需要知道如何构造消息,消费者需要知道如何解析消息),它们就需要按照一种统一的格式描述消息,这种统一的格式称之为消息协议。有效的消息一定具有某种格式,而没有格式的消息是没有意义的。一般消息层面的协议有AMQP、MQTT、STOMP、XMPP等(消息领域中的JMS更多的是一个规范而不是一个协议),支持的协议越多其应用范围就会越广,通用性越强,比如RabbitMQ能够支持MQTT协议就让其在物联网应用中获得一席之地。还有的消息中间件是基于其本身的私有协议运转的,典型的如Kafka。

  • 跨语言支持

对很多公司而言,其技术栈体系中会有多种编程语言,如C/C++、JAVA、Go、PHP等,消息中间件本身具备应用解耦的特性,如果能够进一步的支持多客户端语言,那么就可以将此特性的效能扩大。跨语言的支持力度也可以从侧面反映出一个消息中间件的流行程度。

  • 流量控制

流量控制(flow control)针对的是发送方和接收方速度不匹配的问题,提供一种速度匹配服务抑制发送速率使接收方应用程序的读取速率与之相适应。通常的流控方法有Stop-and-wait、滑动窗口以及令牌桶等。

  • 消息顺序性

顾名思义,消息顺序性是指保证消息有序。这个功能有个很常见的应用场景就是CDC(Change Data Chapture),以MySQL为例,如果其传输的binlog的顺序出错,比如原本是先对一条数据加1,然后再乘以2,发送错序之后就变成了先乘以2后加1了,造成了数据不一致。

  • 安全机制

在Kafka 0.9版本之后就开始增加了身份认证和权限控制两种安全机制。身份认证是指客户端与服务端连接进行身份认证,包括客户端与Broker之间、Broker与Broker之间、Broker与ZooKeeper之间的连接认证,目前支持SSL、SASL等认证机制。权限控制是指对客户端的读写操作进行权限控制,包括对消息或Kafka集群操作权限控制。权限控制是可插拔的,并支持与外部的授权服务进行集成。对于RabbitMQ而言,其同样提供身份认证(TLS/SSL、SASL)和权限控制(读写操作)的安全机制。

  • 消息幂等性

对于确保消息在生产者和消费者之间进行传输而言一般有三种传输保障(delivery guarantee):At most once,至多一次,消息可能丢失,但绝不会重复传输;At least once,至少一次,消息绝不会丢,但是可能会重复;Exactly once,精确一次,每条消息肯定会被传输一次且仅一次。对于大多数消息中间件而言,一般只提供At most once和At least once两种传输保障,对于第三种一般很难做到,由此消息幂等性也很难保证。

Kafka自0.11版本开始引入了幂等性和事务,Kafka的幂等性是指单个生产者对于单分区单会话的幂等,而事务可以保证原子性地写入到多个分区,即写入到多个分区的消息要么全部成功,要么全部回滚,这两个功能加起来可以让Kafka具备EOS(Exactly Once Semantic)的能力。

不过如果要考虑全局的幂等,还需要与从上下游方面综合考虑,即关联业务层面,幂等处理本身也是业务层面所需要考虑的重要议题。以下游消费者层面为例,有可能消费者消费完一条消息之后没有来得及确认消息就发生异常,等到恢复之后又得重新消费原来消费过的那条消息,那么这种类型的消息幂等是无法有消息中间件层面来保证的。如果要保证全局的幂等,需要引入更多的外部资源来保证,比如以订单号作为唯一性标识,并且在下游设置一个去重表。

  • 事务性消息

事务本身是一个并不陌生的词汇,事务是由事务开始(Begin Transaction)和事务结束(End Transaction)之间执行的全体操作组成。支持事务的消息中间件并不在少数,Kafka和RabbitMQ都支持,不过此两者的事务是指生产者发生消息的事务,要么发送成功,要么发送失败。消息中间件可以作为用来实现分布式事务的一种手段,但其本身并不提供全局分布式事务的功能。

下表是对Kafka与RabbitMQ功能的总结性对比及补充说明。

功能项Kafka(1.1.0版本)RabbitMQ(3.6.10版本)
优先级队列不支持支持。建议优先级大小设置在0-10之间。
延迟队列不支持支持
死信队列不支持支持
重试队列不支持不支持。RabbitMQ中可以参考延迟队列实现一个重试队列,二次封装比较简单。如果要在Kafka中实现重试队列,首先得实现延迟队列的功能,相对比较复杂。
消费模式拉模式推模式+拉模式
广播消费支持。Kafka对于广播消费的支持相对而言更加正统。支持,但力度较Kafka弱。
消息回溯支持。Kafka支持按照offset和timestamp两种维度进行消息回溯。不支持。RabbitMQ中消息一旦被确认消费就会被标记删除。
消息堆积支持支持。一般情况下,内存堆积达到特定阈值时会影响其性能,但这不是绝对的。如果考虑到吞吐这因素,Kafka的堆积效率比RabbitMQ总体上要高很多。
持久化支持支持
消息追踪不支持。消息追踪可以通过外部系统来支持,但是支持粒度没有内置的细腻。支持。RabbitMQ中可以采用Firehose或者rabbitmq_tracing插件实现。不过开启rabbitmq_tracing插件件会大幅影响性能,不建议生产环境开启,反倒是可以使用Firehose与外部链路系统结合提供高细腻度的消息追踪支持。
消息过滤客户端级别的支持不支持。但是二次封装一下也非常简单。
多租户不支持支持
多协议支持只支持定义协议,目前几个主流版本间存在兼容性问题。RabbitMQ本身就是AMQP协议的实现,同时支持MQTT、STOMP等协议。
跨语言支持采用Scala和Java编写,支持多种语言的客户端。采用Erlang编写,支持多种语言的客户端。
流量控制支持client和user级别,通过主动设置可将流控作用于生产者或消费者。RabbitMQ的流控基于Credit-Based算法,是内部被动触发的保护机制,作用于生产者层面。
消息顺序性支持单分区(partition)级别的顺序性。顺序性的条件比较苛刻,需要单线程发送、单线程消费并且不采用延迟队列、优先级队列等一些高级功能,从某种意义上来说不算支持顺序性。
安全机制(TLS/SSL、SASL)身份认证和(读写)权限控制与Kafka相似
幂等性支持单个生产者单分区单会话的幂等性。不支持
事务性消息支持支持

2. 性能

功能维度是消息中间件选型中的一个重要的参考维度,但这并不是唯一的维度。有时候性能比功能还要重要,况且性能和功能很多时候是相悖的,鱼和熊掌不可兼得,Kafka在开启幂等、事务功能的时候会使其性能降低,RabbitMQ在开启rabbitmq_tracing插件的时候也会极大的影响其性能。消息中间件的性能一般是指其吞吐量,虽然从功能维度上来说,RabbitMQ的优势要大于Kafka,但是Kafka的吞吐量要比RabbitMQ高出1至2个数量级,一般RabbitMQ的单机QPS在万级别之内,而Kafka的单机QPS可以维持在十万级别,甚至可以达到百万级。

消息中间件的吞吐量始终会受到硬件层面的限制。就以网卡带宽为例,如果单机单网卡的带宽为1Gbps,如果要达到百万级的吞吐,那么消息体大小不得超过(1Gb/8)/100W,即约等于134B,换句话说如果消息体大小超过134B,那么就不可能达到百万级别的吞吐。这种计算方式同样可以适用于内存和磁盘。

时延作为性能维度的一个重要指标,却往往在消息中间件领域所被忽视,因为一般使用消息中间件的场景对时效性的要求并不是很高,如果要求时效性完全可以采用RPC的方式实现。消息中间件具备消息堆积的能力,消息堆积越大也就意味着端到端的时延也就越长,与此同时延时队列也是某些消息中间件的一大特色。那么为什么还要关注消息中间件的时延问题呢?消息中间件能够解耦系统,对于一个时延较低的消息中间件而言,它可以让上游生产者发送消息之后可以迅速的返回,也可以让消费者更加快速的获取到消息,在没有堆积的情况下可以让整体上下游的应用之间的级联动作更加高效,虽然不建议在时效性很高的场景下使用消息中间件,但是如果所使用的消息中间件的时延方面比较优秀,那么对于整体系统的性能将会是一个不小的提升。

3. 可靠性+可用性

消息丢失是使用消息中间件时所不得不面对的一个同点,其背后消息可靠性也是衡量消息中间件好坏的一个关键因素。尤其是在金融支付领域,消息可靠性尤为重要。然而说到可靠性必然要说到可用性,注意这两者之间的区别,消息中间件的可靠性是指对消息不丢失的保障程度;而消息中间件的可用性是指无故障运行的时间百分比,通常用几个9来衡量。

从狭义的角度来说,分布式系统架构是一致性协议理论的应用实现,对于消息可靠性和可用性而言也可以追溯到消息中间件背后的一致性协议。对于Kafka而言,其采用的是类似PacificA的一致性协议,通过ISR(In-Sync-Replica)来保证多副本之间的同步,并且支持强一致性语义(通过acks实现)。对应的RabbitMQ是通过镜像环形队列实现多副本及强一致性语义的。多副本可以保证在master节点宕机异常之后可以提升slave作为新的master而继续提供服务来保障可用性。Kafka设计之初是为日志处理而生,给人们留下了数据可靠性要求不要的不良印象,但是随着版本的升级优化,其可靠性得到极大的增强,详细可以参考KIP101。就目前而言,在金融支付领域使用RabbitMQ居多,而在日志处理、大数据等方面Kafka使用居多,随着RabbitMQ性能的不断提升和Kafka可靠性的进一步增强,相信彼此都能在以前不擅长的领域分得一杯羹。

同步刷盘是增强一个组件可靠性的有效方式,消息中间件也不例外,Kafka和RabbitMQ都可以支持同步刷盘,但是笔者对同步刷盘有一定的疑问:绝大多数情景下,一个组件的可靠性不应该由同步刷盘这种极其损耗性能的操作来保障,而是采用多副本的机制来保证。

这里还要提及的一个方面是扩展能力,这里我狭隘地将此归纳到可用性这一维度,消息中间件的扩展能力能够增强其用可用能力及范围,比如前面提到的RabbitMQ支持多种消息协议,这个就是基于其插件化的扩展实现。还有从集群部署上来讲,归功于Kafka的水平扩展能力,其基本上可以达到线性容量提升的水平,在LinkedIn实践介绍中就提及了有部署超过千台设备的Kafka集群。

5. 运维管理

在消息中间件的使用过程中难免会出现各式各样的异常情况,有客户端的,也有服务端的,那么怎样及时有效的进行监测及修复。业务线流量有峰值又低谷,尤其是电商领域,那么怎样前进行有效的容量评估,尤其是大促期间?脚踢电源、网线被挖等事件层出不穷,如何有效的做好异地多活?这些都离不开消息中间件的衍生产品——运维管理。

运维管理也可以进行进一步的细分,比如:申请、审核、监控、告警、管理、容灾、部署等。

申请、审核很好理解,在源头对资源进行管控,既可以进行有效校正应用方的使用规范,配和监控也可以做好流量统计与流量评估工作,一般申请、审核与公司内部系统交融性较大,不适合使用开源类的产品。

监控、告警也比较好理解,对消息中间件的使用进行全方位的监控,即可以为系统提供基准数据,也可以在检测到异常的情况配合告警,以便运维、开发人员的迅速介入。除了一般的监控项(比如硬件、GC等)之外,对于消息中间件还需要关注端到端时延、消息审计、消息堆积等方面。对于RabbitMQ而言,最正统的监控管理工具莫过于rabbitmq_management插件了,但是社区内还有AppDynamics, Collectd, DataDog, Ganglia, Munin, Nagios, New Relic, Prometheus, Zenoss等多种优秀的产品。Kafka在此方面也毫不逊色,比如:Kafka Manager, Kafka Monitor, Kafka Offset Monitor, Burrow, Chaperone, Confluent Control Center等产品,尤其是Cruise还可以提供自动化运维的功能。

不管是扩容、降级、版本升级、集群节点部署、还是故障处理都离不开管理工具的应用,一个配套完备的管理工具集可以在遇到变更时做到事半功倍。故障可大可小,一般是一些应用异常,也可以是机器掉电、网络异常、磁盘损坏等单机故障,这些故障单机房内的多副本足以应付。如果是机房故障就要涉及异地容灾了,关键点在于如何有效的进行数据复制,对于Kafka而言,可以参考MirrorMarker、uReplicator等产品,而RabbitMQ可以参考Federation和Shovel。

6. 社区力度及生态发展

对于目前流行的编程语言而言,如Java、Python,如果你在使用过程中遇到了一些异常,基本上可以通过搜索引擎的帮助来得到解决,因为一个产品用的人越多,踩过的坑也就越多,对应的解决方案也就越多。对于消息中间件也同样适用,如果你选择了一种“生僻”的消息中间件,可能在某些方面运用的得心应手,但是版本更新缓慢、遇到棘手问题也难以得到社区的支持而越陷越深;相反如果你选择了一种“流行”的消息中间件,其更新力度大,不仅可以迅速的弥补之前的不足,而且也能顺应技术的快速发展来变更一些新的功能,这样可以让你以“站在巨人的肩膀上”。在运维管理维度我们提及了Kafka和RabbitMQ都有一系列开源的监控管理产品,这些正是得益于其社区及生态的迅猛发展。


四、消息中间件选型误区探讨

在进行消息中间件选型之前可以先问自己一个问题:是否真的需要一个消息中间件?在搞清楚这个问题之后,还可以继续问自己一个问题:是否需要自己维护一套消息中间件?很多初创型公司为了节省成本会选择直接购买消息中间件有关的云服务,自己只需要关注收发消息即可,其余的都可以外包出去。

很多人面对消息中间件时会有一种自研的冲动,你完全可以对Java中的ArrayBlockingQueue做一个简单的封装,你也可以基于文件、数据库、Redis等底层存储封装而形成一个消息中间件。消息中间件做为一个基础组件并没有想象中的那么简单,其背后还需要配套的管理运维整个生态的产品集。自研还有会交接问题,如果文档不齐全、运作不规范将会带给新人噩梦般的体验。是否真的有自研的必要?如果不是KPI的压迫可以先考虑下这2个问题:1. 目前市面上的消息中间件是否都真的无法满足目前业务需求? 2. 团队是否有足够的能力、人力、财力、精力来支持自研?

很多人在做消息中间件选型时会参考网络上的很多对比类的文章,但是其专业性、严谨性、以及其政治立场问题都有待考证,需要带着怀疑的态度去审视这些文章。比如有些文章会在没有任何限定条件及场景的情况下直接定义某款消息中间件最好,还有些文章没有指明消息中间件版本及测试环境就来做功能和性能对比分析,诸如此类的文章都可以唾弃之。

消息中间件犹如小马过河,选择合适的才最重要,这需要贴合自身的业务需求,技术服务于业务,大体上可以根据上一节所提及的功能、性能等6个维度来一一进行筛选。更深层次的抉择在于你能否掌握其魂,笔者鄙见:RabbitMQ在于routing,而Kafka在于streaming,了解其根本对于自己能够对症下药选择到合适的消息中间件尤为重要。

消息中间件选型切忌一味的追求性能或者功能,性能可以优化,功能可以二次开发。如果要在功能和性能方面做一个抉择的话,那么首选性能,因为总体上来说性能优化的空间没有功能扩展的空间大。然而对于长期发展而言,生态又比性能以及功能都要重要。

很多时候,对于可靠性方面也容易存在一个误区:想要找到一个产品来保证消息的绝对可靠,很不幸的是这世界上没有绝对的东西,只能说尽量趋于完美。想要尽可能的保障消息的可靠性也并非单单只靠消息中间件本身,还要依赖于上下游,需要从生产端、服务端和消费端这3个维度去努力保证,《RabbitMQ消息可靠性分析》这篇文章就从这3个维度去分析了RabbitMQ的可靠性。

消息中间件选型还有一个考量标准就是尽量贴合团队自身的技术栈体系,虽然说没有蹩脚的消息中间件只有蹩脚的程序员,但是让一个C栈的团队去深挖PhxQueue总比去深挖Scala编写的Kafka要容易的多。

消息中间件大道至简:一发一存一消费,没有最好的消息中间件,只有最合适的消息中间件。

来源: https://www.jianshu.com/p/8f7ebbcbeee5

 

scribe、chukwa、kafka、flume日志系统对比

1. 背景介绍许多公司的平台每天会产生大量的日志(一般为流式数据,如,搜索引擎的pv,查询等),处理这些日志需要特定的日志系统,一般而言,这些系统需要具有以下特征:(1) 构建应用系统和分析系统的桥梁,并将它们之间的关联解耦;(2) 支持近实时的在线分析系统和类似于Hadoop之类的离线分析系统;(3) 具有高可扩展性。即:当数据量增加时,可以通过增加节点进行水平扩展。

本文从设计架构,负载均衡,可扩展性和容错性等方面对比了当今开源的日志系统,包括facebook的scribe,apache的chukwa,linkedin的kafka和cloudera的flume等。

2. FaceBook的Scribe

Scribe是facebook开源的日志收集系统,在facebook内部已经得到大量的应用。它能够从各种日志源上收集日志,存储到一个中央存储系统 (可以是NFS,分布式文件系统等)上,以便于进行集中统计分析处理。它为日志的“分布式收集,统一处理”提供了一个可扩展的,高容错的方案。

它最重要的特点是容错性好。当后端的存储系统crash时,scribe会将数据写到本地磁盘上,当存储系统恢复正常后,scribe将日志重新加载到存储系统中。

chukwa

架构

scribe的架构比较简单,主要包括三部分,分别为scribe agent, scribe和存储系统。

(1) scribe agent

scribe agent实际上是一个thrift client。 向scribe发送数据的唯一方法是使用thrift client, scribe内部定义了一个thrift接口,用户使用该接口将数据发送给server。

(2) scribe

scribe接收到thrift client发送过来的数据,根据配置文件,将不同topic的数据发送给不同的对象。scribe提供了各种各样的store,如 file, HDFS等,scribe可将数据加载到这些store中。

(3) 存储系统

存储系统实际上就是scribe中的store,当前scribe支持非常多的store,包括file(文件),buffer(双层存储,一个主储存, 一个副存储),network(另一个scribe服务器),bucket(包含多个 store,通过hash的将数据存到不同store中),null(忽略数据),thriftfile(写到一个Thrift TFileTransport文件中)和multi(把数据同时存放到不同store中)。

3. Apache的Chukwa

chukwa是一个非常新的开源项目,由于其属于hadoop系列产品,因而使用了很多hadoop的组件(用HDFS存储,用mapreduce处理数据),它提供了很多模块以支持hadoop集群日志分析。

需求:

(1) 灵活的,动态可控的数据源

(2) 高性能,高可扩展的存储系统

(3) 合适的框架,用于对收集到的大规模数据进行分析

chukwa

架构

Chukwa中主要有3种角色,分别为:adaptor,agent,collector。

(1) Adaptor 数据源

可封装其他数据源,如file,unix命令行工具等

目前可用的数据源有:hadoop logs,应用程序度量数据,系统参数数据(如linux cpu使用流率)。

(2) HDFS 存储系统

Chukwa采用了HDFS作为存储系统。HDFS的设计初衷是支持大文件存储和小并发高速写的应用场景,而日志系统的特点恰好相反,它需支持高并发低速 率的写和大量小文件的存储。需要注意的是,直接写到HDFS上的小文件是不可见的,直到关闭文件,另外,HDFS不支持文件重新打开。

(3) Collector和Agent

为了克服(2)中的问题,增加了agent和collector阶段。

Agent的作用:给adaptor提供各种服务,包括:启动和关闭adaptor,将数据通过HTTP传递给Collector;定期记录adaptor状态,以便crash后恢复。

Collector的作用:对多个数据源发过来的数据进行合并,然后加载到HDFS中;隐藏HDFS实现的细节,如,HDFS版本更换后,只需修改collector即可。

(4) Demux和achieving

直接支持利用MapReduce处理数据。它内置了两个mapreduce作业,分别用于获取data和将data转化为结构化的log。存储到data store(可以是数据库或者HDFS等)中。

4. LinkedIn的Kafka

Kafka是2010年12月份开源的项目,采用scala语言编写,使用了多种效率优化机制,整体架构比较新颖(push/pull),更适合异构集群。

设计目标:

(1) 数据在磁盘上的存取代价为O(1)

(2) 高吞吐率,在普通的服务器上每秒也能处理几十万条消息

(3) 分布式架构,能够对消息分区

(4) 支持将数据并行的加载到hadoop

chukwa

架构

Kafka实际上是一个消息发布订阅系统。producer向某个topic发布消息,而consumer订阅某个topic的消息,进而一旦有新的关于 某个topic的消息,broker会传递给订阅它的所有consumer。 在kafka中,消息是按topic组织的,而每个topic又会分为多个partition,这样便于管理数据和进行负载均衡。同时,它也使用了 zookeeper进行负载均衡。

Kafka中主要有三种角色,分别为producer,broker和consumer。

(1) Producer

Producer的任务是向broker发送数据。Kafka提供了两种producer接口,一种是low_level接口,使用该接口会向特定的 broker的某个topic下的某个partition发送数据;另一种那个是high level接口,该接口支持同步/异步发送数据,基于zookeeper的broker自动识别和负载均衡(基于Partitioner)。

其中,基于zookeeper的broker自动识别值得一说。producer可以通过zookeeper获取可用的broker列表,也可以在zookeeper中注册listener,该listener在以下情况下会被唤醒:

a.添加一个broker

b.删除一个broker

c.注册新的topic

d.broker注册已存在的topic

当producer得知以上时间时,可根据需要采取一定的行动。

(2) Broker

Broker采取了多种策略提高数据处理效率,包括sendfile和zero copy等技术。

(3) Consumer

consumer的作用是将日志信息加载到中央存储系统上。kafka提供了两种consumer接口,一种是low level的,它维护到某一个broker的连接,并且这个连接是无状态的,即,每次从broker上pull数据时,都要告诉broker数据的偏移 量。另一种是high-level 接口,它隐藏了broker的细节,允许consumer从broker上push数据而不必关心网络拓扑结构。更重要的是,对于大部分日志系统而 言,consumer已经获取的数据信息都由broker保存,而在kafka中,由consumer自己维护所取数据信息。

5. Cloudera的Flume

Flume是cloudera于2009年7月开源的日志系统。它内置的各种组件非常齐全,用户几乎不必进行任何额外开发即可使用。

设计目标:

(1) 可靠性

当节点出现故障时,日志能够被传送到其他节点上而不会丢失。Flume提供了三种级别的可靠性保障,从强到弱依次分别为:end-to-end(收到数据 agent首先将event写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送。),Store on failure(这也是scribe采用的策略,当数据接收方crash时,将数据写到本地,待恢复后,继续发送),Best effort(数据发送到接收方后,不会进行确认)。

(2) 可扩展性

Flume采用了三层架构,分别问agent,collector和storage,每一层均可以水平扩展。其中,所有agent和collector由 master统一管理,这使得系统容易监控和维护,且master允许有多个(使用ZooKeeper进行管理和负载均衡),这就避免了单点故障问题。

(3) 可管理性

所有agent和colletor由master统一管理,这使得系统便于维护。用户可以在master上查看各个数据源或者数据流执行情况,且可以对各 个数据源配置和动态加载。Flume提供了web 和shell script command两种形式对数据流进行管理。

(4) 功能可扩展性

用户可以根据需要添加自己的agent,colletor或者storage。此外,Flume自带了很多组件,包括各种agent(file, syslog等),collector和storage(file,HDFS等)。

chukwa

架构

正如前面提到的,Flume采用了分层架构,由三层组成,分别为agent,collector和storage。其中,agent和collector均由两部分组成:source和sink,source是数据来源,sink是数据去向。

(1) agent

agent的作用是将数据源的数据发送给collector,Flume自带了很多直接可用的数据源(source),如:

text(“filename”):将文件filename作为数据源,按行发送

tail(“filename”):探测filename新产生的数据,按行发送出去

fsyslogTcp(5140):监听TCP的5140端口,并且接收到的数据发送出去

同时提供了很多sink,如:

console[("format")] :直接将将数据显示在桌面上

text(“txtfile”):将数据写到文件txtfile中

dfs(“dfsfile”):将数据写到HDFS上的dfsfile文件中

syslogTcp(“host”,port):将数据通过TCP传递给host节点

(2) collector

collector的作用是将多个agent的数据汇总后,加载到storage中。它的source和sink与agent类似。

下面例子中,agent监听TCP的5140端口接收到的数据,并发送给collector,由collector将数据加载到HDFS上。

chukwa

一个更复杂的例子如下:有6个agent,3个collector,所有collector均将数据导入HDFS中。agent A,B将数据发送给collector A,agent C,D将数据发送给collectorB,agent C,D将数据发送给collectorB。同时,为每个agent添加end-to-end可靠性保障(Flume的三种可靠性保障分别由 agentE2EChain, agentDFOChain, and agentBEChain实现),如,当collector A出现故障时,agent A和agent B会将数据分别发给collector B和collector C。

chukwa

下面是简写的配置文件片段:

此外,使用autoE2EChain,当某个collector 出现故障时,Flume会自动探测一个可用collector,并将数据定向到这个新的可用collector上。

(3) storage

storage是存储系统,可以是一个普通file,也可以是HDFS,HIVE,HBase等。

6. 总结

根据这四个系统的架构设计,可以总结出典型的日志系统需具备三个基本组件,分别为agent(封装数据源,将数据源中的数据发送给 collector),collector(接收多个agent的数据,并进行汇总后导入后端的store中),store(中央存储系统,应该具有可扩 展性和可靠性,应该支持当前非常流行的HDFS)。

下面表格对比了这四个系统:

chukwa

7. 参考资料

scribe主页:https://github.com/facebook/scribe

chukwa主页:http://incubator.apache.org/chukwa/

kafka主页:http://sna-projects.com/kafka/

Flume主页:https://github.com/cloudera/flume/

本文来自:http://my.oschina.net/sunzy/blog/183795

ELK+Kafka集群日志分析系统

一、 系统介绍 2

二、 版本说明 3

三、 服务部署 3

1) JDK部署 3

2) Elasticsearch集群部署及优化 3

3) Elasticsearch健康插件安装 13

4) Shield之elasticsearch安全插件 15

5)Zookeeper集群搭建 15

6)Kafka集群搭建 17

7)测试Kafka和Zookeeper集群连通性 19

8) Logstash部署 20

9) Kibana部署 20

四、 系统使用示例 22

1) Logstash 作为kafka生产者示例 22

2) Logstash index 消费kafka示例 23

一、系统介绍
随着实时分析技术的发展及成本的降低,用户已经不仅仅满足于离线分析。下面来介绍一下架构

 

这是一个再常见不过的架构了:

(1)Kafka:接收用户日志的消息队列

(2)Logstash:做日志解析,统一成json输出给Elasticsearch

(3)Elasticsearch:实时日志分析服务的核心技术,一个schemaless,实时的数据存储服务,通过index组织数据,兼具强大的搜索和统计功能。

(4)Kibana:基于Elasticsearch的数据可视化组件,超强的数据可视化能力是众多公司选择ELK stack的重要原因。

(5)Zookeeper: 状态管理,监控进程等服务

二、版本说明
本次系统为:Centos6.5 64位

Java版本为:1.8.0_74

Elasticsearch为:2.4.0

Logstash :2.4.0

Kibana:4.6.1

Shield:2.0+

Kafka:2.10-0.10.0.1

Zookeeper:3.4.9

相应的版本最好下载对应的插件。

三、服务部署
1) JDK部署
下载JDK包到/data目录下解压,并将变量导入/etc/profile末尾。

export JAVA_HOME=/usr/local/jdk

export PATH=$JAVA_HOME/bin:$PATH

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

2) Elasticsearch集群部署及优化
下载并安装es rpm包:

Rpm -ivh https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/rpm/elasticsearch/2.4.0/elasticsearch-2.4.0.rpm

启动方式:

/etc/init.d/elasticsearch start|stop

Elasticsearch bin文件内存优化:

由于是rpm 方式安装将/usr/share/Elasticsearch/bin/Elasticsearch 加入如下参数:

ES_HEAP_SIZE=16g     #ES_HEAP_SIZE表示JVM参数的-Xms and -Xmx设置

MAX_OPEN_FILES=65535

Elasticsearch 配置文件优化以及说明:

我们需要配置如下:

cluster.name: es-ajb-cluster

node.name: es-node-1

node.master: true

node.data: true

index.number_of_shards: 8

index.number_of_replicas: 1

path.data: /data/es

path.logs: /data/eslogs

bootstrap.mlockall: true

network.host: 0.0.0.0

http.port: 9200

discovery.zen.ping.unicast.hosts: ["172.16.38.133", "172.16.38.134","172.16.38.135"]

以下为更详细yml配置文件参数解释:

##################### Elasticsearch Configuration Example ################

# 我只是挑些重要的配置选项进行注释,其实自带的已经有非常细致的英文注释了.有理解偏差的地方请以英文原版解释为准.

################################### Cluster#############################

# 代表一个集群,集群中有多个节点,其中有一个为主节点,这个主节点是可以通过选举产生的,主从节点是对于集群内部来说的.

# es的一个概念就是去中心化,字面上理解就是无中心节点,这是对于集群外部来说的,因为从外部来看es集群,在逻辑上是个整体,你与任何一个节点的通信和与整个es集群通信是等价的。

# cluster.name可以确定你的集群名称,当你的elasticsearch集群在同一个网段中elasticsearch会自动的找到具有相同cluster.name的elasticsearch服务.

# 所以当同一个网段具有多个elasticsearch集群时cluster.name就成为同一个集群的标识.

#cluster.name: elasticsearch

#################################### Node ##############################

# 节点名称同理,可自动生成也可手动配置.

#node.name: "Franz Kafka"

# 允许一个节点是否可以成为一个master节点,es是默认集群中的第一台机器为master,如果这台机器停止就会重新选举master.

#node.master: true

# 允许该节点存储数据(默认开启)

#node.data: true

# 配置文件中给出了三种配置高性能集群拓扑结构的模式,如下:

# 1. 如果你想让节点从不选举为主节点,只用来存储数据,可作为负载器

# node.master: false

# node.data: true

#

# 2. 如果想让节点成为主节点,且不存储任何数据,并保有空闲资源,可作为协调器

# node.master: true

# node.data: false

#

# 3. 如果想让节点既不称为主节点,又不成为数据节点,那么可将他作为搜索器,从节点中获取数据,生成搜索结果等

# node.master: false

# node.data: false

# 监控集群状态有一下插件和API可以使用:

# Use the Cluster Health API [http://localhost:9200/_cluster/health], the

# Node Info API [http://localhost:9200/_nodes] or GUI tools

# such as <http://www.elasticsearch.org/overview/marvel/>,

# <http://github.com/karmi/elasticsearch-paramedic>,

# <http://github.com/lukas-vlcek/bigdesk> and

# <http://mobz.github.com/elasticsearch-head> to inspect the cluster state.

# A node can have generic attributes associated with it, which can later be used

# for customized shard allocation filtering, or allocation awareness. An attribute

# is a simple key value pair, similar to node.key: value, here is an example:

#

#node.rack: rack314

# By default, multiple nodes are allowed to start from the same installation location

# to disable it, set the following:

#node.max_local_storage_nodes: 1

#################################### Index #############################

# 设置索引的分片数,默认为5

#index.number_of_shards: 5

# 设置索引的副本数,默认为1:

#index.number_of_replicas: 1

# 配置文件中提到的最佳实践是,如果服务器够多,可以将分片提高,尽量将数据平均分布到大集群中去

# 同时,如果增加副本数量可以有效的提高搜索性能

# 需要注意的是,"number_of_shards" 是索引创建后一次生成的,后续不可更改设置

# "number_of_replicas" 是可以通过API去实时修改设置的

#################################### Paths ####################################

# 配置文件存储位置

#path.conf: /path/to/conf

# 数据存储位置(单个目录设置)

#path.data: /path/to/data

# 多个数据存储位置,有利于性能提升

#path.data: /path/to/data1,/path/to/data2

# 临时文件的路径

#path.work: /path/to/work

# 日志文件的路径

#path.logs: /path/to/logs

# 插件安装路径

#path.plugins: /path/to/plugins

#################################### Plugin ############################

# 设置插件作为启动条件,如果一下插件没有安装,则该节点服务不会启动

#plugin.mandatory: mapper-attachments,lang-groovy

################################### Memory ##############################

# 当JVM开始写入交换空间时(swapping)ElasticSearch性能会低下,你应该保证它不会写入交换空间

# 设置这个属性为true来锁定内存,同时也要允许elasticsearch的进程可以锁住内存,linux下可以通过 `ulimit -l unlimited` 命令

#bootstrap.mlockall: true

# 确保 ES_MIN_MEM 和 ES_MAX_MEM 环境变量设置为相同的值,以及机器有足够的内存分配给Elasticsearch

# 注意:内存也不是越大越好,一般64位机器,最大分配内存别才超过32G

############################## Network And HTTP #######################

# 设置绑定的ip地址,可以是ipv4或ipv6的,默认为0.0.0.0

#network.bind_host: 192.168.0.1

# 设置其它节点和该节点交互的ip地址,如果不设置它会自动设置,值必须是个真实的ip地址

#network.publish_host: 192.168.0.1

# 同时设置bind_host和publish_host上面两个参数

#network.host: 192.168.0.1

# 设置节点间交互的tcp端口,默认是9300

#transport.tcp.port: 9300

# 设置是否压缩tcp传输时的数据,默认为false,不压缩

#transport.tcp.compress: true

# 设置对外服务的http端口,默认为9200

#http.port: 9200

# 设置请求内容的最大容量,默认100mb

#http.max_content_length: 100mb

# 使用http协议对外提供服务,默认为true,开启

#http.enabled: false

################################### Gateway #############################

# gateway的类型,默认为local即为本地文件系统,可以设置为本地文件系统

#gateway.type: local

# 下面的配置控制怎样以及何时启动一整个集群重启的初始化恢复过程

# (当使用shard gateway时,是为了尽可能的重用local data(本地数据))

# 一个集群中的N个节点启动后,才允许进行恢复处理

#gateway.recover_after_nodes: 1

# 设置初始化恢复过程的超时时间,超时时间从上一个配置中配置的N个节点启动后算起

#gateway.recover_after_time: 5m

# 设置这个集群中期望有多少个节点.一旦这N个节点启动(并且recover_after_nodes也符合),

# 立即开始恢复过程(不等待recover_after_time超时)

#gateway.expected_nodes: 2

############################# Recovery Throttling #######################

# 下面这些配置允许在初始化恢复,副本分配,再平衡,或者添加和删除节点时控制节点间的分片分配

# 设置一个节点的并行恢复数

# 1.初始化数据恢复时,并发恢复线程的个数,默认为4

#cluster.routing.allocation.node_initial_primaries_recoveries: 4

#

# 2.添加删除节点或负载均衡时并发恢复线程的个数,默认为2

#cluster.routing.allocation.node_concurrent_recoveries: 2

# 设置恢复时的吞吐量(例如:100mb,默认为0无限制.如果机器还有其他业务在跑的话还是限制一下的好)

#indices.recovery.max_bytes_per_sec: 20mb

# 设置来限制从其它分片恢复数据时最大同时打开并发流的个数,默认为5

#indices.recovery.concurrent_streams: 5

# 注意: 合理的设置以上参数能有效的提高集群节点的数据恢复以及初始化速度

################################## Discovery ##########################

# 设置这个参数来保证集群中的节点可以知道其它N个有master资格的节点.默认为1,对于大的集群来说,可以设置大一点的值(2-4)

#discovery.zen.minimum_master_nodes: 1

# 探查的超时时间,默认3秒,提高一点以应对网络不好的时候,防止脑裂

#discovery.zen.ping.timeout: 3s

# For more information, see

# <http://elasticsearch.org/guide/en/elasticsearch/reference/current/modules-discovery-zen.html>

# 设置是否打开多播发现节点.默认是true.

# 当多播不可用或者集群跨网段的时候集群通信还是用单播吧

#discovery.zen.ping.multicast.enabled: false

# 这是一个集群中的主节点的初始列表,当节点(主节点或者数据节点)启动时使用这个列表进行探测

#discovery.zen.ping.unicast.hosts: ["host1", "host2:port"]

# Slow Log部分与GC log部分略,不过可以通过相关日志优化搜索查询速度

############## Memory(重点需要调优的部分) ################

# Cache部分:

# es有很多种方式来缓存其内部与索引有关的数据.其中包括filter cache

# filter cache部分:

# filter cache是用来缓存filters的结果的.默认的cache type是node type.node type的机制是所有的索引内部的分片共享filter cache.node type采用的方式是LRU方式.即:当缓存达到了某个临界值之后,es会将最近没有使用的数据清除出filter cache.使让新的数据进入es.

# 这个临界值的设置方法如下:indices.cache.filter.size 值类型:eg.:512mb 20%。默认的值是10%。

# out of memory错误避免过于频繁的查询时集群假死

# 1.设置es的缓存类型为Soft Reference,它的主要特点是据有较强的引用功能.只有当内存不够的时候,才进行回收这类内存,因此在内存足够的时候,它们通常不被回收.另外,这些引用对象还能保证在Java抛出OutOfMemory异常之前,被设置为null.它可以用于实现一些常用图片的缓存,实现Cache的功能,保证最大限度的使用内存而不引起OutOfMemory.在es的配置文件加上index.cache.field.type: soft即可.

# 2.设置es最大缓存数据条数和缓存失效时间,通过设置index.cache.field.max_size: 50000来把缓存field的最大值设置为50000,设置index.cache.field.expire: 10m把过期时间设置成10分钟.

#index.cache.field.max_size: 50000

#index.cache.field.expire: 10m

#index.cache.field.type: soft

# field data部分&&circuit breaker部分:

# 用于field data 缓存的内存数量,主要用于当使用排序,faceting操作时,elasticsearch会将一些热点数据加载到内存中来提供给客户端访问,但是这种缓存是比较珍贵的,所以对它进行合理的设置.

# 可以使用值:eg:50mb 或者 30%(节点 node heap内存量),默认是:unbounded

#indices.fielddata.cache.size: unbounded

# field的超时时间.默认是-1,可以设置的值类型: 5m

#indices.fielddata.cache.expire: -1

# circuit breaker部分:

# 断路器是elasticsearch为了防止内存溢出的一种操作,每一种circuit breaker都可以指定一个内存界限触发此操作,这种circuit breaker的设定有一个最高级别的设定:indices.breaker.total.limit 默认值是JVM heap的70%.当内存达到这个数量的时候会触发内存回收

# 另外还有两组子设置:

#indices.breaker.fielddata.limit:当系统发现fielddata的数量达到一定数量时会触发内存回收.默认值是JVM heap的70%

#indices.breaker.fielddata.overhead:在系统要加载fielddata时会进行预先估计,当系统发现要加载进内存的值超过limit * overhead时会进行进行内存回收.默认是1.03

#indices.breaker.request.limit:这种断路器是elasticsearch为了防止OOM(内存溢出),在每次请求数据时设定了一个固定的内存数量.默认值是40%

#indices.breaker.request.overhead:同上,也是elasticsearch在发送请求时设定的一个预估系数,用来防止内存溢出.默认值是1

# Translog部分:

# 每一个分片(shard)都有一个transaction log或者是与它有关的预写日志,(write log),在es进行索引(index)或者删除(delete)操作时会将没有提交的数据记录在translog之中,当进行flush 操作的时候会将tranlog中的数据发送给Lucene进行相关的操作.一次flush操作的发生基于如下的几个配置

#index.translog.flush_threshold_ops:当发生多少次操作时进行一次flush.默认是 unlimited

#index.translog.flush_threshold_size:当translog的大小达到此值时会进行一次flush操作.默认是512mb

#index.translog.flush_threshold_period:在指定的时间间隔内如果没有进行flush操作,会进行一次强制flush操作.默认是30m

#index.translog.interval:多少时间间隔内会检查一次translog,来进行一次flush操作.es会随机的在这个值到这个值的2倍大小之间进行一次操作,默认是5s

#index.gateway.local.sync:多少时间进行一次的写磁盘操作,默认是5s

# 以上的translog配置都可以通过API进行动态的设置

集群部署

集群名称不修改,节点名称修改、将elasticsearch.yml 复制到其他节点,并替换其配置文件。并关闭自动发现,防止其他外来节点连入。

3) Elasticsearch健康插件安装
elasticsearch-head是一个elasticsearch的集群管理工具,它是完全由html5编写的独立网页程序,你可以通过插件把它集成到es。

插件安装方法1:

1.elasticsearch/bin/plugin -install mobz/elasticsearch-head

2.运行es

3.打开http://localhost:9200/_plugin/head/

插件安装方法2:

1.https://github.com/mobz/elasticsearch-head下载zip 解压

2.建立elasticsearch\plugins\head\_site文件

3.将解压后的elasticsearch-head-master文件夹下的文件copy到_site

4.运行es

5.打开http://localhost:9200/_plugin/head/

在地址栏输入es服务器的ip地址和端口点connect就可以连接到集群。下面是连接后的视图。这是主界面,在这里可以看到es集群的基本信息(如:节点情况,索引情况)。

通过head插件可以对索引执行api 操作。

 

4) Shield之elasticsearch安全插件
官方安装步骤以及验证

Shield 2.0+

Compatible with the latest versions of Elasticsearch and Kibana

Step 1: Install Shield into Elasticsearch

bin/plugin install license

bin/plugin install shield

Step 2: Start Elasticsearch  bin/elasticsearch

Step 3: Add an admin user  bin/shield/esusers useradd es_admin -r admin

Step 4: Test your authenticated user  curl -u es_admin -XGET 'http://localhost:9200/'

Step 5: Install Shield Into Kibana for full session support (Shield 2.2+)

Step 6: Dive into the Getting Started Guide

目前测试尚未通过,等待官方发布新版本

5)Zookeeper集群搭建
Kafka依赖Zookeeper管理自身集群(Broker、Offset、Producer、Consumer等),所以先要安装 Zookeeper。自然,为了达到高可用的目的,Zookeeper自身也不能是单点,接下来就介绍如何搭建一个最小的Zookeeper集群(3个 zk节点)

此处选用Zookeeper的版本是3.4.9,此为Kafka0.10中推荐的Zookeeper版本。

首先解压

tar -xzvf zookeeper-3.4.9.tar.gz

进入zookeeper的conf目录,将zoo_sample.cfg复制一份,命名为zoo.cfg,此即为Zookeeper的配置文件

cp zoo_sample.cfg zoo.cfg

编辑zoo.cfg

# The number of milliseconds of each tick

tickTime=2000

# The number of ticks that the initial

# synchronization phase can take

initLimit=10

# The number of ticks that can pass between

# sending a request and getting an acknowledgement

syncLimit=5

# the directory where the snapshot is stored.

dataDir=/data/zk/zk0/data

dataLogDir=/data/zk/zk0/logs

# the port at which the clients will connect

clientPort=2181

server.176 = 172.16.38.176:2888:3888

server.177 = 172.16.38.177:2888:3888

server.1771 = 172.16.38.177:2999:3999

autopurge.purgeInterval=1

dataDir和dataLogDir的路径需要在启动前创建好

clientPort为zookeeper的服务端口

server.0/1/2为zk集群中三个node的信息,定义格式为hostname:port1:port2,其中port1是node间通信使用的端口,port2是node选举使用的端口,需确保三台主机的这两个端口都是互通的

在另外两台主机上执行同样的操作,安装并配置zookeeper

分别在三台主机的dataDir路径下创建一个文件名为myid的文件,文件内容为该zk节点的编号。例如在第一台主机上建立的myid文件内容是0,第二台是1。

接下来,启动三台主机上的zookeeper服务:

bin/zkServer.sh start

3个节点都启动完成后,可依次执行如下命令查看集群状态:

bin/zkServer.sh status

命令输出如下:

Mode: leader 或 Mode: follower

3个节点中,应有1个leader和两个follower

验证zookeeper集群高可用性:

假设目前3个zk节点中,server0为leader,server1和server2为follower

我们停掉server0上的zookeeper服务:

bin/zkServer.sh stop

再到server1和server2上查看集群状态,会发现此时server1(也有可能是server2)为leader,另一个为follower。

再次启动server0的zookeeper服务,运行zkServer.sh status检查,发现新启动的server0也为follower

至此,zookeeper集群的安装和高可用性验证完成。

附:Zookeeper默认会将控制台信息输出到启动路径下的zookeeper.out中,显然在生产环境中我们不能允许Zookeeper这样做,通过如下方法,可以让Zookeeper输出按尺寸切分的日志文件:

修改conf/log4j.properties文件,将zookeeper.root.logger=INFO, CONSOLE改为

zookeeper.root.logger=INFO, ROLLINGFILE修改bin/zkEnv.sh文件,将

ZOO_LOG4J_PROP="INFO,CONSOLE"改为ZOO_LOG4J_PROP="INFO,ROLLINGFILE"

然后重启zookeeper,就ok了

6)Kafka集群搭建
Kafka 是一个高吞吐量的分布式发布订阅日志服务,具有高可用、高性能、分布式、高扩展、持久性等特性。目前已经在各大公司中广泛使用。和之前采用 Redis 做轻量级消息队列不同,Kafka 利用磁盘作队列,所以也就无所谓消息缓冲时的磁盘问题。此外,如果公司内部已有 Kafka 服务在运行,logstash 也可以快速接入,免去重复建设的麻烦。

kafka 基本概念

Topic 主题,声明一个主题,producer指定该主题发布消息,订阅该主题的consumer对该主题进行消费

Partition 每个主题可以分为多个分区,每个分区对应磁盘上一个目录,分区可以分布在不同broker上,producer在发布消息时,可以通过指定partition key映射到对应分区,然后向该分区发布消息,在无partition key情况下,随机选取分区,一段时间内触发一次(比如10分钟),这样就保证了同一个producer向同一partition发布的消息是顺序的。 消费者消费时,可以指定partition进行消费,也可以使用high-level-consumer api,自动进行负载均衡,并将partition分给consumer,一个partition只能被一个consumer进行消费

Consumer 消费者,可以多实例部署,可以批量拉取,有两类API可供选择:一个simpleConsumer,暴露所有的操作给用户,可以提交offset、fetch offset、指定partition fetch message;另外一个high-level-consumer(ZookeeperConsumerConnector),帮助用户做基于partition自动分配的负载均衡,定期提交offset,建立消费队列等。simpleConsumer相当于手动挡,high-level-consumer相当于自动挡。

simpleConsumer:无需像high-level-consumer那样向zk注册brokerid、owner,甚至不需要提交offset到zk,可以将offset提交到任意地方比如(mysql,本地文件等)。

high-level-consumer:一个进程中可以启多个消费线程,一个消费线程即是一个consumer,假设A进程里有2个线程(consumerid分别为1,2),B进程有2个线程(consumerid分别为1,2),topic1的partition有5个,那么partition分配是这样的:

partition1 ---> A进程consumerid1

partition2 ---> A进程consumerid1

partition3 ---> A进程consumerid2

partition4 ---> B进程consumer1

partition5 ---> B进程consumer2

Group High-level-consumer可以声明group,每个group可以有多个consumer,每group各自管理各自的消费offset,各个不同group之间互不关联影响。

由于目前版本消费的offset、owner、group都是consumer自己通过zk管理,所以group对于broker和producer并不关心,一些监控工具需要通过group来监控,simpleComsumer无需声明group。

部署安装

此例中,我们会安装配置一个有两个Broker组成的Kafka集群,并在其上创建一个两个分区的Topic

本例中使用Kafka最新版本0.10.0

首先解压官网下载kafkatar包

编辑config/server.properties文件,下面列出关键的参数

#此Broker的ID,集群中每个Broker的ID不可相同

broker.id=0

#监听器,端口号与port一致即可

listeners=PLAINTEXT://:9092

#Broker监听的端口

port=19092

#Broker的Hostname,填主机IP即可

host.name=172.16.38.176

#向Producer和Consumer建议连接的Hostname和port(此处有坑,具体见后)

advertised.host.name=172.16.38.176

advertised.port=9092

#进行IO的线程数,应大于主机磁盘数

num.io.threads=8

#消息文件存储的路径

log.dirs=/data/kafka-logs

#消息文件清理周期,即清理x小时前的消息记录

log.retention.hours=168

#每个Topic默认的分区数,一般在创建Topic时都会指定分区数,所以这个配成1就行了

num.partitions=1

#Zookeeper连接串,此处填写上一节中安装的三个zk节点的ip和端口即可

zookeeper.connect=172.16.38.176:12180,172.16.38.177:12181,172.16.38.177:12182

将172.16.38.177 也按照上图配置文件中,只是修改brokerid即可。

7)测试Kafka和Zookeeper集群连通性
(1)建立一个主题

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic summer

#注意:factor大小不能超过broker数

(2)查看有哪些主题已经创建

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181   #列出集群中所有的topic

summer  #已经创建成功

(3)查看summer这个主题的详情

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic summer

Topic:summer PartitionCount:16 ReplicationFactor:2 Configs:

Topic: summer Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1

Topic: summer Partition: 1 Leader: 1 Replicas: 1,0 Isr: 0,1

Topic: summer Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1

Topic: summer Partition: 3 Leader: 1 Replicas: 1,0 Isr: 0,1

Topic: summer Partition: 4 Leader: 0 Replicas: 0,1 Isr: 0,1

Topic: summer Partition: 5 Leader: 1 Replicas: 1,0 Isr: 0,1

Topic: summer Partition: 6 Leader: 0 Replicas: 0,1 Isr: 0,1

Topic: summer Partition: 7 Leader: 1 Replicas: 1,0 Isr: 0,1

Topic: summer Partition: 8 Leader: 0 Replicas: 0,1 Isr: 0,1

Topic: summer Partition: 9 Leader: 1 Replicas: 1,0 Isr: 0,1

Topic: summer Partition: 10 Leader: 0 Replicas: 0,1 Isr: 0,1

Topic: summer Partition: 11 Leader: 1 Replicas: 1,0 Isr: 0,1

Topic: summer Partition: 12 Leader: 0 Replicas: 0,1 Isr: 0,1

Topic: summer Partition: 13 Leader: 1 Replicas: 1,0 Isr: 0,1

Topic: summer Partition: 14 Leader: 0 Replicas: 0,1 Isr: 0,1

Topic: summer Partition: 15 Leader: 1 Replicas: 1,0 Isr: 0,1

#主题名称:summer

#Partition:16个,从0开始

#leader :id为0和1的broker

#Replicas 副本存在于broker id为0,1的上面

#Isr:活跃状态的broker

(4)发送消息,这里使用的是生产者角色

[root@kafka1 data]# /bin/bash /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 172.16.38.176:9092,172.16.38.177:9092 --topic test1

(5)接收消息,这里使用的是消费者角色

[root@kafka2 data]# /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper  172.16.38.176:12180,172.16.38.177:12181,172.16.38.177:12182 --topic test1

(6)验证的效果,生产者和消费者

 

(7)删除一个消息主题

[root@kafka2data]#/usr/local/kafka/bin/kafka-topics.sh--zookeeper 172.16.38.176:12180,172.16.38.177:12181,172.16.38.177:12182 --delete --topic test1

Topic test1 is marked for deletion.

Note: This will have no impact if delete.topic.enable is not set to true.

这样kafka和zookeeper集群配置完毕。

8)Logstash部署
Logstash requires Java 7 or later. Use the official Oracle distribution or an open-source distribution such as OpenJDK.

Download and install the public signing key:

rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch

Add the following in your /etc/yum.repos.d/ directory in a file with a .repo suffix, for example logstash.repo

[logstash-2.4]

name=Logstash repository for 2.4.x packages

baseurl=https://packages.elastic.co/logstash/2.4/centos

gpgcheck=1

gpgkey=https://packages.elastic.co/GPG-KEY-elasticsearch

enabled=1

And your repository is ready for use. You can install it with:

yum install logstash

9)Kibana部署
1、Download and install the public signing key:

rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch

2、Create a file named kibana.repo in the /etc/yum.repos.d/ directory with the following contents:

[kibana-4.6]

name=Kibana repository for 4.6.x packages

baseurl=https://packages.elastic.co/kibana/4.6/centos

gpgcheck=1

gpgkey=https://packages.elastic.co/GPG-KEY-elasticsearch

enabled=1

3、Install Kibana by running the following command:

yum install kibana

4、Configure Kibana to automatically start during bootup. If your distribution is using the System V version of init (check with ps -p 1), run the following command:

chkconfig --add kibana

5、If your distribution is using systemd, run the following commands instead:

sudo /bin/systemctl daemon-reload

sudo /bin/systemctl enable kibana.service

四、系统使用示例
按照logstash-->kafka-->zookeeper-->logstash-->elasticsearch-->kibana 数据顺序

1) Logstash 作为kafka生产者示例
2) Logstash index 消费kafka示例

kafka和ELK构建日志收集系统

背景:

最近线上上了ELK,但是只用了一台Redis在中间作为消息队列,以减轻前端es集群的压力,Redis的集群解决方案暂时没有接触过,并且Redis作为消息队列并不是它的强项;所以最近将Redis换成了专业的消息信息发布订阅系统Kafka, Kafka的更多介绍大家可以看这里: 传送门 ,关于ELK的知识网上有很多的哦, 此篇博客主要是总结一下目前线上这个平台的实施步骤,ELK是怎么跟Kafka结合起来的。好吧,动手!

ELK架构拓扑:

然而我这里的整个日志收集平台就是这样的拓扑:

e6J3MfR.png!web

1,使用一台Nginx代理访问kibana的请求;

2,两台es组成es集群,并且在两台es上面都安装kibana;( 以下对elasticsearch简称es )

3,中间三台服务器就是我的kafka(zookeeper)集群啦; 上面写的 消费者/生产者 这是kafka(zookeeper)中的概念;

4,最后面的就是一大堆的生产服务器啦,上面使用的是logstash,当然除了logstash也可以使用其他的工具来收集你的应用程序的日志,例如:Flume,Scribe,Rsyslog,Scripts……

角色:

bMjyMbn.png!web

软件选用:

elasticsearch-1.7.3.tar.gz #这里需要说明一下,前几天使用了最新的elasticsearch2.0,java-1.8.0报错,目前未找到原因,故这里使用1.7.3版本
Logstash-2.0.0.tar.gz
kibana-4.1.2-linux-x64.tar.gz
以上软件都可以从官网下载:https://www.elastic.co/downloads
java-1.8.0,nginx采用yum安装

部署步骤:

1.ES集群安装配置;

2.Logstash客户端配置(直接写入数据到ES集群,写入系统messages日志);

3.Kafka(zookeeper)集群配置;(Logstash写入数据到Kafka消息系统);

4.Kibana部署;

5.Nginx负载均衡Kibana请求;

6.案例:nginx日志收集以及MySQL慢日志收集;

7.Kibana报表基本使用;

ES集群安装配置;

es1.example.com:

1.安装java-1.8.0以及依赖包

yuminstall -y epel-release
yuminstall -y java-1.8.0 gitwgetlrzsz

2.获取es软件包

wgethttps://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.3.tar.gz
tar -xfelasticsearch-1.7.3.tar.gz -C /usr/local
ln -sv /usr/local/elasticsearch-1.7.3 /usr/local/elasticsearch

3.修改配置文件

[root@es1 ~]# vim /usr/local/elasticsearch/config/elasticsearch.yml
32 cluster.name: es-cluster                        #组播的名称地址
40 node.name: "es-node1 "                          #节点名称,不能和其他节点重复
47 node.master: true                                #节点能否被选举为master
51 node.data: true                                  #节点是否存储数据
107 index.number_of_shards: 5                      #索引分片的个数
111 index.number_of_replicas: 1                    #分片的副本个数
145 path.conf: /usr/local/elasticsearch/config/    #配置文件的路径
149 path.data: /data/es/data                        #数据目录路径
159 path.work: /data/es/worker                      #工作目录路径
163 path.logs:  /usr/local/elasticsearch/logs/      #日志文件路径
167 path.plugins:  /data/es/plugins                #插件路径
184 bootstrap.mlockall: true                        #内存不向swap交换
232 http.enabled: true                              #启用http

4.创建相关目录

mkdir /data/es/{data,worker,plugins} -p

5.获取es服务管理脚本

[root@es1 ~]# git clone https://github.com/elastic/elasticsearch-servicewrapper.git
[root@es1 ~]# mv elasticsearch-servicewrapper/service /usr/local/elasticsearch/bin/
[root@es1 ~]# /usr/local/elasticsearch/bin/service/elasticsearch install
DetectedRHELor Fedora:
InstallingtheElasticsearchdaemon..
[root@es1 ~]#
#这时就会在/etc/init.d/目录下安装上es的管理脚本啦
#修改其配置:
[root@es1 ~]#
set.default.ES_HOME=/usr/local/elasticsearch  #安装路径
set.default.ES_HEAP_SIZE=1024                  #jvm内存大小,根据实际环境调整即可

6.启动es ,并检查其服务是否正常

[root@es1 ~]# netstat -nlpt | grep -E "9200|"9300
tcp        0      0 0.0.0.0:9200                0.0.0.0:*                  LISTEN      1684/java          
tcp        0      0 0.0.0.0:9300                0.0.0.0:*                  LISTEN      1684/java

访问http://192.168.2.18:9200/ 如果出现以下提示信息说明安装配置完成啦,

QZVV7jF.png!web

7.es1节点好啦,我们直接把目录复制到es2

[root@es1local]# scp -r elasticsearch-1.7.3  192.168.12.19:/usr/local/
[root@es2local]# ln -sv elasticsearch-1.7.3 elasticsearch
[root@es2local]# elasticsearch/bin/service/elasticsearch install
#es2只需要修改node.name即可,其他都与es1相同配置

8.安装es的管理插件

es官方提供一个用于管理es的插件,可清晰直观看到es集群的状态,以及对集群的操作管理,安装方法如下:

[root@es1local]# /usr/local/elasticsearch/bin/plugin -i mobz/elasticsearch-head

安装好之后,访问方式为: http://192.168.2.18:9200/_plugin/head,由于集群中现在暂时没有数据,所以显示为空,

Zzu6riU.png!web

此时,es集群的部署完成。

Logstash客户端安装配置;

在webserve1上面安装Logstassh

1.downloads  软件包 ,这里注意,Logstash是需要依赖java环境的,所以这里还是需要yum install -y java-1.8.0.

[root@webserver1 ~]# wget https://download.elastic.co/logstash/logstash/logstash-2.0.0.tar.gz
[root@webserver1 ~]# tar -xf logstash-2.0.0.tar.gz -C /usr/local
[root@webserver1 ~]# cd /usr/local/
[root@webserver1local]# ln -sv logstash-2.0.0 logstash
[root@webserver1local]# mkdir logs etc

2.提供logstash管理脚本,其中里面的配置路径可根据实际情况修改

#!/bin/bash
#chkconfig: 2345 55 24
#description: logstash service manager
#auto: Maoqiu Guo
FILE='/usr/local/logstash/etc/*.conf'    #logstash配置文件
LOGBIN='/usr/local/logstash/bin/logstash agent --verbose --config'  #指定logstash配置文件的命令
LOCK='/usr/local/logstash/locks'        #用锁文件配合服务启动与关闭
LOGLOG='--log /usr/local/logstash/logs/stdou.log'  #日志
START() {
 if [ -f $LOCK ];then
 echo -e "Logstash is already \033[32mrunning\033[0m, do nothing."
 else
 echo -e "Start logstash service.\033[32mdone\033[m"
 nohup ${LOGBIN} ${FILE} ${LOGLOG} &
 touch $LOCK
 fi
}
STOP() {
 if [ ! -f $LOCK ];then
 echo -e "Logstash is already stop, do nothing."
 else
 echo -e "Stop logstash serivce \033[32mdone\033[m"
 rm -rf $LOCK
 ps -ef | greplogstash | grep -v "grep" | awk '{print $2}' | xargskill -s 9 >/dev/null
 fi
}
STATUS() {
 psaux | greplogstash | grep -v "grep" >/dev/null
 if [ -f $LOCK ] && [ $? -eq 0 ]; then
 echo -e "Logstash is: \033[32mrunning\033[0m..."
 else
 echo -e "Logstash is: \033[31mstopped\033[0m..."
 fi
}
TEST(){
 ${LOGBIN} ${FILE} --configtest
}
case "$1" in
  start)
 START
 ;;
  stop)
 STOP
 ;;
  status)
 STATUS
 ;;
  restart)
 STOP
        sleep 2
        START
 ;;
  test)
 TEST
 ;;
  *)
 echo "Usage: /etc/init.d/logstash (test|start|stop|status|restart)"
 ;;
esac

3.Logstash 向es集群写数据

(1)编写一个logstash配置文件

[root@webserver1etc]# cat logstash.conf
input {              #数据的输入从标准输入
  stdin {}  
}
output {            #数据的输出我们指向了es集群
  elasticsearch {
    hosts => ["192.168.2.18:9200","192.168.2.19:9200"]#es主机的ip及端口
  }
}
[root@webserver1etc]#

(2)检查配置文件是否有语法错

[root@webserver1etc]# /usr/local/logstash/bin/logstash -f logstash.conf --configtest --verbose
ConfigurationOK
[root@webserver1etc]#

(3)既然配置ok我们手动启动它,然后写点东西看能否写到es

Fn6rY3m.png!web

ok.上图已经看到logstash已经可以正常的工作啦.

4.下面演示一下如何收集系统日志

将之前的配置文件修改如下所示内容,然后启动logstash服务就可以在web页面中看到messages的日志写入es,并且创建了一条索引

[root@webserver1etc]# cat logstash.conf
input {#这里的输入使用的文件,即日志文件messsages
  file {
    path => "/var/log/messages"#这是日志文件的绝对路径
    start_position => "beginning"#这个表示从messages的第一行读取,即文件开始处
  }
}
output {#输出到es
  elasticsearch {
    hosts => ["192.168.2.18:9200","192.168.2.19:9200"]
    index => "system-messages-%{+YYYY-MM}"#这里将按照这个索引格式来创建索引
  }
}
[root@webserver1etc]#

启动logstash后,我们来看head这个插件的web页面

6JFZj2m.png!web

ok,系统日志我们已经成功的收集,并且已经写入到es集群中,那上面的演示是logstash直接将日志写入到es集群中的,这种场合我觉得如果量不是很大的话直接像上面已将将输出output定义到es集群即可,如果量大的话需要加上消息队列来缓解es集群的压力。前面已经提到了我这边之前使用的是单台redis作为消息队列,但是redis不能作为list类型的集群,也就是redis单点的问题没法解决,所以这里我选用了kafka ;下面就在三台server上面安装kafka集群

Kafka集群安装配置;

在搭建kafka集群时,需要提前安装zookeeper集群,当然kafka已经自带zookeeper程序只需要解压并且安装配置就行了

kafka1上面的配置:

1.获取软件包.官网: http://kafka.apache.org

[root@kafka1 ~]# wget http://mirror.rise.ph/apache/kafka/0.8.2.1/kafka_2.11-0.8.2.1.tgz
[root@kafka1 ~]# tar -xf kafka_2.11-0.8.2.1.tgz -C /usr/local/
[root@kafka1 ~]# cd /usr/local/
[root@kafka1local]# ln -sv kafka_2.11-0.8.2.1 kafka

2.配置zookeeper集群,修改配置文件

[root@kafka1 ~]# vim /usr/local/kafka/config/zookeeper.propertie
dataDir=/data/zookeeper
clienrtPort=2181
tickTime=2000
initLimit=20
syncLimit=10
server.2=192.168.2.22:2888:3888
server.3=192.168.2.23:2888:3888
server.4=192.168.2.24:2888:3888
#说明:
tickTime: 这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
2888端口:表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;
3888端口:表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。

3.创建zookeeper所需要的目录

[root@kafka1 ~]# mkdir /data/zookeeper

4.在/data/zookeeper目录下创建myid文件,里面的内容为数字,用于标识主机,如果这个文件没有的话,zookeeper是没法启动的哦

[root@kafka1 ~]# echo 2 > /data/zookeeper/myid

以上就是zookeeper集群的配置,下面等我配置好kafka之后直接复制到其他两个节点即可

5.kafka配置

[root@kafka1 ~]# vim /usr/local/kafka/config/server.properties
broker.id=2        # 唯一,填数字,本文中分别为2/3/4
prot=9092    # 这个broker监听的端口
host.name=192.168.2.22  # 唯一,填服务器IP
log.dir=/data/kafka-logs  #  该目录可以不用提前创建,在启动时自己会创建
zookeeper.connect=192.168.2.22:2181,192.168.2.23:2181,192.168.2.24:2181#这个就是zookeeper的ip及端口
num.partitions=16        # 需要配置较大 分片影响读写速度
log.dirs=/data/kafka-logs # 数据目录也要单独配置磁盘较大的地方
log.retention.hours=168  # 时间按需求保留过期时间 避免磁盘满

6.将kafka(zookeeper)的程序目录全部拷贝至其他两个节点

[root@kafka1 ~]# scp -r /usr/local/kafka 192.168.2.23:/usr/local/
[root@kafka1 ~]# scp -r /usr/local/kafka 192.168.2.24:/usr/local/

7.修改两个借点的配置,注意这里除了以下两点不同外,都是相同的配置

1)zookeeper的配置
mkdir /data/zookeeper
echo "x" > /data/zookeeper/myid2)kafka的配置
broker.id=2
host.name=192.168.2.22

8.修改完毕配置之后我们就可以启动了,这里先要启动zookeeper集群,才能启动kafka

我们按照顺序来,kafka1 –> kafka2 –>kafka3

[root@kafka1 ~]# /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &   #zookeeper启动命令
[root@kafka1 ~]# /usr/local/kafka/bin/zookeeper-server-stop.sh                                                   #zookeeper停止的命令

注意,如果zookeeper有问题 nohup的日志文件会非常大,把磁盘占满,这个zookeeper服务可以通过自己些服务脚本来管理服务的启动与关闭。

后面两台执行相同操作,在启动过程当中会出现以下报错信息

[2015-11-13 19:18:04,225] WARNCannotopenchannelto 3 atelectionaddress /192.168.2.23:3888 (org.apache.zookeeper.server.quorum.QuorumCnxManager)
java.net.ConnectException: Connectionrefused
 atjava.net.PlainSocketImpl.socketConnect(Native Method)
 atjava.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
 atjava.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
 atjava.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
 atjava.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
 atjava.net.Socket.connect(Socket.java:589)
 atorg.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:368)
 atorg.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(QuorumCnxManager.java:402)
 atorg.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader(FastLeaderElection.java:840)
 atorg.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:762)
[2015-11-13 19:18:04,232] WARNCannotopenchannelto 4 atelectionaddress /192.168.2.24:3888 (org.apache.zookeeper.server.quorum.QuorumCnxManager)
java.net.ConnectException: Connectionrefused
 atjava.net.PlainSocketImpl.socketConnect(Native Method)
 atjava.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
 atjava.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
 atjava.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
 atjava.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
 atjava.net.Socket.connect(Socket.java:589)
 atorg.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:368)
 atorg.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(QuorumCnxManager.java:402)
 atorg.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader(FastLeaderElection.java:840)
 atorg.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:762)
[2015-11-13 19:18:04,233] INFONotificationtimeout: 6400 (org.apache.zookeeper.server.quorum.FastLeaderElection)

由于zookeeper集群在启动的时候,每个结点都试图去连接集群中的其它结点,先启动的肯定连不上后面还没启动的,所以上面日志前面部分的异常是可以忽略的。通过后面部分可以看到,集群在选出一个Leader后,最后稳定了。

其他节点也可能会出现类似的情况,属于正常。

9.zookeeper服务检查

[root@kafka1~]#  netstat -nlpt | grep -E "2181|2888|3888"tcp        0      0192.168.2.24:3888          0.0.0.0:*                  LISTEN      1959/javatcp        0      00.0.0.0:2181                0.0.0.0:*                  LISTEN      1959/java[root@kafka2~]#  netstat -nlpt | grep -E "2181|2888|3888"tcp        0      0192.168.2.23:3888          0.0.0.0:*                  LISTEN      1723/javatcp        0      00.0.0.0:2181                0.0.0.0:*                  LISTEN      1723/java

[root@kafka3~]#  netstat -nlpt | grep -E "2181|2888|3888"

tcp        0      0192.168.2.24:3888          0.0.0.0:*                  LISTEN      950/java

tcp        0      00.0.0.0:2181                0.0.0.0:*                  LISTEN      950/java

tcp        0      0192.168.2.24:2888          0.0.0.0:*                  LISTEN      950/java

#可以看出,如果哪台是Leader,那么它就拥有2888这个端口

ok.  这时候zookeeper集群已经启动起来了,下面启动kafka,也是依次按照顺序启动

[root@kafka1 ~]# nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &   #kafka启动的命令
[root@kafka1 ~]#  /usr/local/kafka/bin/kafka-server-stop.sh                                                         #kafka停止的命令

注意,跟zookeeper服务一样,如果kafka有问题 nohup的日志文件会非常大,把磁盘占满,这个kafka服务同样可以通过自己些服务脚本来管理服务的启动与关闭。

此时三台上面的zookeeper及kafka都已经启动完毕,来检测以下吧

(1)建立一个主题

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic summer
#注意:factor大小不能超过broker数

(2)查看有哪些主题已经创建

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.2.22:2181   #列出集群中所有的topic
summer  #已经创建成功

(3)查看summer这个主题的详情

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.2.22:2181 --topic summer
Topic:summerPartitionCount:1 ReplicationFactor:3 Configs:
 Topic: summerPartition: 0 Leader: 2 Replicas: 2,4,3 Isr: 2,4,3
#主题名称:summer
#Partition:只有一个,从0开始
#leaderid2broker
#Replicas 副本存在于broker id2,3,4的上面
#Isr:活跃状态的broker

(4)发送消息,这里使用的是生产者角色

[root@kafka1 ~]# /bin/bash /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.2.22:9092 --topic summer
This is a messages
welcometo kafka    

(5)接收消息,这里使用的是消费者角色

[root@kafka2 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper  192.168.2.24:2181 --topic summer --from-beginning
This is a messages
welcometo kafka

如果能够像上面一样能够接收到生产者发过来的消息,那说明基于kafka的zookeeper集群就成功啦。

10,下面我们将webserver1上面的logstash的输出改到kafka上面,将数据写入到kafka中

(1)修改webserver1上面的logstash配置,如下所示:各个参数可以到 官网 查询.

root@webserver1etc]# cat logstash.conf
input {            #这里的输入还是定义的是从日志文件输入
  file {
    type => "system-message" 
    path => "/var/log/messages"
    start_position => "beginning"
  }
}
output {
    #stdout { codec => rubydebug }   #这是标准输出到终端,可以用于调试看有没有输出,注意输出的方向可以有多个
    kafka {  #输出到kafka
      bootstrap_servers => "192.168.2.22:9092,192.168.2.23:9092,192.168.2.24:9092"  #他们就是生产者
      topic_id => "system-messages"  #这个将作为主题的名称,将会自动创建
      compression_type => "snappy"  #压缩类型
    }
}
[root@webserver1etc]#

(2)配置检测

[root@webserver1etc]# /usr/local/logstash/bin/logstash -f logstash.conf --configtest --verbose
ConfigurationOK
[root@webserver1etc]#

(2)启动Logstash,这里我直接在命令行执行即可

[root@webserver1etc]# /usr/local/logstash/bin/logstash -f logstash.conf

(3)验证数据是否写入到kafka,这里我们检查是否生成了一个叫system-messages的主题

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.2.22:2181
summer
system-messages  #可以看到这个主题已经生成了
#再看看这个主题的详情:
[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.2.22:2181 --topic system-messages
Topic:system-messagesPartitionCount:16 ReplicationFactor:1 Configs:
 Topic: system-messagesPartition: 0 Leader: 2 Replicas: 2 Isr: 2
 Topic: system-messagesPartition: 1 Leader: 3 Replicas: 3 Isr: 3
 Topic: system-messagesPartition: 2 Leader: 4 Replicas: 4 Isr: 4
 Topic: system-messagesPartition: 3 Leader: 2 Replicas: 2 Isr: 2
 Topic: system-messagesPartition: 4 Leader: 3 Replicas: 3 Isr: 3
 Topic: system-messagesPartition: 5 Leader: 4 Replicas: 4 Isr: 4
 Topic: system-messagesPartition: 6 Leader: 2 Replicas: 2 Isr: 2
 Topic: system-messagesPartition: 7 Leader: 3 Replicas: 3 Isr: 3
 Topic: system-messagesPartition: 8 Leader: 4 Replicas: 4 Isr: 4
 Topic: system-messagesPartition: 9 Leader: 2 Replicas: 2 Isr: 2
 Topic: system-messagesPartition: 10 Leader: 3 Replicas: 3 Isr: 3
 Topic: system-messagesPartition: 11 Leader: 4 Replicas: 4 Isr: 4
 Topic: system-messagesPartition: 12 Leader: 2 Replicas: 2 Isr: 2
 Topic: system-messagesPartition: 13 Leader: 3 Replicas: 3 Isr: 3
 Topic: system-messagesPartition: 14 Leader: 4 Replicas: 4 Isr: 4
 Topic: system-messagesPartition: 15 Leader: 2 Replicas: 2 Isr: 2
[root@kafka1 ~]#

可以看出,这个主题生成了16个分区,每个分区都有对应自己的Leader,但是我想要有10个分区,3个副本如何办?还是跟我们上面一样命令行来创建主题就行,当然对于logstash输出的我们也可以提前先定义主题,然后启动logstash 直接往定义好的主题写数据就行啦,命令如下:

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.2.22:2181 --replication-factor 3 --partitions 10 --topic TOPIC_NAME

好了,我们将logstash收集到的数据写入到了kafka中了,在实验过程中我使用while脚本测试了如果不断的往kafka写数据的同时停掉两个节点,数据写入没有任何问题。

那如何将数据从kafka中读取然后给我们的es集群呢?那下面我们在kafka集群上安装Logstash,安装步骤不再赘述;三台上面的logstash 的配置如下,作用是将kafka集群的数据读取然后转交给es集群,这里为了测试我让他新建一个索引文件,注意这里的输入日志还是messages,主题名称还是“system-messages”

[root@kafka1etc]# more logstash.conf
input {
    kafka {
        zk_connect => "192.168.2.22:2181,192.168.2.23:2181,192.168.2.24:2181"  #消费者们
        topic_id => "system-messages"
        codec => plain
        reset_beginning => false
        consumer_threads => 5
        decorate_events => true
    }
}
output {
    elasticsearch {
      hosts => ["192.168.2.18:9200","192.168.2.19:9200"]
      index => "test-system-messages-%{+YYYY-MM}"          #为了区分之前实验,我这里新生成的所以名字为“test-system-messages-%{+YYYY-MM}”
  }
  }

在三台kafka上面启动Logstash,注意我这里是在命令行启动的;

[root@kafka1etc]# pwd
/usr/local/logstash/etc
[root@kafka1etc]# /usr/local/logstash/bin/logstash -f logstash.conf
[root@kafka2etc]# pwd
/usr/local/logstash/etc
[root@kafka2etc]# /usr/local/logstash/bin/logstash -f logstash.conf
[root@kafka3etc]# pwd
/usr/local/logstash/etc
[root@kafka3etc]# /usr/local/logstash/bin/logstash -f logstash.conf

在webserver1上写入测试内容,即webserver1上面利用message这个文件来测试,我先将其清空,然后启动

[root@webserver1etc]# >/var/log/messages
[root@webserver1etc]# echo "我将通过kafka集群达到es集群哦^0^" >> /var/log/messages
#启动logstash,让其读取messages中的内容

下图为我在客户端写入到kafka集群的同时也将其输入到终端,这里写入了三条内容

uQRnIzQ.png!web

而下面三张图侧可以看出,三台Logstash 很平均的从kafka集群当中读取出来了日志内容

1myIVJ3.png!web

1NnIre2A.png!web

 

1qIbeEbj.png!web

再来看看我们的es管理界面

JVzeqa.png!web

ok ,看到了吧,

流程差不多就是下面 酱紫咯

YJv6Bv.png!web

由于篇幅较长,我将

4.Kibana部署;

5.Nginx负载均衡Kibana请求;

6.案例:nginx日志收集以及MySQL慢日志收集;

7.Kibana报表基本使用;

放到下一篇博客。

本文博客转自:http://blog.sctux.com/?p=445

Kafka、Storm、HDFS整合实践

在基于Hadoop平台的很多应用场景中,我们需要对数据进行离线和实时分析,离线分析可以很容易地借助于Hive来实现统计分析,但是对于实 时的需求Hive就不合适了。实时应用场景可以使用Storm,它是一个实时处理系统,它为实时处理类应用提供了一个计算模型,可以很容易地进行编程处 理。为了统一离线和实时计算,一般情况下,我们都希望将离线和实时计算的数据源的集合统一起来作为输入,然后将数据的流向分别经由实时系统和离线分析系 统,分别进行分析处理,这时我们可以考虑将数据源(如使用Flume收集日志)直接连接一个消息中间件,如Kafka,可以整合 Flume+Kafka,Flume作为消息的Producer,生产的消息数据(日志数据、业务请求数据等等)发布到Kafka中,然后通过订阅的方 式,使用Storm的Topology作为消息的Consumer,在Storm集群中分别进行如下两个需求场景的处理:

  • 直接使用Storm的Topology对数据进行实时分析处理
  • 整合Storm+HDFS,将消息处理后写入HDFS进行离线分析处理

实时处理,只要开发满足业务需要的Topology即可,不做过多说明。这里,我们主要从安装配置Kafka、Storm,以及整合 Kafka+Storm、整合Storm+HDFS、整合Kafka+Storm+HDFS这几点来配置实践,满足上面提出的一些需求。配置实践使用的软 件包如下所示:

  • zookeeper-3.4.5.tar.gz
  • kafka_2.9.2-0.8.1.1.tgz
  • apache-storm-0.9.2-incubating.tar.gz
  • hadoop-2.2.0.tar.gz

程序配置运行所基于的操作系统为CentOS 5.11。

Kafka安装配置

我们使用3台机器搭建Kafka集群:

192.168.4.142   h1
192.168.4.143   h2
192.168.4.144   h3

在安装Kafka集群之前,这里没有使用Kafka自带的Zookeeper,而是独立安装了一个Zookeeper集群,也是使用这3台机器,保证Zookeeper集群正常运行。
首先,在h1上准备Kafka安装文件,执行如下命令
cd /usr/local/
wget http://mirror.bit.edu.cn/apache/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz
tar xvzf kafka_2.9.2-0.8.1.1.tgz
ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka

修改配置文件/usr/local/kafka/config/server.properties,修改如下内容:
broker.id=0
zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka

这里需要说明的是,默认Kafka会使用ZooKeeper默认的/路径,这样有关Kafka的ZooKeeper配置就会散落在根路径下面,如果 你有其他的应用也在使用ZooKeeper集群,查看ZooKeeper中数据可能会不直观,所以强烈建议指定一个chroot路径,直接在 zookeeper.connect配置项中指定:

zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka

而且,需要手动在ZooKeeper中创建路径/kafka,使用如下命令连接到任意一台ZooKeeper服务器:
cd /usr/local/zookeeper
bin/zkCli.sh
在ZooKeeper执行如下命令创建chroot路径:
create /kafka ''
这样,每次连接Kafka集群的时候(使用--zookeeper选项),也必须使用带chroot路径的连接字符串,后面会看到。
然后,将配置好的安装文件同步到其他的h2、h3节点上:

scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h2:/usr/local/
scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h3:/usr/local/
最后,在h2、h3节点上配置,执行如下命令:
cd /usr/local/
ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
并修改配置文件/usr/local/kafka/config/server.properties内容如下所示:
broker.id=1  # 在h1修改
broker.id=2  # 在h2修改
因为Kafka集群需要保证各个Broker的id在整个集群中必须唯一,需要调整这个配置项的值(如果在单机上,可以通过建立多个Broker进程来模拟分布式的Kafka集群,也需要Broker的id唯一,还需要修改一些配置目录的信息)。
在集群中的h1、h2、h3这三个节点上分别启动Kafka,分别执行如下命令:
bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &

可以通过查看日志,或者检查进程状态,保证Kafka集群启动成功。
我们创建一个名称为my-replicated-topic5的Topic,5个分区,并且复制因子为3,执行如下命令:
bin/kafka-topics.sh --create --zookeeper h1:2181,h2:2181,h3:2181/kafka --replication-factor 3 --partitions 5 --topic my-replicated-topic5
查看创建的Topic,执行如下命令:
bin/kafka-topics.sh --describe --zookeeper h1:2181,h2:2181,h3:2181/kafka --topic my-replicated-topic5
结果信息如下所示:
Topic:my-replicated-topic5     PartitionCount:5     ReplicationFactor:3     Configs:
Topic: my-replicated-topic5     Partition: 0     Leader: 0     Replicas: 0,2,1     Isr: 0,2,1
Topic: my-replicated-topic5     Partition: 1     Leader: 0     Replicas: 1,0,2     Isr: 0,2,1
Topic: my-replicated-topic5     Partition: 2     Leader: 2     Replicas: 2,1,0     Isr: 2,0,1
Topic: my-replicated-topic5     Partition: 3     Leader: 0     Replicas: 0,1,2     Isr: 0,2,1
Topic: my-replicated-topic5     Partition: 4     Leader: 2     Replicas: 1,2,0     Isr: 2,0,1
上面Leader、Replicas、Isr的含义如下:
Partition: 分区
Leader   : 负责读写指定分区的节点
Replicas : 复制该分区log的节点列表
Isr      : "in-sync" replicas,当前活跃的副本列表(是一个子集),并且可能成为Leader
我们可以通过Kafka自带的bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh脚本,来验证演示如果发布消息、消费消息。
在一个终端,启动Producer,并向我们上面创建的名称为my-replicated-topic5的Topic中生产消息,执行如下脚本:
bin/kafka-console-producer.sh --broker-list h1:9092,h2:9092,h3:9092 --topic my-replicated-topic5
在另一个终端,启动Consumer,并订阅我们上面创建的名称为my-replicated-topic5的Topic中生产的消息,执行如下脚本:
bin/kafka-console-consumer.sh --zookeeper h1:2181,h2:2181,h3:2181/kafka --from-beginning --topic my-replicated-topic5
可以在Producer终端上输入字符串消息行,然后回车,就可以在Consumer终端上看到消费者消费的消息内容。
也可以参考Kafka的Producer和Consumer的Java API,通过API编码的方式来实现消息生产和消费的处理逻辑。

 

 

Storm安装配置

Storm集群也依赖Zookeeper集群,要保证Zookeeper集群正常运行。Storm的安装配置比较简单,我们仍然使用下面3台机器搭建:

192.168.4.142 h1
192.168.4.143 h2
192.168.4.144 h3

首先,在h1节点上,执行如下命令安装:
cd /usr/local/
wget http://mirror.bit.edu.cn/apache/incubator/storm/apache-storm-0.9.2-incubating/apache-storm-0.9.2-incubating.tar.gz
tar xvzf apache-storm-0.9.2-incubating.tar.gz
ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm

然后,修改配置文件conf/storm.yaml,内容如下所示:
storm.zookeeper.servers:
- "h1"
- "h2"
- "h3"
storm.zookeeper.port: 2181
#
nimbus.host: "h1"

supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703

storm.local.dir: "/tmp/storm"

将配置好的安装文件,分发到其他节点上:
scp -r /usr/local/apache-storm-0.9.2-incubating/ h2:/usr/local/
scp -r /usr/local/apache-storm-0.9.2-incubating/ h3:/usr/local/

最后,在h2、h3节点上配置,执行如下命令:
cd /usr/local/
ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm

Storm集群的主节点为Nimbus,从节点为Supervisor,我们需要在h1上启动Nimbus服务,在从节点h2、h3上启动Supervisor服务:
bin/storm nimbus &
bin/storm supervisor &

为了方便监控,可以启动Storm UI,可以从Web页面上监控Storm Topology的运行状态,例如在h2上启动:
bin/storm ui &

这样可以通过访问http://h2:8080/来查看Topology的运行状况。

整合Kafka+Storm

消息通过各种方式进入到Kafka消息中间件,比如可以通过使用Flume来收集日志数据,然后在Kafka中路由暂存,然后再由实时计算程序 Storm做实时分析,这时我们就需要将在Storm的Spout中读取Kafka中的消息,然后交由具体的Spot组件去分析处理。实际 上,apache-storm-0.9.2-incubating这个版本的Storm已经自带了一个集成Kafka的外部插件程序storm- kafka,可以直接使用,例如我使用的Maven依赖配置,如下所示:
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.2-incubating</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>0.9.2-incubating</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.1.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>下面,我们开发了一个简单WordCount示例程序,从Kafka读取订阅的消息行,通过空格拆分出单个单词,然后再做词频统计计算,实现的Topology的代码,如下所示:
package org.shirdrn.storm.examples;import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class MyKafkaTopology {

public static class KafkaWordSplitter extends BaseRichBolt {

private static final Log LOG = LogFactory.getLog(KafkaWordSplitter.class);
private static final long serialVersionUID = 886149197481637894L;
private OutputCollector collector;

@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}

@Override
public void execute(Tuple input) {
String line = input.getString(0);
LOG.info("RECV[kafka -> splitter] " + line);
String[] words = line.split("\\s+");
for(String word : words) {
LOG.info("EMIT[splitter -> counter] " + word);
collector.emit(input, new Values(word, 1));
}
collector.ack(input);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}

}

public static class WordCounter extends BaseRichBolt {

private static final Log LOG = LogFactory.getLog(WordCounter.class);
private static final long serialVersionUID = 886149197481637894L;
private OutputCollector collector;
private Map<String, AtomicInteger> counterMap;

@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
this.counterMap = new HashMap<String, AtomicInteger>();
}

@Override
public void execute(Tuple input) {
String word = input.getString(0);
int count = input.getInteger(1);
LOG.info("RECV[splitter -> counter] " + word + " : " + count);
AtomicInteger ai = this.counterMap.get(word);
if(ai == null) {
ai = new AtomicInteger();
this.counterMap.put(word, ai);
}
ai.addAndGet(count);
collector.ack(input);
LOG.info("CHECK statistics map: " + this.counterMap);
}

@Override
public void cleanup() {
LOG.info("The final result:");
Iterator<Entry<String, AtomicInteger>> iter = this.counterMap.entrySet().iterator();
while(iter.hasNext()) {
Entry<String, AtomicInteger> entry = iter.next();
LOG.info(entry.getKey() + "\t:\t" + entry.getValue().get());
}

}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}

public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
String zks = "h1:2181,h2:2181,h3:2181";
String topic = "my-replicated-topic5";
String zkRoot = "/storm"; // default zookeeper root configuration for storm
String id = "word";

BrokerHosts brokerHosts = new ZkHosts(zks);
SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConf.forceFromStart = false;
spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
spoutConf.zkPort = 2181;

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5); // Kafka我们创建了一个5分区的Topic,这里并行度设置为5
builder.setBolt("word-splitter", new KafkaWordSplitter(), 2).shuffleGrouping("kafka-reader");
builder.setBolt("word-counter", new WordCounter()).fieldsGrouping("word-splitter", new Fields("word"));

Config conf = new Config();

String name = MyKafkaTopology.class.getSimpleName();
if (args != null && args.length > 0) {
// Nimbus host name passed from command line
conf.put(Config.NIMBUS_HOST, args[0]);
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
} else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(name, conf, builder.createTopology());
Thread.sleep(60000);
cluster.shutdown();
}
}
}

上面程序,在本地调试(使用LocalCluster)不需要输入任何参数,提交到实际集群中运行时,需要传递一个参数,该参数为Nimbus的主机名称。
通过Maven构建,生成一个包含依赖的single jar文件(不要把Storm的依赖包添加进去),例如storm-examples-0.0.1-SNAPSHOT.jar,在提交Topology程 序到Storm集群之前,因为用到了Kafka,需要拷贝一下依赖jar文件到Storm集群中的lib目录下面:
cp /usr/local/kafka/libs/kafka_2.9.2-0.8.1.1.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/scala-library-2.9.2.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/metrics-core-2.2.0.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/snappy-java-1.0.5.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/zkclient-0.3.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/log4j-1.2.15.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/slf4j-api-1.7.2.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/jopt-simple-3.2.jar /usr/local/storm/lib/

然后,就可以提交我们开发的Topology程序了:
bin/storm jar /home/storm/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.MyKafkaTopology h1

可以通过查看日志文件(logs/目录下)或者Storm UI来监控Topology的运行状况。如果程序没有错误,可以使用前面我们使用的Kafka Producer来生成消息,就能看到我们开发的Storm Topology能够实时接收到并进行处理。
上面Topology实现代码中,有一个很关键的配置对象SpoutConfig,配置属性如下所示:
spoutConf.forceFromStart = false;

该配置是指,如果该Topology因故障停止处理,下次正常运行时是否从Spout对应数据源Kafka中的该订阅Topic的起始位置开始读 取,如果forceFromStart=true,则之前处理过的Tuple还要重新处理一遍,否则会从上次处理的位置继续处理,保证Kafka中的 Topic数据不被重复处理,是在数据源的位置进行状态记录。

整合Storm+HDFS

Storm实时计算集群从Kafka消息中间件中消费消息,有实时处理需求的可以走实时处理程序,还有需要进行离线分析的需求,如写入到HDFS进行分析。下面实现了一个Topology,代码如下所示:
package org.shirdrn.storm.examples;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class StormToHDFSTopology {

public static class EventSpout extends BaseRichSpout {

private static final Log LOG = LogFactory.getLog(EventSpout.class);
private static final long serialVersionUID = 886149197481637894L;
private SpoutOutputCollector collector;
private Random rand;
private String[] records;

@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
rand = new Random();
records = new String[] {
"10001 ef2da82d4c8b49c44199655dc14f39f6 4.2.1 HUAWEI G610-U00 HUAWEI 2 70:72:3c:73:8b:22 2014-10-13 12:36:35",
"10001 ffb52739a29348a67952e47c12da54ef 4.3 GT-I9300 samsung 2 50:CC:F8:E4:22:E2 2014-10-13 12:36:02",
"10001 ef2da82d4c8b49c44199655dc14f39f6 4.2.1 HUAWEI G610-U00 HUAWEI 2 70:72:3c:73:8b:22 2014-10-13 12:36:35"
};
}

@Override
public void nextTuple() {
Utils.sleep(1000);
DateFormat df = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
Date d = new Date(System.currentTimeMillis());
String minute = df.format(d);
String record = records[rand.nextInt(records.length)];
LOG.info("EMIT[spout -> hdfs] " + minute + " : " + record);
collector.emit(new Values(minute, record));
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("minute", "record"));
}

}

public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
// use "|" instead of "," for field delimiter
RecordFormat format = new DelimitedRecordFormat()
.withFieldDelimiter(" : ");

// sync the filesystem after every 1k tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);

// rotate files
FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES);

FileNameFormat fileNameFormat = new DefaultFileNameFormat()
.withPath("/storm/").withPrefix("app_").withExtension(".log");

HdfsBolt hdfsBolt = new HdfsBolt()
.withFsUrl("hdfs://h1:8020")
.withFileNameFormat(fileNameFormat)
.withRecordFormat(format)
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(syncPolicy);

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("event-spout", new EventSpout(), 3);
builder.setBolt("hdfs-bolt", hdfsBolt, 2).fieldsGrouping("event-spout", new Fields("minute"));

Config conf = new Config();

String name = StormToHDFSTopology.class.getSimpleName();
if (args != null && args.length > 0) {
conf.put(Config.NIMBUS_HOST, args[0]);
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
} else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(name, conf, builder.createTopology());
Thread.sleep(60000);
cluster.shutdown();
}
}

}

上面的处理逻辑,可以对HdfsBolt进行更加详细的配置,如FileNameFormat、SyncPolicy、 FileRotationPolicy(可以设置在满足什么条件下,切出一个新的日志,如可以指定多长时间切出一个新的日志文件,可以指定一个日志文件大 小达到设置值后,再写一个新日志文件),更多设置可以参考storm-hdfs,。
上面代码在打包的时候,需要注意,使用storm-starter自带的Maven打包配置,可能在将Topology部署运行的时候,会报错,可以使用maven-shade-plugin这个插件,如下配置所示:
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>1.4</version>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>

 

 

整合Kafka+Storm+HDFS

上面分别对整合Kafka+Storm和Storm+HDFS做了实践,可以将后者的Spout改成前者的Spout,从Kafka中消费消息,在 Storm中可以做简单处理,然后将数据写入HDFS,最后可以在Hadoop平台上对数据进行离线分析处理。下面,写了一个简单的例子,从Kafka消 费消息,然后经由Storm处理,写入到HDFS存储,代码如下所示:
package org.shirdrn.storm.examples;

import java.util.Arrays;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class DistributeWordTopology {

public static class KafkaWordToUpperCase extends BaseRichBolt {

private static final Log LOG = LogFactory.getLog(KafkaWordToUpperCase.class);
private static final long serialVersionUID = -5207232012035109026L;
private OutputCollector collector;

@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}

@Override
public void execute(Tuple input) {
String line = input.getString(0).trim();
LOG.info("RECV[kafka -> splitter] " + line);
if(!line.isEmpty()) {
String upperLine = line.toUpperCase();
LOG.info("EMIT[splitter -> counter] " + upperLine);
collector.emit(input, new Values(upperLine, upperLine.length()));
}
collector.ack(input);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line", "len"));
}

}

public static class RealtimeBolt extends BaseRichBolt {

private static final Log LOG = LogFactory.getLog(KafkaWordToUpperCase.class);
private static final long serialVersionUID = -4115132557403913367L;
private OutputCollector collector;

@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}

@Override
public void execute(Tuple input) {
String line = input.getString(0).trim();
LOG.info("REALTIME: " + line);
collector.ack(input);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {

}

}

public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {

// Configure Kafka
String zks = "h1:2181,h2:2181,h3:2181";
String topic = "my-replicated-topic5";
String zkRoot = "/storm"; // default zookeeper root configuration for storm
String id = "word";
BrokerHosts brokerHosts = new ZkHosts(zks);
SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConf.forceFromStart = false;
spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
spoutConf.zkPort = 2181;

// Configure HDFS bolt
RecordFormat format = new DelimitedRecordFormat()
.withFieldDelimiter("\t"); // use "\t" instead of "," for field delimiter
SyncPolicy syncPolicy = new CountSyncPolicy(1000); // sync the filesystem after every 1k tuples
FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES); // rotate files
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
.withPath("/storm/").withPrefix("app_").withExtension(".log"); // set file name format
HdfsBolt hdfsBolt = new HdfsBolt()
.withFsUrl("hdfs://h1:8020")
.withFileNameFormat(fileNameFormat)
.withRecordFormat(format)
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(syncPolicy);

// configure & build topology
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5);
builder.setBolt("to-upper", new KafkaWordToUpperCase(), 3).shuffleGrouping("kafka-reader");
builder.setBolt("hdfs-bolt", hdfsBolt, 2).shuffleGrouping("to-upper");
builder.setBolt("realtime", new RealtimeBolt(), 2).shuffleGrouping("to-upper");

// submit topology
Config conf = new Config();
String name = DistributeWordTopology.class.getSimpleName();
if (args != null && args.length > 0) {
String nimbus = args[0];
conf.put(Config.NIMBUS_HOST, nimbus);
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
} else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(name, conf, builder.createTopology());
Thread.sleep(60000);
cluster.shutdown();
}
}

}

上面代码中,名称为to-upper的Bolt将接收到的字符串行转换成大写以后,会将处理过的数据向后面的hdfs-bolt、realtime这两个Bolt各发一份拷贝,然后由这两个Bolt分别根据实际需要(实时/离线)单独处理。
打包后,在Storm集群上部署并运行这个Topology:
bin/storm jar ~/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.DistributeWordTopology h1

可以通过Storm UI查看Topology运行情况,可以查看HDFS上生成的数据。