分类目录归档:mq

ELK+Kafka集群日志分析系统

一、 系统介绍 2

二、 版本说明 3

三、 服务部署 3

1) JDK部署 3

2) Elasticsearch集群部署及优化 3

3) Elasticsearch健康插件安装 13

4) Shield之elasticsearch安全插件 15

5)Zookeeper集群搭建 15

6)Kafka集群搭建 17

7)测试Kafka和Zookeeper集群连通性 19

8) Logstash部署 20

9) Kibana部署 20

四、 系统使用示例 22

1) Logstash 作为kafka生产者示例 22

2) Logstash index 消费kafka示例 23

一、系统介绍
随着实时分析技术的发展及成本的降低,用户已经不仅仅满足于离线分析。下面来介绍一下架构

 

这是一个再常见不过的架构了:

(1)Kafka:接收用户日志的消息队列

(2)Logstash:做日志解析,统一成json输出给Elasticsearch

(3)Elasticsearch:实时日志分析服务的核心技术,一个schemaless,实时的数据存储服务,通过index组织数据,兼具强大的搜索和统计功能。

(4)Kibana:基于Elasticsearch的数据可视化组件,超强的数据可视化能力是众多公司选择ELK stack的重要原因。

(5)Zookeeper: 状态管理,监控进程等服务

二、版本说明
本次系统为:Centos6.5 64位

Java版本为:1.8.0_74

Elasticsearch为:2.4.0

Logstash :2.4.0

Kibana:4.6.1

Shield:2.0+

Kafka:2.10-0.10.0.1

Zookeeper:3.4.9

相应的版本最好下载对应的插件。

三、服务部署
1) JDK部署
下载JDK包到/data目录下解压,并将变量导入/etc/profile末尾。

export JAVA_HOME=/usr/local/jdk

export PATH=$JAVA_HOME/bin:$PATH

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

2) Elasticsearch集群部署及优化
下载并安装es rpm包:

Rpm -ivh https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/rpm/elasticsearch/2.4.0/elasticsearch-2.4.0.rpm

启动方式:

/etc/init.d/elasticsearch start|stop

Elasticsearch bin文件内存优化:

由于是rpm 方式安装将/usr/share/Elasticsearch/bin/Elasticsearch 加入如下参数:

ES_HEAP_SIZE=16g     #ES_HEAP_SIZE表示JVM参数的-Xms and -Xmx设置

MAX_OPEN_FILES=65535

Elasticsearch 配置文件优化以及说明:

我们需要配置如下:

cluster.name: es-ajb-cluster

node.name: es-node-1

node.master: true

node.data: true

index.number_of_shards: 8

index.number_of_replicas: 1

path.data: /data/es

path.logs: /data/eslogs

bootstrap.mlockall: true

network.host: 0.0.0.0

http.port: 9200

discovery.zen.ping.unicast.hosts: ["172.16.38.133", "172.16.38.134","172.16.38.135"]

以下为更详细yml配置文件参数解释:

##################### Elasticsearch Configuration Example ################

# 我只是挑些重要的配置选项进行注释,其实自带的已经有非常细致的英文注释了.有理解偏差的地方请以英文原版解释为准.

################################### Cluster#############################

# 代表一个集群,集群中有多个节点,其中有一个为主节点,这个主节点是可以通过选举产生的,主从节点是对于集群内部来说的.

# es的一个概念就是去中心化,字面上理解就是无中心节点,这是对于集群外部来说的,因为从外部来看es集群,在逻辑上是个整体,你与任何一个节点的通信和与整个es集群通信是等价的。

# cluster.name可以确定你的集群名称,当你的elasticsearch集群在同一个网段中elasticsearch会自动的找到具有相同cluster.name的elasticsearch服务.

# 所以当同一个网段具有多个elasticsearch集群时cluster.name就成为同一个集群的标识.

#cluster.name: elasticsearch

#################################### Node ##############################

# 节点名称同理,可自动生成也可手动配置.

#node.name: "Franz Kafka"

# 允许一个节点是否可以成为一个master节点,es是默认集群中的第一台机器为master,如果这台机器停止就会重新选举master.

#node.master: true

# 允许该节点存储数据(默认开启)

#node.data: true

# 配置文件中给出了三种配置高性能集群拓扑结构的模式,如下:

# 1. 如果你想让节点从不选举为主节点,只用来存储数据,可作为负载器

# node.master: false

# node.data: true

#

# 2. 如果想让节点成为主节点,且不存储任何数据,并保有空闲资源,可作为协调器

# node.master: true

# node.data: false

#

# 3. 如果想让节点既不称为主节点,又不成为数据节点,那么可将他作为搜索器,从节点中获取数据,生成搜索结果等

# node.master: false

# node.data: false

# 监控集群状态有一下插件和API可以使用:

# Use the Cluster Health API [http://localhost:9200/_cluster/health], the

# Node Info API [http://localhost:9200/_nodes] or GUI tools

# such as <http://www.elasticsearch.org/overview/marvel/>,

# <http://github.com/karmi/elasticsearch-paramedic>,

# <http://github.com/lukas-vlcek/bigdesk> and

# <http://mobz.github.com/elasticsearch-head> to inspect the cluster state.

# A node can have generic attributes associated with it, which can later be used

# for customized shard allocation filtering, or allocation awareness. An attribute

# is a simple key value pair, similar to node.key: value, here is an example:

#

#node.rack: rack314

# By default, multiple nodes are allowed to start from the same installation location

# to disable it, set the following:

#node.max_local_storage_nodes: 1

#################################### Index #############################

# 设置索引的分片数,默认为5

#index.number_of_shards: 5

# 设置索引的副本数,默认为1:

#index.number_of_replicas: 1

# 配置文件中提到的最佳实践是,如果服务器够多,可以将分片提高,尽量将数据平均分布到大集群中去

# 同时,如果增加副本数量可以有效的提高搜索性能

# 需要注意的是,"number_of_shards" 是索引创建后一次生成的,后续不可更改设置

# "number_of_replicas" 是可以通过API去实时修改设置的

#################################### Paths ####################################

# 配置文件存储位置

#path.conf: /path/to/conf

# 数据存储位置(单个目录设置)

#path.data: /path/to/data

# 多个数据存储位置,有利于性能提升

#path.data: /path/to/data1,/path/to/data2

# 临时文件的路径

#path.work: /path/to/work

# 日志文件的路径

#path.logs: /path/to/logs

# 插件安装路径

#path.plugins: /path/to/plugins

#################################### Plugin ############################

# 设置插件作为启动条件,如果一下插件没有安装,则该节点服务不会启动

#plugin.mandatory: mapper-attachments,lang-groovy

################################### Memory ##############################

# 当JVM开始写入交换空间时(swapping)ElasticSearch性能会低下,你应该保证它不会写入交换空间

# 设置这个属性为true来锁定内存,同时也要允许elasticsearch的进程可以锁住内存,linux下可以通过 `ulimit -l unlimited` 命令

#bootstrap.mlockall: true

# 确保 ES_MIN_MEM 和 ES_MAX_MEM 环境变量设置为相同的值,以及机器有足够的内存分配给Elasticsearch

# 注意:内存也不是越大越好,一般64位机器,最大分配内存别才超过32G

############################## Network And HTTP #######################

# 设置绑定的ip地址,可以是ipv4或ipv6的,默认为0.0.0.0

#network.bind_host: 192.168.0.1

# 设置其它节点和该节点交互的ip地址,如果不设置它会自动设置,值必须是个真实的ip地址

#network.publish_host: 192.168.0.1

# 同时设置bind_host和publish_host上面两个参数

#network.host: 192.168.0.1

# 设置节点间交互的tcp端口,默认是9300

#transport.tcp.port: 9300

# 设置是否压缩tcp传输时的数据,默认为false,不压缩

#transport.tcp.compress: true

# 设置对外服务的http端口,默认为9200

#http.port: 9200

# 设置请求内容的最大容量,默认100mb

#http.max_content_length: 100mb

# 使用http协议对外提供服务,默认为true,开启

#http.enabled: false

################################### Gateway #############################

# gateway的类型,默认为local即为本地文件系统,可以设置为本地文件系统

#gateway.type: local

# 下面的配置控制怎样以及何时启动一整个集群重启的初始化恢复过程

# (当使用shard gateway时,是为了尽可能的重用local data(本地数据))

# 一个集群中的N个节点启动后,才允许进行恢复处理

#gateway.recover_after_nodes: 1

# 设置初始化恢复过程的超时时间,超时时间从上一个配置中配置的N个节点启动后算起

#gateway.recover_after_time: 5m

# 设置这个集群中期望有多少个节点.一旦这N个节点启动(并且recover_after_nodes也符合),

# 立即开始恢复过程(不等待recover_after_time超时)

#gateway.expected_nodes: 2

############################# Recovery Throttling #######################

# 下面这些配置允许在初始化恢复,副本分配,再平衡,或者添加和删除节点时控制节点间的分片分配

# 设置一个节点的并行恢复数

# 1.初始化数据恢复时,并发恢复线程的个数,默认为4

#cluster.routing.allocation.node_initial_primaries_recoveries: 4

#

# 2.添加删除节点或负载均衡时并发恢复线程的个数,默认为2

#cluster.routing.allocation.node_concurrent_recoveries: 2

# 设置恢复时的吞吐量(例如:100mb,默认为0无限制.如果机器还有其他业务在跑的话还是限制一下的好)

#indices.recovery.max_bytes_per_sec: 20mb

# 设置来限制从其它分片恢复数据时最大同时打开并发流的个数,默认为5

#indices.recovery.concurrent_streams: 5

# 注意: 合理的设置以上参数能有效的提高集群节点的数据恢复以及初始化速度

################################## Discovery ##########################

# 设置这个参数来保证集群中的节点可以知道其它N个有master资格的节点.默认为1,对于大的集群来说,可以设置大一点的值(2-4)

#discovery.zen.minimum_master_nodes: 1

# 探查的超时时间,默认3秒,提高一点以应对网络不好的时候,防止脑裂

#discovery.zen.ping.timeout: 3s

# For more information, see

# <http://elasticsearch.org/guide/en/elasticsearch/reference/current/modules-discovery-zen.html>

# 设置是否打开多播发现节点.默认是true.

# 当多播不可用或者集群跨网段的时候集群通信还是用单播吧

#discovery.zen.ping.multicast.enabled: false

# 这是一个集群中的主节点的初始列表,当节点(主节点或者数据节点)启动时使用这个列表进行探测

#discovery.zen.ping.unicast.hosts: ["host1", "host2:port"]

# Slow Log部分与GC log部分略,不过可以通过相关日志优化搜索查询速度

############## Memory(重点需要调优的部分) ################

# Cache部分:

# es有很多种方式来缓存其内部与索引有关的数据.其中包括filter cache

# filter cache部分:

# filter cache是用来缓存filters的结果的.默认的cache type是node type.node type的机制是所有的索引内部的分片共享filter cache.node type采用的方式是LRU方式.即:当缓存达到了某个临界值之后,es会将最近没有使用的数据清除出filter cache.使让新的数据进入es.

# 这个临界值的设置方法如下:indices.cache.filter.size 值类型:eg.:512mb 20%。默认的值是10%。

# out of memory错误避免过于频繁的查询时集群假死

# 1.设置es的缓存类型为Soft Reference,它的主要特点是据有较强的引用功能.只有当内存不够的时候,才进行回收这类内存,因此在内存足够的时候,它们通常不被回收.另外,这些引用对象还能保证在Java抛出OutOfMemory异常之前,被设置为null.它可以用于实现一些常用图片的缓存,实现Cache的功能,保证最大限度的使用内存而不引起OutOfMemory.在es的配置文件加上index.cache.field.type: soft即可.

# 2.设置es最大缓存数据条数和缓存失效时间,通过设置index.cache.field.max_size: 50000来把缓存field的最大值设置为50000,设置index.cache.field.expire: 10m把过期时间设置成10分钟.

#index.cache.field.max_size: 50000

#index.cache.field.expire: 10m

#index.cache.field.type: soft

# field data部分&&circuit breaker部分:

# 用于field data 缓存的内存数量,主要用于当使用排序,faceting操作时,elasticsearch会将一些热点数据加载到内存中来提供给客户端访问,但是这种缓存是比较珍贵的,所以对它进行合理的设置.

# 可以使用值:eg:50mb 或者 30%(节点 node heap内存量),默认是:unbounded

#indices.fielddata.cache.size: unbounded

# field的超时时间.默认是-1,可以设置的值类型: 5m

#indices.fielddata.cache.expire: -1

# circuit breaker部分:

# 断路器是elasticsearch为了防止内存溢出的一种操作,每一种circuit breaker都可以指定一个内存界限触发此操作,这种circuit breaker的设定有一个最高级别的设定:indices.breaker.total.limit 默认值是JVM heap的70%.当内存达到这个数量的时候会触发内存回收

# 另外还有两组子设置:

#indices.breaker.fielddata.limit:当系统发现fielddata的数量达到一定数量时会触发内存回收.默认值是JVM heap的70%

#indices.breaker.fielddata.overhead:在系统要加载fielddata时会进行预先估计,当系统发现要加载进内存的值超过limit * overhead时会进行进行内存回收.默认是1.03

#indices.breaker.request.limit:这种断路器是elasticsearch为了防止OOM(内存溢出),在每次请求数据时设定了一个固定的内存数量.默认值是40%

#indices.breaker.request.overhead:同上,也是elasticsearch在发送请求时设定的一个预估系数,用来防止内存溢出.默认值是1

# Translog部分:

# 每一个分片(shard)都有一个transaction log或者是与它有关的预写日志,(write log),在es进行索引(index)或者删除(delete)操作时会将没有提交的数据记录在translog之中,当进行flush 操作的时候会将tranlog中的数据发送给Lucene进行相关的操作.一次flush操作的发生基于如下的几个配置

#index.translog.flush_threshold_ops:当发生多少次操作时进行一次flush.默认是 unlimited

#index.translog.flush_threshold_size:当translog的大小达到此值时会进行一次flush操作.默认是512mb

#index.translog.flush_threshold_period:在指定的时间间隔内如果没有进行flush操作,会进行一次强制flush操作.默认是30m

#index.translog.interval:多少时间间隔内会检查一次translog,来进行一次flush操作.es会随机的在这个值到这个值的2倍大小之间进行一次操作,默认是5s

#index.gateway.local.sync:多少时间进行一次的写磁盘操作,默认是5s

# 以上的translog配置都可以通过API进行动态的设置

集群部署

集群名称不修改,节点名称修改、将elasticsearch.yml 复制到其他节点,并替换其配置文件。并关闭自动发现,防止其他外来节点连入。

3) Elasticsearch健康插件安装
elasticsearch-head是一个elasticsearch的集群管理工具,它是完全由html5编写的独立网页程序,你可以通过插件把它集成到es。

插件安装方法1:

1.elasticsearch/bin/plugin -install mobz/elasticsearch-head

2.运行es

3.打开http://localhost:9200/_plugin/head/

插件安装方法2:

1.https://github.com/mobz/elasticsearch-head下载zip 解压

2.建立elasticsearch\plugins\head\_site文件

3.将解压后的elasticsearch-head-master文件夹下的文件copy到_site

4.运行es

5.打开http://localhost:9200/_plugin/head/

在地址栏输入es服务器的ip地址和端口点connect就可以连接到集群。下面是连接后的视图。这是主界面,在这里可以看到es集群的基本信息(如:节点情况,索引情况)。

通过head插件可以对索引执行api 操作。

 

4) Shield之elasticsearch安全插件
官方安装步骤以及验证

Shield 2.0+

Compatible with the latest versions of Elasticsearch and Kibana

Step 1: Install Shield into Elasticsearch

bin/plugin install license

bin/plugin install shield

Step 2: Start Elasticsearch  bin/elasticsearch

Step 3: Add an admin user  bin/shield/esusers useradd es_admin -r admin

Step 4: Test your authenticated user  curl -u es_admin -XGET 'http://localhost:9200/'

Step 5: Install Shield Into Kibana for full session support (Shield 2.2+)

Step 6: Dive into the Getting Started Guide

目前测试尚未通过,等待官方发布新版本

5)Zookeeper集群搭建
Kafka依赖Zookeeper管理自身集群(Broker、Offset、Producer、Consumer等),所以先要安装 Zookeeper。自然,为了达到高可用的目的,Zookeeper自身也不能是单点,接下来就介绍如何搭建一个最小的Zookeeper集群(3个 zk节点)

此处选用Zookeeper的版本是3.4.9,此为Kafka0.10中推荐的Zookeeper版本。

首先解压

tar -xzvf zookeeper-3.4.9.tar.gz

进入zookeeper的conf目录,将zoo_sample.cfg复制一份,命名为zoo.cfg,此即为Zookeeper的配置文件

cp zoo_sample.cfg zoo.cfg

编辑zoo.cfg

# The number of milliseconds of each tick

tickTime=2000

# The number of ticks that the initial

# synchronization phase can take

initLimit=10

# The number of ticks that can pass between

# sending a request and getting an acknowledgement

syncLimit=5

# the directory where the snapshot is stored.

dataDir=/data/zk/zk0/data

dataLogDir=/data/zk/zk0/logs

# the port at which the clients will connect

clientPort=2181

server.176 = 172.16.38.176:2888:3888

server.177 = 172.16.38.177:2888:3888

server.1771 = 172.16.38.177:2999:3999

autopurge.purgeInterval=1

dataDir和dataLogDir的路径需要在启动前创建好

clientPort为zookeeper的服务端口

server.0/1/2为zk集群中三个node的信息,定义格式为hostname:port1:port2,其中port1是node间通信使用的端口,port2是node选举使用的端口,需确保三台主机的这两个端口都是互通的

在另外两台主机上执行同样的操作,安装并配置zookeeper

分别在三台主机的dataDir路径下创建一个文件名为myid的文件,文件内容为该zk节点的编号。例如在第一台主机上建立的myid文件内容是0,第二台是1。

接下来,启动三台主机上的zookeeper服务:

bin/zkServer.sh start

3个节点都启动完成后,可依次执行如下命令查看集群状态:

bin/zkServer.sh status

命令输出如下:

Mode: leader 或 Mode: follower

3个节点中,应有1个leader和两个follower

验证zookeeper集群高可用性:

假设目前3个zk节点中,server0为leader,server1和server2为follower

我们停掉server0上的zookeeper服务:

bin/zkServer.sh stop

再到server1和server2上查看集群状态,会发现此时server1(也有可能是server2)为leader,另一个为follower。

再次启动server0的zookeeper服务,运行zkServer.sh status检查,发现新启动的server0也为follower

至此,zookeeper集群的安装和高可用性验证完成。

附:Zookeeper默认会将控制台信息输出到启动路径下的zookeeper.out中,显然在生产环境中我们不能允许Zookeeper这样做,通过如下方法,可以让Zookeeper输出按尺寸切分的日志文件:

修改conf/log4j.properties文件,将zookeeper.root.logger=INFO, CONSOLE改为

zookeeper.root.logger=INFO, ROLLINGFILE修改bin/zkEnv.sh文件,将

ZOO_LOG4J_PROP="INFO,CONSOLE"改为ZOO_LOG4J_PROP="INFO,ROLLINGFILE"

然后重启zookeeper,就ok了

6)Kafka集群搭建
Kafka 是一个高吞吐量的分布式发布订阅日志服务,具有高可用、高性能、分布式、高扩展、持久性等特性。目前已经在各大公司中广泛使用。和之前采用 Redis 做轻量级消息队列不同,Kafka 利用磁盘作队列,所以也就无所谓消息缓冲时的磁盘问题。此外,如果公司内部已有 Kafka 服务在运行,logstash 也可以快速接入,免去重复建设的麻烦。

kafka 基本概念

Topic 主题,声明一个主题,producer指定该主题发布消息,订阅该主题的consumer对该主题进行消费

Partition 每个主题可以分为多个分区,每个分区对应磁盘上一个目录,分区可以分布在不同broker上,producer在发布消息时,可以通过指定partition key映射到对应分区,然后向该分区发布消息,在无partition key情况下,随机选取分区,一段时间内触发一次(比如10分钟),这样就保证了同一个producer向同一partition发布的消息是顺序的。 消费者消费时,可以指定partition进行消费,也可以使用high-level-consumer api,自动进行负载均衡,并将partition分给consumer,一个partition只能被一个consumer进行消费

Consumer 消费者,可以多实例部署,可以批量拉取,有两类API可供选择:一个simpleConsumer,暴露所有的操作给用户,可以提交offset、fetch offset、指定partition fetch message;另外一个high-level-consumer(ZookeeperConsumerConnector),帮助用户做基于partition自动分配的负载均衡,定期提交offset,建立消费队列等。simpleConsumer相当于手动挡,high-level-consumer相当于自动挡。

simpleConsumer:无需像high-level-consumer那样向zk注册brokerid、owner,甚至不需要提交offset到zk,可以将offset提交到任意地方比如(mysql,本地文件等)。

high-level-consumer:一个进程中可以启多个消费线程,一个消费线程即是一个consumer,假设A进程里有2个线程(consumerid分别为1,2),B进程有2个线程(consumerid分别为1,2),topic1的partition有5个,那么partition分配是这样的:

partition1 ---> A进程consumerid1

partition2 ---> A进程consumerid1

partition3 ---> A进程consumerid2

partition4 ---> B进程consumer1

partition5 ---> B进程consumer2

Group High-level-consumer可以声明group,每个group可以有多个consumer,每group各自管理各自的消费offset,各个不同group之间互不关联影响。

由于目前版本消费的offset、owner、group都是consumer自己通过zk管理,所以group对于broker和producer并不关心,一些监控工具需要通过group来监控,simpleComsumer无需声明group。

部署安装

此例中,我们会安装配置一个有两个Broker组成的Kafka集群,并在其上创建一个两个分区的Topic

本例中使用Kafka最新版本0.10.0

首先解压官网下载kafkatar包

编辑config/server.properties文件,下面列出关键的参数

#此Broker的ID,集群中每个Broker的ID不可相同

broker.id=0

#监听器,端口号与port一致即可

listeners=PLAINTEXT://:9092

#Broker监听的端口

port=19092

#Broker的Hostname,填主机IP即可

host.name=172.16.38.176

#向Producer和Consumer建议连接的Hostname和port(此处有坑,具体见后)

advertised.host.name=172.16.38.176

advertised.port=9092

#进行IO的线程数,应大于主机磁盘数

num.io.threads=8

#消息文件存储的路径

log.dirs=/data/kafka-logs

#消息文件清理周期,即清理x小时前的消息记录

log.retention.hours=168

#每个Topic默认的分区数,一般在创建Topic时都会指定分区数,所以这个配成1就行了

num.partitions=1

#Zookeeper连接串,此处填写上一节中安装的三个zk节点的ip和端口即可

zookeeper.connect=172.16.38.176:12180,172.16.38.177:12181,172.16.38.177:12182

将172.16.38.177 也按照上图配置文件中,只是修改brokerid即可。

7)测试Kafka和Zookeeper集群连通性
(1)建立一个主题

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic summer

#注意:factor大小不能超过broker数

(2)查看有哪些主题已经创建

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181   #列出集群中所有的topic

summer  #已经创建成功

(3)查看summer这个主题的详情

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic summer

Topic:summer PartitionCount:16 ReplicationFactor:2 Configs:

Topic: summer Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1

Topic: summer Partition: 1 Leader: 1 Replicas: 1,0 Isr: 0,1

Topic: summer Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1

Topic: summer Partition: 3 Leader: 1 Replicas: 1,0 Isr: 0,1

Topic: summer Partition: 4 Leader: 0 Replicas: 0,1 Isr: 0,1

Topic: summer Partition: 5 Leader: 1 Replicas: 1,0 Isr: 0,1

Topic: summer Partition: 6 Leader: 0 Replicas: 0,1 Isr: 0,1

Topic: summer Partition: 7 Leader: 1 Replicas: 1,0 Isr: 0,1

Topic: summer Partition: 8 Leader: 0 Replicas: 0,1 Isr: 0,1

Topic: summer Partition: 9 Leader: 1 Replicas: 1,0 Isr: 0,1

Topic: summer Partition: 10 Leader: 0 Replicas: 0,1 Isr: 0,1

Topic: summer Partition: 11 Leader: 1 Replicas: 1,0 Isr: 0,1

Topic: summer Partition: 12 Leader: 0 Replicas: 0,1 Isr: 0,1

Topic: summer Partition: 13 Leader: 1 Replicas: 1,0 Isr: 0,1

Topic: summer Partition: 14 Leader: 0 Replicas: 0,1 Isr: 0,1

Topic: summer Partition: 15 Leader: 1 Replicas: 1,0 Isr: 0,1

#主题名称:summer

#Partition:16个,从0开始

#leader :id为0和1的broker

#Replicas 副本存在于broker id为0,1的上面

#Isr:活跃状态的broker

(4)发送消息,这里使用的是生产者角色

[root@kafka1 data]# /bin/bash /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 172.16.38.176:9092,172.16.38.177:9092 --topic test1

(5)接收消息,这里使用的是消费者角色

[root@kafka2 data]# /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper  172.16.38.176:12180,172.16.38.177:12181,172.16.38.177:12182 --topic test1

(6)验证的效果,生产者和消费者

 

(7)删除一个消息主题

[root@kafka2data]#/usr/local/kafka/bin/kafka-topics.sh--zookeeper 172.16.38.176:12180,172.16.38.177:12181,172.16.38.177:12182 --delete --topic test1

Topic test1 is marked for deletion.

Note: This will have no impact if delete.topic.enable is not set to true.

这样kafka和zookeeper集群配置完毕。

8)Logstash部署
Logstash requires Java 7 or later. Use the official Oracle distribution or an open-source distribution such as OpenJDK.

Download and install the public signing key:

rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch

Add the following in your /etc/yum.repos.d/ directory in a file with a .repo suffix, for example logstash.repo

[logstash-2.4]

name=Logstash repository for 2.4.x packages

baseurl=https://packages.elastic.co/logstash/2.4/centos

gpgcheck=1

gpgkey=https://packages.elastic.co/GPG-KEY-elasticsearch

enabled=1

And your repository is ready for use. You can install it with:

yum install logstash

9)Kibana部署
1、Download and install the public signing key:

rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch

2、Create a file named kibana.repo in the /etc/yum.repos.d/ directory with the following contents:

[kibana-4.6]

name=Kibana repository for 4.6.x packages

baseurl=https://packages.elastic.co/kibana/4.6/centos

gpgcheck=1

gpgkey=https://packages.elastic.co/GPG-KEY-elasticsearch

enabled=1

3、Install Kibana by running the following command:

yum install kibana

4、Configure Kibana to automatically start during bootup. If your distribution is using the System V version of init (check with ps -p 1), run the following command:

chkconfig --add kibana

5、If your distribution is using systemd, run the following commands instead:

sudo /bin/systemctl daemon-reload

sudo /bin/systemctl enable kibana.service

四、系统使用示例
按照logstash-->kafka-->zookeeper-->logstash-->elasticsearch-->kibana 数据顺序

1) Logstash 作为kafka生产者示例
2) Logstash index 消费kafka示例

kafka和ELK构建日志收集系统

背景:

最近线上上了ELK,但是只用了一台Redis在中间作为消息队列,以减轻前端es集群的压力,Redis的集群解决方案暂时没有接触过,并且Redis作为消息队列并不是它的强项;所以最近将Redis换成了专业的消息信息发布订阅系统Kafka, Kafka的更多介绍大家可以看这里: 传送门 ,关于ELK的知识网上有很多的哦, 此篇博客主要是总结一下目前线上这个平台的实施步骤,ELK是怎么跟Kafka结合起来的。好吧,动手!

ELK架构拓扑:

然而我这里的整个日志收集平台就是这样的拓扑:

e6J3MfR.png!web

1,使用一台Nginx代理访问kibana的请求;

2,两台es组成es集群,并且在两台es上面都安装kibana;( 以下对elasticsearch简称es )

3,中间三台服务器就是我的kafka(zookeeper)集群啦; 上面写的 消费者/生产者 这是kafka(zookeeper)中的概念;

4,最后面的就是一大堆的生产服务器啦,上面使用的是logstash,当然除了logstash也可以使用其他的工具来收集你的应用程序的日志,例如:Flume,Scribe,Rsyslog,Scripts……

角色:

bMjyMbn.png!web

软件选用:

elasticsearch-1.7.3.tar.gz #这里需要说明一下,前几天使用了最新的elasticsearch2.0,java-1.8.0报错,目前未找到原因,故这里使用1.7.3版本
Logstash-2.0.0.tar.gz
kibana-4.1.2-linux-x64.tar.gz
以上软件都可以从官网下载:https://www.elastic.co/downloads
java-1.8.0,nginx采用yum安装

部署步骤:

1.ES集群安装配置;

2.Logstash客户端配置(直接写入数据到ES集群,写入系统messages日志);

3.Kafka(zookeeper)集群配置;(Logstash写入数据到Kafka消息系统);

4.Kibana部署;

5.Nginx负载均衡Kibana请求;

6.案例:nginx日志收集以及MySQL慢日志收集;

7.Kibana报表基本使用;

ES集群安装配置;

es1.example.com:

1.安装java-1.8.0以及依赖包

yuminstall -y epel-release
yuminstall -y java-1.8.0 gitwgetlrzsz

2.获取es软件包

wgethttps://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.3.tar.gz
tar -xfelasticsearch-1.7.3.tar.gz -C /usr/local
ln -sv /usr/local/elasticsearch-1.7.3 /usr/local/elasticsearch

3.修改配置文件

[root@es1 ~]# vim /usr/local/elasticsearch/config/elasticsearch.yml
32 cluster.name: es-cluster                        #组播的名称地址
40 node.name: "es-node1 "                          #节点名称,不能和其他节点重复
47 node.master: true                                #节点能否被选举为master
51 node.data: true                                  #节点是否存储数据
107 index.number_of_shards: 5                      #索引分片的个数
111 index.number_of_replicas: 1                    #分片的副本个数
145 path.conf: /usr/local/elasticsearch/config/    #配置文件的路径
149 path.data: /data/es/data                        #数据目录路径
159 path.work: /data/es/worker                      #工作目录路径
163 path.logs:  /usr/local/elasticsearch/logs/      #日志文件路径
167 path.plugins:  /data/es/plugins                #插件路径
184 bootstrap.mlockall: true                        #内存不向swap交换
232 http.enabled: true                              #启用http

4.创建相关目录

mkdir /data/es/{data,worker,plugins} -p

5.获取es服务管理脚本

[root@es1 ~]# git clone https://github.com/elastic/elasticsearch-servicewrapper.git
[root@es1 ~]# mv elasticsearch-servicewrapper/service /usr/local/elasticsearch/bin/
[root@es1 ~]# /usr/local/elasticsearch/bin/service/elasticsearch install
DetectedRHELor Fedora:
InstallingtheElasticsearchdaemon..
[root@es1 ~]#
#这时就会在/etc/init.d/目录下安装上es的管理脚本啦
#修改其配置:
[root@es1 ~]#
set.default.ES_HOME=/usr/local/elasticsearch  #安装路径
set.default.ES_HEAP_SIZE=1024                  #jvm内存大小,根据实际环境调整即可

6.启动es ,并检查其服务是否正常

[root@es1 ~]# netstat -nlpt | grep -E "9200|"9300
tcp        0      0 0.0.0.0:9200                0.0.0.0:*                  LISTEN      1684/java          
tcp        0      0 0.0.0.0:9300                0.0.0.0:*                  LISTEN      1684/java

访问http://192.168.2.18:9200/ 如果出现以下提示信息说明安装配置完成啦,

QZVV7jF.png!web

