ELK+Kafka集群日志分析系统

一、 系统介绍 2

二、 版本说明 3

三、 服务部署 3

1) JDK部署 3

2) Elasticsearch集群部署及优化 3

3) Elasticsearch健康插件安装 13

4) Shield之elasticsearch安全插件 15

5)Zookeeper集群搭建 15

6)Kafka集群搭建 17

7)测试Kafka和Zookeeper集群连通性 19

8) Logstash部署 20

9) Kibana部署 20

四、 系统使用示例 22

1) Logstash 作为kafka生产者示例 22

2) Logstash index 消费kafka示例 23

一、系统介绍
随着实时分析技术的发展及成本的降低,用户已经不仅仅满足于离线分析。下面来介绍一下架构

 

这是一个再常见不过的架构了:

(1)Kafka:接收用户日志的消息队列

(2)Logstash:做日志解析,统一成json输出给Elasticsearch

(3)Elasticsearch:实时日志分析服务的核心技术,一个schemaless,实时的数据存储服务,通过index组织数据,兼具强大的搜索和统计功能。

(4)Kibana:基于Elasticsearch的数据可视化组件,超强的数据可视化能力是众多公司选择ELK stack的重要原因。

(5)Zookeeper: 状态管理,监控进程等服务

二、版本说明
本次系统为:Centos6.5 64位

Java版本为:1.8.0_74

Elasticsearch为:2.4.0

Logstash :2.4.0

Kibana:4.6.1

Shield:2.0+

Kafka:2.10-0.10.0.1

Zookeeper:3.4.9

相应的版本最好下载对应的插件。

三、服务部署
1) JDK部署
下载JDK包到/data目录下解压,并将变量导入/etc/profile末尾。

export JAVA_HOME=/usr/local/jdk

export PATH=$JAVA_HOME/bin:$PATH

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

2) Elasticsearch集群部署及优化
下载并安装es rpm包:

Rpm -ivh https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/rpm/elasticsearch/2.4.0/elasticsearch-2.4.0.rpm

启动方式:

/etc/init.d/elasticsearch start|stop

Elasticsearch bin文件内存优化:

由于是rpm 方式安装将/usr/share/Elasticsearch/bin/Elasticsearch 加入如下参数:

ES_HEAP_SIZE=16g     #ES_HEAP_SIZE表示JVM参数的-Xms and -Xmx设置

MAX_OPEN_FILES=65535

Elasticsearch 配置文件优化以及说明:

我们需要配置如下:

cluster.name: es-ajb-cluster

node.name: es-node-1

node.master: true

node.data: true

index.number_of_shards: 8

index.number_of_replicas: 1

path.data: /data/es

path.logs: /data/eslogs

bootstrap.mlockall: true

network.host: 0.0.0.0

http.port: 9200

discovery.zen.ping.unicast.hosts: ["172.16.38.133", "172.16.38.134","172.16.38.135"]

以下为更详细yml配置文件参数解释:

##################### Elasticsearch Configuration Example ################

# 我只是挑些重要的配置选项进行注释,其实自带的已经有非常细致的英文注释了.有理解偏差的地方请以英文原版解释为准.

################################### Cluster#############################

# 代表一个集群,集群中有多个节点,其中有一个为主节点,这个主节点是可以通过选举产生的,主从节点是对于集群内部来说的.

# es的一个概念就是去中心化,字面上理解就是无中心节点,这是对于集群外部来说的,因为从外部来看es集群,在逻辑上是个整体,你与任何一个节点的通信和与整个es集群通信是等价的。

# cluster.name可以确定你的集群名称,当你的elasticsearch集群在同一个网段中elasticsearch会自动的找到具有相同cluster.name的elasticsearch服务.

# 所以当同一个网段具有多个elasticsearch集群时cluster.name就成为同一个集群的标识.

#cluster.name: elasticsearch

#################################### Node ##############################

# 节点名称同理,可自动生成也可手动配置.

#node.name: "Franz Kafka"