7.es1节点好啦,我们直接把目录复制到es2

[root@es1local]# scp -r elasticsearch-1.7.3  192.168.12.19:/usr/local/
[root@es2local]# ln -sv elasticsearch-1.7.3 elasticsearch
[root@es2local]# elasticsearch/bin/service/elasticsearch install
#es2只需要修改node.name即可,其他都与es1相同配置

8.安装es的管理插件

es官方提供一个用于管理es的插件,可清晰直观看到es集群的状态,以及对集群的操作管理,安装方法如下:

[root@es1local]# /usr/local/elasticsearch/bin/plugin -i mobz/elasticsearch-head

安装好之后,访问方式为: http://192.168.2.18:9200/_plugin/head,由于集群中现在暂时没有数据,所以显示为空,

Zzu6riU.png!web

此时,es集群的部署完成。

Logstash客户端安装配置;

在webserve1上面安装Logstassh

1.downloads  软件包 ,这里注意,Logstash是需要依赖java环境的,所以这里还是需要yum install -y java-1.8.0.

[root@webserver1 ~]# wget https://download.elastic.co/logstash/logstash/logstash-2.0.0.tar.gz
[root@webserver1 ~]# tar -xf logstash-2.0.0.tar.gz -C /usr/local
[root@webserver1 ~]# cd /usr/local/
[root@webserver1local]# ln -sv logstash-2.0.0 logstash
[root@webserver1local]# mkdir logs etc

2.提供logstash管理脚本,其中里面的配置路径可根据实际情况修改

#!/bin/bash
#chkconfig: 2345 55 24
#description: logstash service manager
#auto: Maoqiu Guo
FILE='/usr/local/logstash/etc/*.conf'    #logstash配置文件
LOGBIN='/usr/local/logstash/bin/logstash agent --verbose --config'  #指定logstash配置文件的命令
LOCK='/usr/local/logstash/locks'        #用锁文件配合服务启动与关闭
LOGLOG='--log /usr/local/logstash/logs/stdou.log'  #日志
START() {
 if [ -f $LOCK ];then
 echo -e "Logstash is already \033[32mrunning\033[0m, do nothing."
 else
 echo -e "Start logstash service.\033[32mdone\033[m"
 nohup ${LOGBIN} ${FILE} ${LOGLOG} &
 touch $LOCK
 fi
}
STOP() {
 if [ ! -f $LOCK ];then
 echo -e "Logstash is already stop, do nothing."
 else
 echo -e "Stop logstash serivce \033[32mdone\033[m"
 rm -rf $LOCK
 ps -ef | greplogstash | grep -v "grep" | awk '{print $2}' | xargskill -s 9 >/dev/null
 fi
}
STATUS() {
 psaux | greplogstash | grep -v "grep" >/dev/null
 if [ -f $LOCK ] && [ $? -eq 0 ]; then
 echo -e "Logstash is: \033[32mrunning\033[0m..."
 else
 echo -e "Logstash is: \033[31mstopped\033[0m..."
 fi
}
TEST(){
 ${LOGBIN} ${FILE} --configtest
}
case "$1" in
  start)
 START
 ;;
  stop)
 STOP
 ;;
  status)
 STATUS
 ;;
  restart)
 STOP
        sleep 2
        START
 ;;
  test)
 TEST
 ;;
  *)
 echo "Usage: /etc/init.d/logstash (test|start|stop|status|restart)"
 ;;
esac

3.Logstash 向es集群写数据

(1)编写一个logstash配置文件

[root@webserver1etc]# cat logstash.conf
input {              #数据的输入从标准输入
  stdin {}  
}
output {            #数据的输出我们指向了es集群
  elasticsearch {
    hosts => ["192.168.2.18:9200","192.168.2.19:9200"]#es主机的ip及端口
  }
}
[root@webserver1etc]#

(2)检查配置文件是否有语法错

[root@webserver1etc]# /usr/local/logstash/bin/logstash -f logstash.conf --configtest --verbose
ConfigurationOK
[root@webserver1etc]#

(3)既然配置ok我们手动启动它,然后写点东西看能否写到es

Fn6rY3m.png!web

ok.上图已经看到logstash已经可以正常的工作啦.

4.下面演示一下如何收集系统日志

将之前的配置文件修改如下所示内容,然后启动logstash服务就可以在web页面中看到messages的日志写入es,并且创建了一条索引

[root@webserver1etc]# cat logstash.conf
input {#这里的输入使用的文件,即日志文件messsages
  file {
    path => "/var/log/messages"#这是日志文件的绝对路径
    start_position => "beginning"#这个表示从messages的第一行读取,即文件开始处
  }
}
output {#输出到es
  elasticsearch {
    hosts => ["192.168.2.18:9200","192.168.2.19:9200"]
    index => "system-messages-%{+YYYY-MM}"#这里将按照这个索引格式来创建索引
  }
}
[root@webserver1etc]#

启动logstash后,我们来看head这个插件的web页面

6JFZj2m.png!web

ok,系统日志我们已经成功的收集,并且已经写入到es集群中,那上面的演示是logstash直接将日志写入到es集群中的,这种场合我觉得如果量不是很大的话直接像上面已将将输出output定义到es集群即可,如果量大的话需要加上消息队列来缓解es集群的压力。前面已经提到了我这边之前使用的是单台redis作为消息队列,但是redis不能作为list类型的集群,也就是redis单点的问题没法解决,所以这里我选用了kafka ;下面就在三台server上面安装kafka集群

Kafka集群安装配置;

在搭建kafka集群时,需要提前安装zookeeper集群,当然kafka已经自带zookeeper程序只需要解压并且安装配置就行了

kafka1上面的配置:

1.获取软件包.官网: http://kafka.apache.org

[root@kafka1 ~]# wget http://mirror.rise.ph/apache/kafka/0.8.2.1/kafka_2.11-0.8.2.1.tgz
[root@kafka1 ~]# tar -xf kafka_2.11-0.8.2.1.tgz -C /usr/local/
[root@kafka1 ~]# cd /usr/local/
[root@kafka1local]# ln -sv kafka_2.11-0.8.2.1 kafka

2.配置zookeeper集群,修改配置文件

[root@kafka1 ~]# vim /usr/local/kafka/config/zookeeper.propertie
dataDir=/data/zookeeper
clienrtPort=2181
tickTime=2000
initLimit=20
syncLimit=10
server.2=192.168.2.22:2888:3888
server.3=192.168.2.23:2888:3888
server.4=192.168.2.24:2888:3888
#说明:
tickTime: 这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
2888端口:表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;
3888端口:表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。

3.创建zookeeper所需要的目录

[root@kafka1 ~]# mkdir /data/zookeeper

4.在/data/zookeeper目录下创建myid文件,里面的内容为数字,用于标识主机,如果这个文件没有的话,zookeeper是没法启动的哦

[root@kafka1 ~]# echo 2 > /data/zookeeper/myid

以上就是zookeeper集群的配置,下面等我配置好kafka之后直接复制到其他两个节点即可

5.kafka配置

[root@kafka1 ~]# vim /usr/local/kafka/config/server.properties
broker.id=2        # 唯一,填数字,本文中分别为2/3/4
prot=9092    # 这个broker监听的端口
host.name=192.168.2.22  # 唯一,填服务器IP
log.dir=/data/kafka-logs  #  该目录可以不用提前创建,在启动时自己会创建
zookeeper.connect=192.168.2.22:2181,192.168.2.23:2181,192.168.2.24:2181#这个就是zookeeper的ip及端口
num.partitions=16        # 需要配置较大 分片影响读写速度
log.dirs=/data/kafka-logs # 数据目录也要单独配置磁盘较大的地方
log.retention.hours=168  # 时间按需求保留过期时间 避免磁盘满

6.将kafka(zookeeper)的程序目录全部拷贝至其他两个节点

[root@kafka1 ~]# scp -r /usr/local/kafka 192.168.2.23:/usr/local/
[root@kafka1 ~]# scp -r /usr/local/kafka 192.168.2.24:/usr/local/

7.修改两个借点的配置,注意这里除了以下两点不同外,都是相同的配置

1)zookeeper的配置
mkdir /data/zookeeper
echo "x" > /data/zookeeper/myid2)kafka的配置
broker.id=2
host.name=192.168.2.22

8.修改完毕配置之后我们就可以启动了,这里先要启动zookeeper集群,才能启动kafka

我们按照顺序来,kafka1 –> kafka2 –>kafka3

[root@kafka1 ~]# /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &   #zookeeper启动命令
[root@kafka1 ~]# /usr/local/kafka/bin/zookeeper-server-stop.sh                                                   #zookeeper停止的命令

注意,如果zookeeper有问题 nohup的日志文件会非常大,把磁盘占满,这个zookeeper服务可以通过自己些服务脚本来管理服务的启动与关闭。

后面两台执行相同操作,在启动过程当中会出现以下报错信息

[2015-11-13 19:18:04,225] WARNCannotopenchannelto 3 atelectionaddress /192.168.2.23:3888 (org.apache.zookeeper.server.quorum.QuorumCnxManager)
java.net.ConnectException: Connectionrefused
 atjava.net.PlainSocketImpl.socketConnect(Native Method)
 atjava.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
 atjava.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
 atjava.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
 atjava.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
 atjava.net.Socket.connect(Socket.java:589)
 atorg.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:368)
 atorg.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(QuorumCnxManager.java:402)
 atorg.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader(FastLeaderElection.java:840)
 atorg.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:762)
[2015-11-13 19:18:04,232] WARNCannotopenchannelto 4 atelectionaddress /192.168.2.24:3888 (org.apache.zookeeper.server.quorum.QuorumCnxManager)
java.net.ConnectException: Connectionrefused
 atjava.net.PlainSocketImpl.socketConnect(Native Method)
 atjava.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
 atjava.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
 atjava.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
 atjava.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
 atjava.net.Socket.connect(Socket.java:589)
 atorg.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:368)
 atorg.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(QuorumCnxManager.java:402)
 atorg.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader(FastLeaderElection.java:840)
 atorg.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:762)
[2015-11-13 19:18:04,233] INFONotificationtimeout: 6400 (org.apache.zookeeper.server.quorum.FastLeaderElection)

由于zookeeper集群在启动的时候,每个结点都试图去连接集群中的其它结点,先启动的肯定连不上后面还没启动的,所以上面日志前面部分的异常是可以忽略的。通过后面部分可以看到,集群在选出一个Leader后,最后稳定了。

其他节点也可能会出现类似的情况,属于正常。

9.zookeeper服务检查