# 允许一个节点是否可以成为一个master节点,es是默认集群中的第一台机器为master,如果这台机器停止就会重新选举master.

#node.master: true

# 允许该节点存储数据(默认开启)

#node.data: true

# 配置文件中给出了三种配置高性能集群拓扑结构的模式,如下:

# 1. 如果你想让节点从不选举为主节点,只用来存储数据,可作为负载器

# node.master: false

# node.data: true

#

# 2. 如果想让节点成为主节点,且不存储任何数据,并保有空闲资源,可作为协调器

# node.master: true

# node.data: false

#

# 3. 如果想让节点既不称为主节点,又不成为数据节点,那么可将他作为搜索器,从节点中获取数据,生成搜索结果等

# node.master: false

# node.data: false

# 监控集群状态有一下插件和API可以使用:

# Use the Cluster Health API [http://localhost:9200/_cluster/health], the

# Node Info API [http://localhost:9200/_nodes] or GUI tools

# such as <http://www.elasticsearch.org/overview/marvel/>,

# <http://github.com/karmi/elasticsearch-paramedic>,

# <http://github.com/lukas-vlcek/bigdesk> and

# <http://mobz.github.com/elasticsearch-head> to inspect the cluster state.

# A node can have generic attributes associated with it, which can later be used

# for customized shard allocation filtering, or allocation awareness. An attribute

# is a simple key value pair, similar to node.key: value, here is an example:

#

#node.rack: rack314

# By default, multiple nodes are allowed to start from the same installation location

# to disable it, set the following:

#node.max_local_storage_nodes: 1

#################################### Index #############################

# 设置索引的分片数,默认为5

#index.number_of_shards: 5

# 设置索引的副本数,默认为1:

#index.number_of_replicas: 1

# 配置文件中提到的最佳实践是,如果服务器够多,可以将分片提高,尽量将数据平均分布到大集群中去

# 同时,如果增加副本数量可以有效的提高搜索性能

# 需要注意的是,"number_of_shards" 是索引创建后一次生成的,后续不可更改设置

# "number_of_replicas" 是可以通过API去实时修改设置的

#################################### Paths ####################################

# 配置文件存储位置

#path.conf: /path/to/conf

# 数据存储位置(单个目录设置)

#path.data: /path/to/data

# 多个数据存储位置,有利于性能提升

#path.data: /path/to/data1,/path/to/data2

# 临时文件的路径

#path.work: /path/to/work

# 日志文件的路径

#path.logs: /path/to/logs

# 插件安装路径

#path.plugins: /path/to/plugins

#################################### Plugin ############################

# 设置插件作为启动条件,如果一下插件没有安装,则该节点服务不会启动

#plugin.mandatory: mapper-attachments,lang-groovy

################################### Memory ##############################

# 当JVM开始写入交换空间时(swapping)ElasticSearch性能会低下,你应该保证它不会写入交换空间

# 设置这个属性为true来锁定内存,同时也要允许elasticsearch的进程可以锁住内存,linux下可以通过 `ulimit -l unlimited` 命令

#bootstrap.mlockall: true

# 确保 ES_MIN_MEM 和 ES_MAX_MEM 环境变量设置为相同的值,以及机器有足够的内存分配给Elasticsearch

# 注意:内存也不是越大越好,一般64位机器,最大分配内存别才超过32G

############################## Network And HTTP #######################

# 设置绑定的ip地址,可以是ipv4或ipv6的,默认为0.0.0.0

#network.bind_host: 192.168.0.1

# 设置其它节点和该节点交互的ip地址,如果不设置它会自动设置,值必须是个真实的ip地址

#network.publish_host: 192.168.0.1

# 同时设置bind_host和publish_host上面两个参数

#network.host: 192.168.0.1

# 设置节点间交互的tcp端口,默认是9300

#transport.tcp.port: 9300

# 设置是否压缩tcp传输时的数据,默认为false,不压缩

#transport.tcp.compress: true

# 设置对外服务的http端口,默认为9200

#http.port: 9200

# 设置请求内容的最大容量,默认100mb

#http.max_content_length: 100mb

# 使用http协议对外提供服务,默认为true,开启

#http.enabled: false

################################### Gateway #############################

# gateway的类型,默认为local即为本地文件系统,可以设置为本地文件系统

#gateway.type: local

# 下面的配置控制怎样以及何时启动一整个集群重启的初始化恢复过程

# (当使用shard gateway时,是为了尽可能的重用local data(本地数据))

# 一个集群中的N个节点启动后,才允许进行恢复处理

#gateway.recover_after_nodes: 1

# 设置初始化恢复过程的超时时间,超时时间从上一个配置中配置的N个节点启动后算起

#gateway.recover_after_time: 5m

# 设置这个集群中期望有多少个节点.一旦这N个节点启动(并且recover_after_nodes也符合),

# 立即开始恢复过程(不等待recover_after_time超时)

#gateway.expected_nodes: 2

############################# Recovery Throttling #######################

# 下面这些配置允许在初始化恢复,副本分配,再平衡,或者添加和删除节点时控制节点间的分片分配

# 设置一个节点的并行恢复数

# 1.初始化数据恢复时,并发恢复线程的个数,默认为4

#cluster.routing.allocation.node_initial_primaries_recoveries: 4

#

# 2.添加删除节点或负载均衡时并发恢复线程的个数,默认为2

#cluster.routing.allocation.node_concurrent_recoveries: 2

# 设置恢复时的吞吐量(例如:100mb,默认为0无限制.如果机器还有其他业务在跑的话还是限制一下的好)

#indices.recovery.max_bytes_per_sec: 20mb

# 设置来限制从其它分片恢复数据时最大同时打开并发流的个数,默认为5

#indices.recovery.concurrent_streams: 5

# 注意: 合理的设置以上参数能有效的提高集群节点的数据恢复以及初始化速度

################################## Discovery ##########################

# 设置这个参数来保证集群中的节点可以知道其它N个有master资格的节点.默认为1,对于大的集群来说,可以设置大一点的值(2-4)

#discovery.zen.minimum_master_nodes: 1

# 探查的超时时间,默认3秒,提高一点以应对网络不好的时候,防止脑裂

#discovery.zen.ping.timeout: 3s

# For more information, see

# <http://elasticsearch.org/guide/en/elasticsearch/reference/current/modules-discovery-zen.html>

# 设置是否打开多播发现节点.默认是true.

# 当多播不可用或者集群跨网段的时候集群通信还是用单播吧

#discovery.zen.ping.multicast.enabled: false

# 这是一个集群中的主节点的初始列表,当节点(主节点或者数据节点)启动时使用这个列表进行探测

#discovery.zen.ping.unicast.hosts: ["host1", "host2:port"]

# Slow Log部分与GC log部分略,不过可以通过相关日志优化搜索查询速度

############## Memory(重点需要调优的部分) ################

# Cache部分:

# es有很多种方式来缓存其内部与索引有关的数据.其中包括filter cache

# filter cache部分:

# filter cache是用来缓存filters的结果的.默认的cache type是node type.node type的机制是所有的索引内部的分片共享filter cache.node type采用的方式是LRU方式.即:当缓存达到了某个临界值之后,es会将最近没有使用的数据清除出filter cache.使让新的数据进入es.

# 这个临界值的设置方法如下:indices.cache.filter.size 值类型:eg.:512mb 20%。默认的值是10%。

# out of memory错误避免过于频繁的查询时集群假死

# 1.设置es的缓存类型为Soft Reference,它的主要特点是据有较强的引用功能.只有当内存不够的时候,才进行回收这类内存,因此在内存足够的时候,它们通常不被回收.另外,这些引用对象还能保证在Java抛出OutOfMemory异常之前,被设置为null.它可以用于实现一些常用图片的缓存,实现Cache的功能,保证最大限度的使用内存而不引起OutOfMemory.在es的配置文件加上index.cache.field.type: soft即可.