[root@kafka1~]#  netstat -nlpt | grep -E "2181|2888|3888"tcp        0      0192.168.2.24:3888          0.0.0.0:*                  LISTEN      1959/javatcp        0      00.0.0.0:2181                0.0.0.0:*                  LISTEN      1959/java[root@kafka2~]#  netstat -nlpt | grep -E "2181|2888|3888"tcp        0      0192.168.2.23:3888          0.0.0.0:*                  LISTEN      1723/javatcp        0      00.0.0.0:2181                0.0.0.0:*                  LISTEN      1723/java

[root@kafka3~]#  netstat -nlpt | grep -E "2181|2888|3888"

tcp        0      0192.168.2.24:3888          0.0.0.0:*                  LISTEN      950/java

tcp        0      00.0.0.0:2181                0.0.0.0:*                  LISTEN      950/java

tcp        0      0192.168.2.24:2888          0.0.0.0:*                  LISTEN      950/java

#可以看出,如果哪台是Leader,那么它就拥有2888这个端口

ok.  这时候zookeeper集群已经启动起来了,下面启动kafka,也是依次按照顺序启动

[root@kafka1 ~]# nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &   #kafka启动的命令
[root@kafka1 ~]#  /usr/local/kafka/bin/kafka-server-stop.sh                                                         #kafka停止的命令

注意,跟zookeeper服务一样,如果kafka有问题 nohup的日志文件会非常大,把磁盘占满,这个kafka服务同样可以通过自己些服务脚本来管理服务的启动与关闭。

此时三台上面的zookeeper及kafka都已经启动完毕,来检测以下吧

(1)建立一个主题

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic summer
#注意:factor大小不能超过broker数

(2)查看有哪些主题已经创建

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.2.22:2181   #列出集群中所有的topic
summer  #已经创建成功

(3)查看summer这个主题的详情

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.2.22:2181 --topic summer
Topic:summerPartitionCount:1 ReplicationFactor:3 Configs:
 Topic: summerPartition: 0 Leader: 2 Replicas: 2,4,3 Isr: 2,4,3
#主题名称:summer
#Partition:只有一个,从0开始
#leaderid2broker
#Replicas 副本存在于broker id2,3,4的上面
#Isr:活跃状态的broker

(4)发送消息,这里使用的是生产者角色

[root@kafka1 ~]# /bin/bash /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.2.22:9092 --topic summer
This is a messages
welcometo kafka    

(5)接收消息,这里使用的是消费者角色

[root@kafka2 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper  192.168.2.24:2181 --topic summer --from-beginning
This is a messages
welcometo kafka

如果能够像上面一样能够接收到生产者发过来的消息,那说明基于kafka的zookeeper集群就成功啦。

10,下面我们将webserver1上面的logstash的输出改到kafka上面,将数据写入到kafka中

(1)修改webserver1上面的logstash配置,如下所示:各个参数可以到 官网 查询.

root@webserver1etc]# cat logstash.conf
input {            #这里的输入还是定义的是从日志文件输入
  file {
    type => "system-message" 
    path => "/var/log/messages"
    start_position => "beginning"
  }
}
output {
    #stdout { codec => rubydebug }   #这是标准输出到终端,可以用于调试看有没有输出,注意输出的方向可以有多个
    kafka {  #输出到kafka
      bootstrap_servers => "192.168.2.22:9092,192.168.2.23:9092,192.168.2.24:9092"  #他们就是生产者
      topic_id => "system-messages"  #这个将作为主题的名称,将会自动创建
      compression_type => "snappy"  #压缩类型
    }
}
[root@webserver1etc]#

(2)配置检测

[root@webserver1etc]# /usr/local/logstash/bin/logstash -f logstash.conf --configtest --verbose
ConfigurationOK
[root@webserver1etc]#

(2)启动Logstash,这里我直接在命令行执行即可

[root@webserver1etc]# /usr/local/logstash/bin/logstash -f logstash.conf