# 2.设置es最大缓存数据条数和缓存失效时间,通过设置index.cache.field.max_size: 50000来把缓存field的最大值设置为50000,设置index.cache.field.expire: 10m把过期时间设置成10分钟.

#index.cache.field.max_size: 50000

#index.cache.field.expire: 10m

#index.cache.field.type: soft

# field data部分&&circuit breaker部分:

# 用于field data 缓存的内存数量,主要用于当使用排序,faceting操作时,elasticsearch会将一些热点数据加载到内存中来提供给客户端访问,但是这种缓存是比较珍贵的,所以对它进行合理的设置.

# 可以使用值:eg:50mb 或者 30%(节点 node heap内存量),默认是:unbounded

#indices.fielddata.cache.size: unbounded

# field的超时时间.默认是-1,可以设置的值类型: 5m

#indices.fielddata.cache.expire: -1

# circuit breaker部分:

# 断路器是elasticsearch为了防止内存溢出的一种操作,每一种circuit breaker都可以指定一个内存界限触发此操作,这种circuit breaker的设定有一个最高级别的设定:indices.breaker.total.limit 默认值是JVM heap的70%.当内存达到这个数量的时候会触发内存回收

# 另外还有两组子设置:

#indices.breaker.fielddata.limit:当系统发现fielddata的数量达到一定数量时会触发内存回收.默认值是JVM heap的70%

#indices.breaker.fielddata.overhead:在系统要加载fielddata时会进行预先估计,当系统发现要加载进内存的值超过limit * overhead时会进行进行内存回收.默认是1.03

#indices.breaker.request.limit:这种断路器是elasticsearch为了防止OOM(内存溢出),在每次请求数据时设定了一个固定的内存数量.默认值是40%

#indices.breaker.request.overhead:同上,也是elasticsearch在发送请求时设定的一个预估系数,用来防止内存溢出.默认值是1

# Translog部分:

# 每一个分片(shard)都有一个transaction log或者是与它有关的预写日志,(write log),在es进行索引(index)或者删除(delete)操作时会将没有提交的数据记录在translog之中,当进行flush 操作的时候会将tranlog中的数据发送给Lucene进行相关的操作.一次flush操作的发生基于如下的几个配置

#index.translog.flush_threshold_ops:当发生多少次操作时进行一次flush.默认是 unlimited

#index.translog.flush_threshold_size:当translog的大小达到此值时会进行一次flush操作.默认是512mb

#index.translog.flush_threshold_period:在指定的时间间隔内如果没有进行flush操作,会进行一次强制flush操作.默认是30m

#index.translog.interval:多少时间间隔内会检查一次translog,来进行一次flush操作.es会随机的在这个值到这个值的2倍大小之间进行一次操作,默认是5s

#index.gateway.local.sync:多少时间进行一次的写磁盘操作,默认是5s

# 以上的translog配置都可以通过API进行动态的设置

集群部署

集群名称不修改,节点名称修改、将elasticsearch.yml 复制到其他节点,并替换其配置文件。并关闭自动发现,防止其他外来节点连入。

3) Elasticsearch健康插件安装
elasticsearch-head是一个elasticsearch的集群管理工具,它是完全由html5编写的独立网页程序,你可以通过插件把它集成到es。

插件安装方法1:

1.elasticsearch/bin/plugin -install mobz/elasticsearch-head

2.运行es

3.打开http://localhost:9200/_plugin/head/

插件安装方法2:

1.https://github.com/mobz/elasticsearch-head下载zip 解压

2.建立elasticsearch\plugins\head\_site文件

3.将解压后的elasticsearch-head-master文件夹下的文件copy到_site

4.运行es

5.打开http://localhost:9200/_plugin/head/

在地址栏输入es服务器的ip地址和端口点connect就可以连接到集群。下面是连接后的视图。这是主界面,在这里可以看到es集群的基本信息(如:节点情况,索引情况)。

通过head插件可以对索引执行api 操作。

 

4) Shield之elasticsearch安全插件
官方安装步骤以及验证

Shield 2.0+