(3)验证数据是否写入到kafka,这里我们检查是否生成了一个叫system-messages的主题

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.2.22:2181
summer
system-messages  #可以看到这个主题已经生成了
#再看看这个主题的详情:
[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.2.22:2181 --topic system-messages
Topic:system-messagesPartitionCount:16 ReplicationFactor:1 Configs:
 Topic: system-messagesPartition: 0 Leader: 2 Replicas: 2 Isr: 2
 Topic: system-messagesPartition: 1 Leader: 3 Replicas: 3 Isr: 3
 Topic: system-messagesPartition: 2 Leader: 4 Replicas: 4 Isr: 4
 Topic: system-messagesPartition: 3 Leader: 2 Replicas: 2 Isr: 2
 Topic: system-messagesPartition: 4 Leader: 3 Replicas: 3 Isr: 3
 Topic: system-messagesPartition: 5 Leader: 4 Replicas: 4 Isr: 4
 Topic: system-messagesPartition: 6 Leader: 2 Replicas: 2 Isr: 2
 Topic: system-messagesPartition: 7 Leader: 3 Replicas: 3 Isr: 3
 Topic: system-messagesPartition: 8 Leader: 4 Replicas: 4 Isr: 4
 Topic: system-messagesPartition: 9 Leader: 2 Replicas: 2 Isr: 2
 Topic: system-messagesPartition: 10 Leader: 3 Replicas: 3 Isr: 3
 Topic: system-messagesPartition: 11 Leader: 4 Replicas: 4 Isr: 4
 Topic: system-messagesPartition: 12 Leader: 2 Replicas: 2 Isr: 2
 Topic: system-messagesPartition: 13 Leader: 3 Replicas: 3 Isr: 3
 Topic: system-messagesPartition: 14 Leader: 4 Replicas: 4 Isr: 4
 Topic: system-messagesPartition: 15 Leader: 2 Replicas: 2 Isr: 2
[root@kafka1 ~]#

可以看出,这个主题生成了16个分区,每个分区都有对应自己的Leader,但是我想要有10个分区,3个副本如何办?还是跟我们上面一样命令行来创建主题就行,当然对于logstash输出的我们也可以提前先定义主题,然后启动logstash 直接往定义好的主题写数据就行啦,命令如下:

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.2.22:2181 --replication-factor 3 --partitions 10 --topic TOPIC_NAME

好了,我们将logstash收集到的数据写入到了kafka中了,在实验过程中我使用while脚本测试了如果不断的往kafka写数据的同时停掉两个节点,数据写入没有任何问题。

那如何将数据从kafka中读取然后给我们的es集群呢?那下面我们在kafka集群上安装Logstash,安装步骤不再赘述;三台上面的logstash 的配置如下,作用是将kafka集群的数据读取然后转交给es集群,这里为了测试我让他新建一个索引文件,注意这里的输入日志还是messages,主题名称还是“system-messages”

[root@kafka1etc]# more logstash.conf
input {
    kafka {
        zk_connect => "192.168.2.22:2181,192.168.2.23:2181,192.168.2.24:2181"  #消费者们
        topic_id => "system-messages"
        codec => plain
        reset_beginning => false
        consumer_threads => 5
        decorate_events => true
    }
}
output {
    elasticsearch {
      hosts => ["192.168.2.18:9200","192.168.2.19:9200"]
      index => "test-system-messages-%{+YYYY-MM}"          #为了区分之前实验,我这里新生成的所以名字为“test-system-messages-%{+YYYY-MM}”
  }
  }

在三台kafka上面启动Logstash,注意我这里是在命令行启动的;

[root@kafka1etc]# pwd
/usr/local/logstash/etc
[root@kafka1etc]# /usr/local/logstash/bin/logstash -f logstash.conf
[root@kafka2etc]# pwd
/usr/local/logstash/etc
[root@kafka2etc]# /usr/local/logstash/bin/logstash -f logstash.conf
[root@kafka3etc]# pwd
/usr/local/logstash/etc
[root@kafka3etc]# /usr/local/logstash/bin/logstash -f logstash.conf

在webserver1上写入测试内容,即webserver1上面利用message这个文件来测试,我先将其清空,然后启动

[root@webserver1etc]# >/var/log/messages
[root@webserver1etc]# echo "我将通过kafka集群达到es集群哦^0^" >> /var/log/messages
#启动logstash,让其读取messages中的内容

下图为我在客户端写入到kafka集群的同时也将其输入到终端,这里写入了三条内容

uQRnIzQ.png!web

而下面三张图侧可以看出,三台Logstash 很平均的从kafka集群当中读取出来了日志内容

1myIVJ3.png!web

1NnIre2A.png!web

 

1qIbeEbj.png!web

再来看看我们的es管理界面

JVzeqa.png!web

ok ,看到了吧,

流程差不多就是下面 酱紫咯

YJv6Bv.png!web

由于篇幅较长,我将

4.Kibana部署;

5.Nginx负载均衡Kibana请求;

6.案例:nginx日志收集以及MySQL慢日志收集;

7.Kibana报表基本使用;

放到下一篇博客。

本文博客转自:http://blog.sctux.com/?p=445

Kafka、Storm、HDFS整合实践

在基于Hadoop平台的很多应用场景中,我们需要对数据进行离线和实时分析,离线分析可以很容易地借助于Hive来实现统计分析,但是对于实 时的需求Hive就不合适了。实时应用场景可以使用Storm,它是一个实时处理系统,它为实时处理类应用提供了一个计算模型,可以很容易地进行编程处 理。为了统一离线和实时计算,一般情况下,我们都希望将离线和实时计算的数据源的集合统一起来作为输入,然后将数据的流向分别经由实时系统和离线分析系 统,分别进行分析处理,这时我们可以考虑将数据源(如使用Flume收集日志)直接连接一个消息中间件,如Kafka,可以整合 Flume+Kafka,Flume作为消息的Producer,生产的消息数据(日志数据、业务请求数据等等)发布到Kafka中,然后通过订阅的方 式,使用Storm的Topology作为消息的Consumer,在Storm集群中分别进行如下两个需求场景的处理:

  • 直接使用Storm的Topology对数据进行实时分析处理
  • 整合Storm+HDFS,将消息处理后写入HDFS进行离线分析处理

实时处理,只要开发满足业务需要的Topology即可,不做过多说明。这里,我们主要从安装配置Kafka、Storm,以及整合 Kafka+Storm、整合Storm+HDFS、整合Kafka+Storm+HDFS这几点来配置实践,满足上面提出的一些需求。配置实践使用的软 件包如下所示:

  • zookeeper-3.4.5.tar.gz
  • kafka_2.9.2-0.8.1.1.tgz
  • apache-storm-0.9.2-incubating.tar.gz
  • hadoop-2.2.0.tar.gz

程序配置运行所基于的操作系统为CentOS 5.11。

Kafka安装配置

我们使用3台机器搭建Kafka集群:

192.168.4.142   h1
192.168.4.143   h2
192.168.4.144   h3

在安装Kafka集群之前,这里没有使用Kafka自带的Zookeeper,而是独立安装了一个Zookeeper集群,也是使用这3台机器,保证Zookeeper集群正常运行。
首先,在h1上准备Kafka安装文件,执行如下命令
cd /usr/local/
wget http://mirror.bit.edu.cn/apache/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz
tar xvzf kafka_2.9.2-0.8.1.1.tgz
ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka

修改配置文件/usr/local/kafka/config/server.properties,修改如下内容:
broker.id=0
zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka

这里需要说明的是,默认Kafka会使用ZooKeeper默认的/路径,这样有关Kafka的ZooKeeper配置就会散落在根路径下面,如果 你有其他的应用也在使用ZooKeeper集群,查看ZooKeeper中数据可能会不直观,所以强烈建议指定一个chroot路径,直接在 zookeeper.connect配置项中指定:

zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka

而且,需要手动在ZooKeeper中创建路径/kafka,使用如下命令连接到任意一台ZooKeeper服务器:
cd /usr/local/zookeeper
bin/zkCli.sh
在ZooKeeper执行如下命令创建chroot路径:
create /kafka ''
这样,每次连接Kafka集群的时候(使用--zookeeper选项),也必须使用带chroot路径的连接字符串,后面会看到。
然后,将配置好的安装文件同步到其他的h2、h3节点上:

scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h2:/usr/local/
scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h3:/usr/local/
最后,在h2、h3节点上配置,执行如下命令:
cd /usr/local/
ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
并修改配置文件/usr/local/kafka/config/server.properties内容如下所示:
broker.id=1  # 在h1修改
broker.id=2  # 在h2修改
因为Kafka集群需要保证各个Broker的id在整个集群中必须唯一,需要调整这个配置项的值(如果在单机上,可以通过建立多个Broker进程来模拟分布式的Kafka集群,也需要Broker的id唯一,还需要修改一些配置目录的信息)。
在集群中的h1、h2、h3这三个节点上分别启动Kafka,分别执行如下命令:
bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &

可以通过查看日志,或者检查进程状态,保证Kafka集群启动成功。
我们创建一个名称为my-replicated-topic5的Topic,5个分区,并且复制因子为3,执行如下命令:
bin/kafka-topics.sh --create --zookeeper h1:2181,h2:2181,h3:2181/kafka --replication-factor 3 --partitions 5 --topic my-replicated-topic5
查看创建的Topic,执行如下命令:
bin/kafka-topics.sh --describe --zookeeper h1:2181,h2:2181,h3:2181/kafka --topic my-replicated-topic5
结果信息如下所示:
Topic:my-replicated-topic5     PartitionCount:5     ReplicationFactor:3     Configs:
Topic: my-replicated-topic5     Partition: 0     Leader: 0     Replicas: 0,2,1     Isr: 0,2,1
Topic: my-replicated-topic5     Partition: 1     Leader: 0     Replicas: 1,0,2     Isr: 0,2,1
Topic: my-replicated-topic5     Partition: 2     Leader: 2     Replicas: 2,1,0     Isr: 2,0,1
Topic: my-replicated-topic5     Partition: 3     Leader: 0     Replicas: 0,1,2     Isr: 0,2,1
Topic: my-replicated-topic5     Partition: 4     Leader: 2     Replicas: 1,2,0     Isr: 2,0,1
上面Leader、Replicas、Isr的含义如下:
Partition: 分区
Leader   : 负责读写指定分区的节点
Replicas : 复制该分区log的节点列表
Isr      : "in-sync" replicas,当前活跃的副本列表(是一个子集),并且可能成为Leader
我们可以通过Kafka自带的bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh脚本,来验证演示如果发布消息、消费消息。
在一个终端,启动Producer,并向我们上面创建的名称为my-replicated-topic5的Topic中生产消息,执行如下脚本:
bin/kafka-console-producer.sh --broker-list h1:9092,h2:9092,h3:9092 --topic my-replicated-topic5
在另一个终端,启动Consumer,并订阅我们上面创建的名称为my-replicated-topic5的Topic中生产的消息,执行如下脚本:
bin/kafka-console-consumer.sh --zookeeper h1:2181,h2:2181,h3:2181/kafka --from-beginning --topic my-replicated-topic5
可以在Producer终端上输入字符串消息行,然后回车,就可以在Consumer终端上看到消费者消费的消息内容。
也可以参考Kafka的Producer和Consumer的Java API,通过API编码的方式来实现消息生产和消费的处理逻辑。

 

 

Storm安装配置

Storm集群也依赖Zookeeper集群,要保证Zookeeper集群正常运行。Storm的安装配置比较简单,我们仍然使用下面3台机器搭建:

192.168.4.142 h1
192.168.4.143 h2
192.168.4.144 h3

首先,在h1节点上,执行如下命令安装:
cd /usr/local/
wget http://mirror.bit.edu.cn/apache/incubator/storm/apache-storm-0.9.2-incubating/apache-storm-0.9.2-incubating.tar.gz
tar xvzf apache-storm-0.9.2-incubating.tar.gz
ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm

然后,修改配置文件conf/storm.yaml,内容如下所示:
storm.zookeeper.servers:
- "h1"
- "h2"
- "h3"
storm.zookeeper.port: 2181
#
nimbus.host: "h1"

supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703

storm.local.dir: "/tmp/storm"

将配置好的安装文件,分发到其他节点上:
scp -r /usr/local/apache-storm-0.9.2-incubating/ h2:/usr/local/
scp -r /usr/local/apache-storm-0.9.2-incubating/ h3:/usr/local/

最后,在h2、h3节点上配置,执行如下命令:
cd /usr/local/
ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm

Storm集群的主节点为Nimbus,从节点为Supervisor,我们需要在h1上启动Nimbus服务,在从节点h2、h3上启动Supervisor服务:
bin/storm nimbus &
bin/storm supervisor &

为了方便监控,可以启动Storm UI,可以从Web页面上监控Storm Topology的运行状态,例如在h2上启动:
bin/storm ui &

这样可以通过访问http://h2:8080/来查看Topology的运行状况。

整合Kafka+Storm

消息通过各种方式进入到Kafka消息中间件,比如可以通过使用Flume来收集日志数据,然后在Kafka中路由暂存,然后再由实时计算程序 Storm做实时分析,这时我们就需要将在Storm的Spout中读取Kafka中的消息,然后交由具体的Spot组件去分析处理。实际 上,apache-storm-0.9.2-incubating这个版本的Storm已经自带了一个集成Kafka的外部插件程序storm- kafka,可以直接使用,例如我使用的Maven依赖配置,如下所示:
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.2-incubating</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>0.9.2-incubating</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.1.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>下面,我们开发了一个简单WordCount示例程序,从Kafka读取订阅的消息行,通过空格拆分出单个单词,然后再做词频统计计算,实现的Topology的代码,如下所示:
package org.shirdrn.storm.examples;import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class MyKafkaTopology {

public static class KafkaWordSplitter extends BaseRichBolt {

private static final Log LOG = LogFactory.getLog(KafkaWordSplitter.class);
private static final long serialVersionUID = 886149197481637894L;
private OutputCollector collector;

@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}

@Override
public void execute(Tuple input) {
String line = input.getString(0);
LOG.info("RECV[kafka -> splitter] " + line);
String[] words = line.split("\\s+");
for(String word : words) {
LOG.info("EMIT[splitter -> counter] " + word);
collector.emit(input, new Values(word, 1));
}
collector.ack(input);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}

}

public static class WordCounter extends BaseRichBolt {

private static final Log LOG = LogFactory.getLog(WordCounter.class);
private static final long serialVersionUID = 886149197481637894L;
private OutputCollector collector;
private Map<String, AtomicInteger> counterMap;

@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
this.counterMap = new HashMap<String, AtomicInteger>();
}

@Override
public void execute(Tuple input) {
String word = input.getString(0);
int count = input.getInteger(1);
LOG.info("RECV[splitter -> counter] " + word + " : " + count);
AtomicInteger ai = this.counterMap.get(word);
if(ai == null) {
ai = new AtomicInteger();
this.counterMap.put(word, ai);
}
ai.addAndGet(count);
collector.ack(input);
LOG.info("CHECK statistics map: " + this.counterMap);
}

@Override
public void cleanup() {
LOG.info("The final result:");
Iterator<Entry<String, AtomicInteger>> iter = this.counterMap.entrySet().iterator();
while(iter.hasNext()) {
Entry<String, AtomicInteger> entry = iter.next();
LOG.info(entry.getKey() + "\t:\t" + entry.getValue().get());
}

}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}

public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
String zks = "h1:2181,h2:2181,h3:2181";
String topic = "my-replicated-topic5";
String zkRoot = "/storm"; // default zookeeper root configuration for storm
String id = "word";

BrokerHosts brokerHosts = new ZkHosts(zks);
SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConf.forceFromStart = false;
spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
spoutConf.zkPort = 2181;

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5); // Kafka我们创建了一个5分区的Topic,这里并行度设置为5
builder.setBolt("word-splitter", new KafkaWordSplitter(), 2).shuffleGrouping("kafka-reader");
builder.setBolt("word-counter", new WordCounter()).fieldsGrouping("word-splitter", new Fields("word"));

Config conf = new Config();

String name = MyKafkaTopology.class.getSimpleName();
if (args != null && args.length > 0) {
// Nimbus host name passed from command line
conf.put(Config.NIMBUS_HOST, args[0]);
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
} else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(name, conf, builder.createTopology());
Thread.sleep(60000);
cluster.shutdown();
}
}
}

上面程序,在本地调试(使用LocalCluster)不需要输入任何参数,提交到实际集群中运行时,需要传递一个参数,该参数为Nimbus的主机名称。
通过Maven构建,生成一个包含依赖的single jar文件(不要把Storm的依赖包添加进去),例如storm-examples-0.0.1-SNAPSHOT.jar,在提交Topology程 序到Storm集群之前,因为用到了Kafka,需要拷贝一下依赖jar文件到Storm集群中的lib目录下面:
cp /usr/local/kafka/libs/kafka_2.9.2-0.8.1.1.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/scala-library-2.9.2.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/metrics-core-2.2.0.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/snappy-java-1.0.5.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/zkclient-0.3.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/log4j-1.2.15.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/slf4j-api-1.7.2.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/jopt-simple-3.2.jar /usr/local/storm/lib/

然后,就可以提交我们开发的Topology程序了:
bin/storm jar /home/storm/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.MyKafkaTopology h1

可以通过查看日志文件(logs/目录下)或者Storm UI来监控Topology的运行状况。如果程序没有错误,可以使用前面我们使用的Kafka Producer来生成消息,就能看到我们开发的Storm Topology能够实时接收到并进行处理。
上面Topology实现代码中,有一个很关键的配置对象SpoutConfig,配置属性如下所示:
spoutConf.forceFromStart = false;