Compatible with the latest versions of Elasticsearch and Kibana

Step 1: Install Shield into Elasticsearch

bin/plugin install license

bin/plugin install shield

Step 2: Start Elasticsearch  bin/elasticsearch

Step 3: Add an admin user  bin/shield/esusers useradd es_admin -r admin

Step 4: Test your authenticated user  curl -u es_admin -XGET 'http://localhost:9200/'

Step 5: Install Shield Into Kibana for full session support (Shield 2.2+)

Step 6: Dive into the Getting Started Guide

目前测试尚未通过,等待官方发布新版本

5)Zookeeper集群搭建
Kafka依赖Zookeeper管理自身集群(Broker、Offset、Producer、Consumer等),所以先要安装 Zookeeper。自然,为了达到高可用的目的,Zookeeper自身也不能是单点,接下来就介绍如何搭建一个最小的Zookeeper集群(3个 zk节点)

此处选用Zookeeper的版本是3.4.9,此为Kafka0.10中推荐的Zookeeper版本。

首先解压

tar -xzvf zookeeper-3.4.9.tar.gz

进入zookeeper的conf目录,将zoo_sample.cfg复制一份,命名为zoo.cfg,此即为Zookeeper的配置文件

cp zoo_sample.cfg zoo.cfg

编辑zoo.cfg

# The number of milliseconds of each tick

tickTime=2000

# The number of ticks that the initial

# synchronization phase can take

initLimit=10

# The number of ticks that can pass between

# sending a request and getting an acknowledgement

syncLimit=5

# the directory where the snapshot is stored.

dataDir=/data/zk/zk0/data

dataLogDir=/data/zk/zk0/logs

# the port at which the clients will connect

clientPort=2181

server.176 = 172.16.38.176:2888:3888

server.177 = 172.16.38.177:2888:3888

server.1771 = 172.16.38.177:2999:3999

autopurge.purgeInterval=1

dataDir和dataLogDir的路径需要在启动前创建好

clientPort为zookeeper的服务端口

server.0/1/2为zk集群中三个node的信息,定义格式为hostname:port1:port2,其中port1是node间通信使用的端口,port2是node选举使用的端口,需确保三台主机的这两个端口都是互通的

在另外两台主机上执行同样的操作,安装并配置zookeeper

分别在三台主机的dataDir路径下创建一个文件名为myid的文件,文件内容为该zk节点的编号。例如在第一台主机上建立的myid文件内容是0,第二台是1。

接下来,启动三台主机上的zookeeper服务:

bin/zkServer.sh start

3个节点都启动完成后,可依次执行如下命令查看集群状态:

bin/zkServer.sh status

命令输出如下:

Mode: leader 或 Mode: follower

3个节点中,应有1个leader和两个follower

验证zookeeper集群高可用性:

假设目前3个zk节点中,server0为leader,server1和server2为follower

我们停掉server0上的zookeeper服务:

bin/zkServer.sh stop

再到server1和server2上查看集群状态,会发现此时server1(也有可能是server2)为leader,另一个为follower。

再次启动server0的zookeeper服务,运行zkServer.sh status检查,发现新启动的server0也为follower

至此,zookeeper集群的安装和高可用性验证完成。

附:Zookeeper默认会将控制台信息输出到启动路径下的zookeeper.out中,显然在生产环境中我们不能允许Zookeeper这样做,通过如下方法,可以让Zookeeper输出按尺寸切分的日志文件:

修改conf/log4j.properties文件,将zookeeper.root.logger=INFO, CONSOLE改为

zookeeper.root.logger=INFO, ROLLINGFILE修改bin/zkEnv.sh文件,将

ZOO_LOG4J_PROP="INFO,CONSOLE"改为ZOO_LOG4J_PROP="INFO,ROLLINGFILE"

然后重启zookeeper,就ok了

6)Kafka集群搭建
Kafka 是一个高吞吐量的分布式发布订阅日志服务,具有高可用、高性能、分布式、高扩展、持久性等特性。目前已经在各大公司中广泛使用。和之前采用 Redis 做轻量级消息队列不同,Kafka 利用磁盘作队列,所以也就无所谓消息缓冲时的磁盘问题。此外,如果公司内部已有 Kafka 服务在运行,logstash 也可以快速接入,免去重复建设的麻烦。

kafka 基本概念

Topic 主题,声明一个主题,producer指定该主题发布消息,订阅该主题的consumer对该主题进行消费

Partition 每个主题可以分为多个分区,每个分区对应磁盘上一个目录,分区可以分布在不同broker上,producer在发布消息时,可以通过指定partition key映射到对应分区,然后向该分区发布消息,在无partition key情况下,随机选取分区,一段时间内触发一次(比如10分钟),这样就保证了同一个producer向同一partition发布的消息是顺序的。 消费者消费时,可以指定partition进行消费,也可以使用high-level-consumer api,自动进行负载均衡,并将partition分给consumer,一个partition只能被一个consumer进行消费

Consumer 消费者,可以多实例部署,可以批量拉取,有两类API可供选择:一个simpleConsumer,暴露所有的操作给用户,可以提交offset、fetch offset、指定partition fetch message;另外一个high-level-consumer(ZookeeperConsumerConnector),帮助用户做基于partition自动分配的负载均衡,定期提交offset,建立消费队列等。simpleConsumer相当于手动挡,high-level-consumer相当于自动挡。

simpleConsumer:无需像high-level-consumer那样向zk注册brokerid、owner,甚至不需要提交offset到zk,可以将offset提交到任意地方比如(mysql,本地文件等)。

high-level-consumer:一个进程中可以启多个消费线程,一个消费线程即是一个consumer,假设A进程里有2个线程(consumerid分别为1,2),B进程有2个线程(consumerid分别为1,2),topic1的partition有5个,那么partition分配是这样的:

partition1 ---> A进程consumerid1

partition2 ---> A进程consumerid1

partition3 ---> A进程consumerid2

partition4 ---> B进程consumer1

partition5 ---> B进程consumer2

Group High-level-consumer可以声明group,每个group可以有多个consumer,每group各自管理各自的消费offset,各个不同group之间互不关联影响。

由于目前版本消费的offset、owner、group都是consumer自己通过zk管理,所以group对于broker和producer并不关心,一些监控工具需要通过group来监控,simpleComsumer无需声明group。

部署安装

此例中,我们会安装配置一个有两个Broker组成的Kafka集群,并在其上创建一个两个分区的Topic

本例中使用Kafka最新版本0.10.0

首先解压官网下载kafkatar包

编辑config/server.properties文件,下面列出关键的参数

#此Broker的ID,集群中每个Broker的ID不可相同

broker.id=0

#监听器,端口号与port一致即可

listeners=PLAINTEXT://:9092

#Broker监听的端口

port=19092

#Broker的Hostname,填主机IP即可

host.name=172.16.38.176

#向Producer和Consumer建议连接的Hostname和port(此处有坑,具体见后)

advertised.host.name=172.16.38.176

advertised.port=9092

#进行IO的线程数,应大于主机磁盘数

num.io.threads=8

#消息文件存储的路径

log.dirs=/data/kafka-logs

#消息文件清理周期,即清理x小时前的消息记录

log.retention.hours=168

#每个Topic默认的分区数,一般在创建Topic时都会指定分区数,所以这个配成1就行了

num.partitions=1

#Zookeeper连接串,此处填写上一节中安装的三个zk节点的ip和端口即可

zookeeper.connect=172.16.38.176:12180,172.16.38.177:12181,172.16.38.177:12182

将172.16.38.177 也按照上图配置文件中,只是修改brokerid即可。

7)测试Kafka和Zookeeper集群连通性
(1)建立一个主题

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic summer

#注意:factor大小不能超过broker数

(2)查看有哪些主题已经创建

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181   #列出集群中所有的topic

summer  #已经创建成功

(3)查看summer这个主题的详情