该配置是指,如果该Topology因故障停止处理,下次正常运行时是否从Spout对应数据源Kafka中的该订阅Topic的起始位置开始读 取,如果forceFromStart=true,则之前处理过的Tuple还要重新处理一遍,否则会从上次处理的位置继续处理,保证Kafka中的 Topic数据不被重复处理,是在数据源的位置进行状态记录。

整合Storm+HDFS

Storm实时计算集群从Kafka消息中间件中消费消息,有实时处理需求的可以走实时处理程序,还有需要进行离线分析的需求,如写入到HDFS进行分析。下面实现了一个Topology,代码如下所示:
package org.shirdrn.storm.examples;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class StormToHDFSTopology {

public static class EventSpout extends BaseRichSpout {

private static final Log LOG = LogFactory.getLog(EventSpout.class);
private static final long serialVersionUID = 886149197481637894L;
private SpoutOutputCollector collector;
private Random rand;
private String[] records;

@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
rand = new Random();
records = new String[] {
"10001 ef2da82d4c8b49c44199655dc14f39f6 4.2.1 HUAWEI G610-U00 HUAWEI 2 70:72:3c:73:8b:22 2014-10-13 12:36:35",
"10001 ffb52739a29348a67952e47c12da54ef 4.3 GT-I9300 samsung 2 50:CC:F8:E4:22:E2 2014-10-13 12:36:02",
"10001 ef2da82d4c8b49c44199655dc14f39f6 4.2.1 HUAWEI G610-U00 HUAWEI 2 70:72:3c:73:8b:22 2014-10-13 12:36:35"
};
}

@Override
public void nextTuple() {
Utils.sleep(1000);
DateFormat df = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
Date d = new Date(System.currentTimeMillis());
String minute = df.format(d);
String record = records[rand.nextInt(records.length)];
LOG.info("EMIT[spout -> hdfs] " + minute + " : " + record);
collector.emit(new Values(minute, record));
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("minute", "record"));
}

}

public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
// use "|" instead of "," for field delimiter
RecordFormat format = new DelimitedRecordFormat()
.withFieldDelimiter(" : ");

// sync the filesystem after every 1k tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);

// rotate files
FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES);

FileNameFormat fileNameFormat = new DefaultFileNameFormat()
.withPath("/storm/").withPrefix("app_").withExtension(".log");

HdfsBolt hdfsBolt = new HdfsBolt()
.withFsUrl("hdfs://h1:8020")
.withFileNameFormat(fileNameFormat)
.withRecordFormat(format)
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(syncPolicy);

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("event-spout", new EventSpout(), 3);
builder.setBolt("hdfs-bolt", hdfsBolt, 2).fieldsGrouping("event-spout", new Fields("minute"));

Config conf = new Config();

String name = StormToHDFSTopology.class.getSimpleName();
if (args != null && args.length > 0) {
conf.put(Config.NIMBUS_HOST, args[0]);
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
} else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(name, conf, builder.createTopology());
Thread.sleep(60000);
cluster.shutdown();
}
}

}

上面的处理逻辑,可以对HdfsBolt进行更加详细的配置,如FileNameFormat、SyncPolicy、 FileRotationPolicy(可以设置在满足什么条件下,切出一个新的日志,如可以指定多长时间切出一个新的日志文件,可以指定一个日志文件大 小达到设置值后,再写一个新日志文件),更多设置可以参考storm-hdfs,。
上面代码在打包的时候,需要注意,使用storm-starter自带的Maven打包配置,可能在将Topology部署运行的时候,会报错,可以使用maven-shade-plugin这个插件,如下配置所示:
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>1.4</version>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>

 

 

整合Kafka+Storm+HDFS

上面分别对整合Kafka+Storm和Storm+HDFS做了实践,可以将后者的Spout改成前者的Spout,从Kafka中消费消息,在 Storm中可以做简单处理,然后将数据写入HDFS,最后可以在Hadoop平台上对数据进行离线分析处理。下面,写了一个简单的例子,从Kafka消 费消息,然后经由Storm处理,写入到HDFS存储,代码如下所示:
package org.shirdrn.storm.examples;

import java.util.Arrays;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class DistributeWordTopology {

public static class KafkaWordToUpperCase extends BaseRichBolt {

private static final Log LOG = LogFactory.getLog(KafkaWordToUpperCase.class);
private static final long serialVersionUID = -5207232012035109026L;
private OutputCollector collector;

@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}

@Override
public void execute(Tuple input) {
String line = input.getString(0).trim();
LOG.info("RECV[kafka -> splitter] " + line);
if(!line.isEmpty()) {
String upperLine = line.toUpperCase();
LOG.info("EMIT[splitter -> counter] " + upperLine);
collector.emit(input, new Values(upperLine, upperLine.length()));
}
collector.ack(input);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line", "len"));
}

}

public static class RealtimeBolt extends BaseRichBolt {

private static final Log LOG = LogFactory.getLog(KafkaWordToUpperCase.class);
private static final long serialVersionUID = -4115132557403913367L;
private OutputCollector collector;

@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}

@Override
public void execute(Tuple input) {
String line = input.getString(0).trim();
LOG.info("REALTIME: " + line);
collector.ack(input);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {

}

}

public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {

// Configure Kafka
String zks = "h1:2181,h2:2181,h3:2181";
String topic = "my-replicated-topic5";
String zkRoot = "/storm"; // default zookeeper root configuration for storm
String id = "word";
BrokerHosts brokerHosts = new ZkHosts(zks);
SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConf.forceFromStart = false;
spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
spoutConf.zkPort = 2181;

// Configure HDFS bolt
RecordFormat format = new DelimitedRecordFormat()
.withFieldDelimiter("\t"); // use "\t" instead of "," for field delimiter
SyncPolicy syncPolicy = new CountSyncPolicy(1000); // sync the filesystem after every 1k tuples
FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES); // rotate files
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
.withPath("/storm/").withPrefix("app_").withExtension(".log"); // set file name format
HdfsBolt hdfsBolt = new HdfsBolt()
.withFsUrl("hdfs://h1:8020")
.withFileNameFormat(fileNameFormat)
.withRecordFormat(format)
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(syncPolicy);

// configure & build topology
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5);
builder.setBolt("to-upper", new KafkaWordToUpperCase(), 3).shuffleGrouping("kafka-reader");
builder.setBolt("hdfs-bolt", hdfsBolt, 2).shuffleGrouping("to-upper");
builder.setBolt("realtime", new RealtimeBolt(), 2).shuffleGrouping("to-upper");

// submit topology
Config conf = new Config();
String name = DistributeWordTopology.class.getSimpleName();
if (args != null && args.length > 0) {
String nimbus = args[0];
conf.put(Config.NIMBUS_HOST, nimbus);
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
} else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(name, conf, builder.createTopology());
Thread.sleep(60000);
cluster.shutdown();
}
}

}

上面代码中,名称为to-upper的Bolt将接收到的字符串行转换成大写以后,会将处理过的数据向后面的hdfs-bolt、realtime这两个Bolt各发一份拷贝,然后由这两个Bolt分别根据实际需要(实时/离线)单独处理。
打包后,在Storm集群上部署并运行这个Topology:
bin/storm jar ~/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.DistributeWordTopology h1

可以通过Storm UI查看Topology运行情况,可以查看HDFS上生成的数据。

开发Kafka应用

来源:http://blog.csdn.net/suifeng3051/article/details/37602025

一、整体看一下Kafka

我们知道,Kafka系统有三大组件:Producer、Consumer、broker 。
producers 生产(produce)消息(message)并推(push)送给brokers,consumers从brokers把消息提取(pull)出来消费(consume)。

二、开发一个Producer应用
     Producers用来生产消息并把产生的消息推送到Kafka的Broker。Producers可以是各种应用,比如web应用,服务器端应用,代理应用以及log系统等等。当然,Producers现在有各种语言的实现比如Java、C、Python等。
     我们先看一下Producer在Kafka中的角色:
        
2.1.kafka Producer 的 API
Kafka中和producer相关的API有三个类
  • Producer:最主要的类,用来创建和推送消息
  • KeyedMessage:定义要发送的消息对象,比如定义发送给哪个topic,partition key和发送的内容等。
  • ProducerConfig:配置Producer,比如定义要连接的brokers、partition class、serializer class、partition key等
2.2下面我们就写一个最简单的Producer:产生一条消息并推送给broker
  1. package bonree.producer;
  2. import java.util.Properties;
  3. import kafka.javaapi.producer.Producer;
  4. import kafka.producer.KeyedMessage;
  5. import kafka.producer.ProducerConfig;
  6. /*******************************************************************************
  7.  * BidPlanStructForm.java Created on 2014-7-8
  8.  * Author: <a href=mailto:wanghouda@126.com>houda</a>
  9.  * @Title: SimpleProducer.java
  10.  * @Package bonree.producer
  11.  * Description:
  12.  * Version: 1.0
  13.  ******************************************************************************/
  14. public class SimpleProducer {
  15.     private static Producer<Integer,String> producer;
  16.     private final Properties props=new Properties();
  17.     public SimpleProducer(){
  18.         //定义连接的broker list
  19.         props.put("metadata.broker.list""192.168.4.31:9092");
  20.         //定义序列化类(Java对象传输前要序列化)
  21.         props.put("serializer.class""kafka.serializer.StringEncoder");
  22.         producer = new Producer<Integer, String>(new ProducerConfig(props));
  23.     }
  24.     public static void main(String[] args) {
  25.         SimpleProducer sp=new SimpleProducer();
  26.         //定义topic
  27.         String topic="mytopic";
  28.         //定义要发送给topic的消息
  29.         String messageStr = "send a message to broker ";
  30.         //构建消息对象
  31.         KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, messageStr);
  32.         //推送消息到broker
  33.         producer.send(data);
  34.         producer.close();
  35.     }
  36. }
三、开发一个consumer应用
     Consumer是用来消费Producer产生的消息的,当然一个Consumer可以是各种应用,如可以是一个实时的分析系统,也可以是一个数据仓 库或者是一个基于发布订阅模式的解决方案等。Consumer端同样有多种语言的实现,如Java、C、Python等。
     我们看一下Consumer在Kafka中的角色:
   
3.1.kafka Producer 的 API
     Kafka和Producer稍微有些不同,它提供了两种类型的API
  • high-level consumer API:提供了对底层API的抽象,使用起来比较简单
  • simple consumer API:允许重写底层API的实现,提供了更多的控制权,当然使用起来也复杂一些
由于是第一个应用,我们这部分使用high-level API,它的特点每消费一个message自动移动offset值到下一个message,关于offset在后面的部分会单独介绍。与Producer类似,和Consumer相关的有三个主要的类:
  • KafkaStream:这里面返回的就是Producer生产的消息
  • ConsumerConfig:定义要连接zookeeper的一些配置信息(Kafka通过zookeeper均衡压力,具体请查阅见面几篇文章),比如定义zookeeper的URL、group id、连接zookeeper过期时间等。
  • ConsumerConnector:负责和zookeeper进行连接等工作
3.2下面我们就写一个最简单的Consumer:从broker中消费一个消息
  1. package bonree.consumer;
  2. import java.util.HashMap;
  3. import java.util.List;
  4. import java.util.Map;
  5. import java.util.Properties;
  6. import kafka.consumer.Consumer;
  7. import kafka.consumer.ConsumerConfig;
  8. import kafka.consumer.ConsumerIterator;
  9. import kafka.consumer.KafkaStream;
  10. import kafka.javaapi.consumer.ConsumerConnector;
  11. /*******************************************************************************
  12.  * Created on 2014-7-8 Author: <a
  13.  * href=mailto:wanghouda@126.com>houda</a>
  14.  * @Title: SimpleHLConsumer.java
  15.  * @Package bonree.consumer Description: Version: 1.0
  16.  ******************************************************************************/
  17. public class SimpleHLConsumer {
  18.     private final ConsumerConnector consumer;
  19.     private final String topic;
  20.     public SimpleHLConsumer(String zookeeper, String groupId, String topic) {
  21.         Properties props = new Properties();
  22.         //定义连接zookeeper信息
  23.         props.put("zookeeper.connect", zookeeper);
  24.         //定义Consumer所有的groupID,关于groupID,后面会继续介绍
  25.         props.put("group.id", groupId);
  26.         props.put("zookeeper.session.timeout.ms""500");
  27.         props.put("zookeeper.sync.time.ms""250");
  28.         props.put("auto.commit.interval.ms""1000");
  29.         consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
  30.         this.topic = topic;
  31.     }
  32.     public void testConsumer() {
  33.         Map<String, Integer> topicCount = new HashMap<String, Integer>();
  34.         //定义订阅topic数量
  35.         topicCount.put(topic, new Integer(1));
  36.         //返回的是所有topic的Map
  37.         Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
  38.         //取出我们要需要的topic中的消息流
  39.         List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
  40.         for (final KafkaStream stream : streams) {
  41.             ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
  42.             while (consumerIte.hasNext())
  43.                 System.out.println("Message from Single Topic :: " + new String(consumerIte.next().message()));
  44.         }
  45.         if (consumer != null)
  46.             consumer.shutdown();
  47.     }
  48.     public static void main(String[] args) {
  49.         String topic = "mytopic";
  50.         SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("192.168.4.32:2181""testgroup", topic);
  51.         simpleHLConsumer.testConsumer();
  52.     }
  53. }
四、运行查看结果
先启动服务器端相关进程:
  • 运行zookeeper:[root@localhost kafka-0.8]# bin/zookeeper-server-start.sh config/zookeeper.properties
  • 运行Kafkabroker:[root@localhost kafka-0.8]# bin/kafka-server-start.sh config/server.properties
再运行我们写的应用
  • 运行刚才写的SimpleHLConsumer 类的main函数,等待生产者生产消息
  • 运行SimpleProducer的main函数,生产消息并push到broker
结果:运行完SimpleProducer后在SimpleHLConsumer的控制台即可看到生产者生产的消息:“send a message to broker”。

artemis安装

一.Apache Artemis介绍

Apache Artemis是apache的一个新的消息系统, 这个消息系统是来源于 redhat的 “异步消息系统 HornetQ

HornetQ的相关资料有如下:

http://wenku.baidu.com/view/2f19b1557fd5360cba1adbd9.html?from=search

1. 关于Apache Artemis

http://activemq.apache.org/artemis/  项目的注意

http://activemq.apache.org/artemis/download.html  下载页面

另外 可以参考 HornetQ的相关资料, 目前同 HornetQ还有很大的相似性

选择他的Artemis的考虑是, 他是中等成熟的 消息系统, 功能够用, 代码少, 阅读方便, 便于理解, 这样有了问题才好 进行相关的处理工作。

而 activemq 代码太庞大了, 功能太多, 想彻底弄清楚要费非常大力气

而apache Apollo  是基于acitvemq的基础上, 做了内核调整, 因此不确定代码情况, 以及成熟稳定情况, 可以后续再研究

2. Apache activemq, apache Apollo, apache Artemis 以及 apache kafka的区别

如下面文章

http://stackoverflow.com/questions/27666943/activemq-vs-apollo-vs-kafka

36 down vote accepted

Apache ActiveMQ is a great workhorse full of features and nice stuff. It's not the fastest MQ software around but fast enough for most use cases. Among features are flexible clustring, fail-over, integrations with different application servers, security etc.

Apache Apollo is an attempt to write a new core for ActiveMQ to cope with a large amount of clients and messages. It does not have all nice and convenient feature of ActiveMQ but scales a lot better. Apache Apollo is a really fast MQ implementation when you give it a large multi-core server and thousands of concurrent connections. It has a nice, simple UI, but is not a "one-size-fits-all" solution.

It seems that there is an attempt ongoing to merge a number of ActiveMQ features with HornetQ under the name ActiveMQ Artemis. HornetQ has JMS2.0 support, so my humble guess is that it's likely to appear in ActiveMQ 6.x.

JIRAGithub

Kafka is a different beast. It's a very simple message broker intended to scale persistent publish subscribe (topics) as fast as possible over multiple servers. For small-medium sized deployments, Kafka is probably not the best option. It also has it's way to do things to achieve the high throughput, so you have to trade a lot in terms of flexibility to get high distributed throughput. If you are new to the area of MQ and brokers, I guess Kafka is overkill. On the other hand - if you have a decent sized server cluster and wonder how to push as many messages as possible through it - give Kafka a spin!

shareimprove this answer

edited May 14 at 16:20

answered Dec 27 '14 at 18:04

Petter Nordlander

13.3k41952

Thanks! I guess that Apache Apollo is the way to go then in my case. – Martin Dec 27 '14 at 20:06

2

ActiveMQ Artemis 1.0.0 has been released: activemq.apache.org/artemis – mjn Jun 29 at 17:54

If I (partially) understand relation ActiveMQ to Apollo (newer implementation based on previous experience etc), what is general vision (goal, motivation) of Artemis? – Jacek Cz Sep 20 at 11:14

1

Apollo failed, in a way, to become the new non-blocking core of AMQ as there was a lack of Scala programmers dedicated to that effort. The vision with Artemis, as I see it - and I am not an official of ActiveMQ - is to do what Apollo failed to do. The chances are probably greater with a battle tested product (HornetQ), written in a main stream language (Java) than writing from scratch. Both aim(ed) for the same target - non blocking ActiveMQ to support new demands from IoT and mobility applications, among other things – Petter Nordlander Sep 21 at 7:27

二.Apache Artemis 安装

1. 下载Apache Artemis

http://activemq.apache.org/artemis/  项目的注意

http://activemq.apache.org/artemis/download.html  下载页面

2. 解压缩

下载后, 解压缩到目录中, 要确保没有目录名称没有空格

解压缩下载好的 windows下的zip文件

3. 打开解压缩目录中的文档 参照文档进行相关操作

Getting Started

Note (Windows users): The broker currently does not support spaces in path names. For this reason the broker should be placed in a directory with no spaces in it's absolute path.

对于windows用户目前这个消息系统不支持在路径上有空格, 因此必须使用没有空格的绝对路径

Note (Windows users): Examples below use the shell script `artemis` for use with linux, Windows users should use the `artemis.cmd` script with the same parameters.

对于本文中要使用的`artemis`命了 在windows下要使用`artemis.cmd`

4. 创建服务器Creating a broker

To create a broker, navigate to the distribution 'bin/' directory and run:

为了创建消息服务器, 请进入到安装目录的bin目录下, 然后输入下面命令

$ ./artemis create $directory
Where $directory is the folder that you'd like the broker to be created. The create process will input for any required property not specified.

这个$directory文件夹是你想把broker创建的目录, 创建过程会要求你输入一些必要的没有指定的系统属性

Example:

--user: is mandatory with this configuration:

Please provide the default username:

admin

例如, --user, 是必须的, 请提供一个默认的用户名

--password: is mandatory with this configuration:

Please provide the default password:

--password 是必须的配置, 请输入默认密码

--allow-anonymous: is mandatory with this configuration:

Allow anonymous access? (Y/N):

y

--allow-anonymous 是必须的, 用来指定是否允许匿名用户存取消息系统。

For a full list of available options for the create process you may use: $ ./artemis help create    查看详细的帮助列表

NAME

        artemis create - creates a new broker instance

SYNOPSIS

        artemis create [--allow-anonymous]

                [--cluster-password <clusterPassword>] [--cluster-user <clusterUser>]

    [--clustered] [--data <data>] [--encoding <encoding>] [--force]

        [--home <home>] [--host <host>] [--java-options <javaOptions>]

            [--password <password>] [--port-offset <portOffset>] [--replicated]

                [--role <role>] [--shared-store] [--silent] [--user <user>] [--]

                    <directory>

     ...

You can now start the broker by executing:

   "D:\mq\broker1\bin\artemis" run

Or you can setup the broker as Windows service and run it in the background:

   "D:\mq\broker1\bin\artemis-service.exe" install

   "D:\mq\broker1\bin\artemis-service.exe" start

   To stop the windows service:

      "D:\mq\broker1\bin\artemis-service.exe" stop

   To uninstall the windows service

      "D:\mq\broker1\bin\artemis-service.exe" uninstall

5. 进入应用程序目录

bin中是程序运行的必要文件

Etc目录是各种配置

Data存储数据

Log日志

6. 启动应用程序

Starting the Broker

Once the broker has been created, use the artemis (or artemis.cmd on windows) script found under the bin directory of the newly created broker to manage the life cycle of the broker. To run the Apache ActiveMQ Artemis broker with the default configuration, run the command shown below from the "bin" directory of the created broker.

新的broker服务器被创建完成后, 就可用新创建的broker目录下的bin目录中的artemis.cmd 命令(linux下artemis) 来启动或者管理broker的各项工作

$ ./artemis run   用默认配置启动服务器

To specify a broker configuration file:

$ ./artemis run --config scheme:location    指定一个配置文件进行启动

e.g.

$ ./artemis run --config xml:/home/artemis/bootstrap.xml

It is possible to configure run time paramters in the artemis.conf (artemis.conf.bat for windows) file under the "bin" directory of the broker directory.

7. 停止服务器

Stopping the Broker

To stop the broker please use the artemis script from the broker's bin directory:

$ ./artemis stop