[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic summer

Topic:summer PartitionCount:16 ReplicationFactor:2 Configs:

Topic: summer Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1

Topic: summer Partition: 1 Leader: 1 Replicas: 1,0 Isr: 0,1

Topic: summer Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1

Topic: summer Partition: 3 Leader: 1 Replicas: 1,0 Isr: 0,1

Topic: summer Partition: 4 Leader: 0 Replicas: 0,1 Isr: 0,1

Topic: summer Partition: 5 Leader: 1 Replicas: 1,0 Isr: 0,1

Topic: summer Partition: 6 Leader: 0 Replicas: 0,1 Isr: 0,1

Topic: summer Partition: 7 Leader: 1 Replicas: 1,0 Isr: 0,1

Topic: summer Partition: 8 Leader: 0 Replicas: 0,1 Isr: 0,1

Topic: summer Partition: 9 Leader: 1 Replicas: 1,0 Isr: 0,1

Topic: summer Partition: 10 Leader: 0 Replicas: 0,1 Isr: 0,1

Topic: summer Partition: 11 Leader: 1 Replicas: 1,0 Isr: 0,1

Topic: summer Partition: 12 Leader: 0 Replicas: 0,1 Isr: 0,1

Topic: summer Partition: 13 Leader: 1 Replicas: 1,0 Isr: 0,1

Topic: summer Partition: 14 Leader: 0 Replicas: 0,1 Isr: 0,1

Topic: summer Partition: 15 Leader: 1 Replicas: 1,0 Isr: 0,1

#主题名称:summer

#Partition:16个,从0开始

#leader :id为0和1的broker

#Replicas 副本存在于broker id为0,1的上面

#Isr:活跃状态的broker

(4)发送消息,这里使用的是生产者角色

[root@kafka1 data]# /bin/bash /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 172.16.38.176:9092,172.16.38.177:9092 --topic test1

(5)接收消息,这里使用的是消费者角色

[root@kafka2 data]# /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper  172.16.38.176:12180,172.16.38.177:12181,172.16.38.177:12182 --topic test1

(6)验证的效果,生产者和消费者

 

(7)删除一个消息主题

[root@kafka2data]#/usr/local/kafka/bin/kafka-topics.sh--zookeeper 172.16.38.176:12180,172.16.38.177:12181,172.16.38.177:12182 --delete --topic test1

Topic test1 is marked for deletion.

Note: This will have no impact if delete.topic.enable is not set to true.

这样kafka和zookeeper集群配置完毕。

8)Logstash部署
Logstash requires Java 7 or later. Use the official Oracle distribution or an open-source distribution such as OpenJDK.

Download and install the public signing key:

rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch

Add the following in your /etc/yum.repos.d/ directory in a file with a .repo suffix, for example logstash.repo

[logstash-2.4]

name=Logstash repository for 2.4.x packages

baseurl=https://packages.elastic.co/logstash/2.4/centos

gpgcheck=1

gpgkey=https://packages.elastic.co/GPG-KEY-elasticsearch

enabled=1

And your repository is ready for use. You can install it with:

yum install logstash

9)Kibana部署
1、Download and install the public signing key:

rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch

2、Create a file named kibana.repo in the /etc/yum.repos.d/ directory with the following contents:

[kibana-4.6]

name=Kibana repository for 4.6.x packages

baseurl=https://packages.elastic.co/kibana/4.6/centos

gpgcheck=1

gpgkey=https://packages.elastic.co/GPG-KEY-elasticsearch

enabled=1

3、Install Kibana by running the following command:

yum install kibana

4、Configure Kibana to automatically start during bootup. If your distribution is using the System V version of init (check with ps -p 1), run the following command:

chkconfig --add kibana

5、If your distribution is using systemd, run the following commands instead:

sudo /bin/systemctl daemon-reload

sudo /bin/systemctl enable kibana.service

四、系统使用示例
按照logstash-->kafka-->zookeeper-->logstash-->elasticsearch-->kibana 数据顺序

1) Logstash 作为kafka生产者示例
2) Logstash index 消费kafka示例

发表评论