月度归档:2015年12月

flume中sink到hdfs示例

一。 什么flume

     Flume是一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
当前Flume有两个版本Flume 0.9X版本的统称Flume-og,Flume1.X版本的统称Flume-ng。由于Flume-ng经过重大重构,与Flume-og有很大不同,使用时请注意区分。
现在这个项目是 apache的一个顶级项目,地址如下:http://flume.apache.org/
原文信息:

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.

Agent component diagram

二。参见用法

1. Setting multi-agent flow(设置多流)
Two agents communicating over Avro RPC

In order to flow the data across multiple agents or hops, the sink of the previous agent and source of the current hop need to be avro type with the sink pointing to the hostname (or IP address) and port of the source.

2. Consolidation(合并)

A very common scenario in log collection is a large number of log producing clients sending data to a few consumer agents that are attached to the storage subsystem. For example, logs collected from hundreds of web servers sent to a dozen of agents that write to HDFS cluster.

A fan-in flow using Avro RPC to consolidate events in one place

This can be achieved in Flume by configuring a number of first tier agents with an avro sink, all pointing to an avro source of single agent (Again you could use the thrift sources/sinks/clients in such a scenario). This source on the second tier agent consolidates the received events into a single channel which is consumed by a sink to its final destination.

3. Multiplexing the flow(复用流

Flume supports multiplexing the event flow to one or more destinations. This is achieved by defining a flow multiplexer that can replicate or selectively route an event to one or more channels.

A fan-out flow using a (multiplexing) channel selector

The above example shows a source from agent “foo” fanning out the flow to three different channels. This fan out can be replicating or multiplexing. In case of replicating flow, each event is sent to all three channels. For the multiplexing case, an event is delivered to a subset of available channels when an event’s attribute matches a preconfigured value. For example, if an event attribute called “txnType” is set to “customer”, then it should go to channel1 and channel3, if it’s “vendor” then it should go to channel2, otherwise channel3. The mapping can be set in the agent’s configuration file.

三。 用法举例

1. Flume 收集Nginx日志到Hdfs Tail-to-hdfs sink

nginx,access.log日志约8000条/s,每100w条数据约253M,需要2min

agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure spooldir source1
#agent1.sources.source1.type = spooldir
#agent1.sources.source1.spoolDir = /var/log/apache/flumeSpool1
#agent1.sources.source1.fileHeader = true

# Describe/configure tail -F source1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /tmp/log.log
agent1.sources.source1.channels = channel1

# Describe/configure nc source1
#agent1.sources.source1.type = netcat
#agent1.sources.source1.bind = localhost
#agent1.sources.source1.port = 44444

#configure host for source
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = host
agent1.sources.source1.interceptors.i1.hostHeader = hostname

# Describe sink1
#agent1.sinks.sink1.type = logger

agent1.sinks.sink1.type = hdfs
#a1.sinks.k1.channel = c1
#agent1.sinks.sink1.hdfs.path =hdfs://xxx:9000/tmp/tail/%y-%m-%d/%H%M%S
agent1.sinks.sink1.hdfs.path =hdfs://xxx:9000/tmp/tail/%y-%m-%d/%H
agent1.sinks.sink1.hdfs.filePrefix = %{hostname}/events-
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize= 500
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
agent1.sinks.sink1.hdfs.rollSize = 0
agent1.sinks.sink1.hdfs.rollCount = 1000000
agent1.sinks.sink1.hdfs.rollInterval = 600
#agent1.sinks.sink1.hdfs.round = true
#agent1.sinks.sink1.hdfs.roundValue = 10
#agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
2.flume中sink到hdfs,文件系统频繁产生文件,文件滚动配置不起作用

在测试hdfs的sink,发现sink端的文件滚动配置项起不到任何作用,配置如下:

a1.sinks.k1.type=hdfs
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.sinks.k1.hdfs.path=hdfs://192.168.11.177:9000/flume/events/%Y/%m/%d/%H/%M
a1.sinks.k1.hdfs.filePrefix=XXX
a1.sinks.k1.hdfs.rollInterval=60
a1.sinks.k1.hdfs.rollSize=0
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.idleTimeout=0

这里配置的是60秒,文件滚动一次,也就每隔60秒,会新产生一个文件【前提,flume的source端有数据来】但是当我启动flume的时候,运行十几秒,不断写入数据,发现hdfs端频繁的产生文件,每隔几秒就有新文件产生
而且在flume的日志输出可以频繁看到这句:

[WARN] Block Under-replication detected. Rotating file.

只要有这句,就会产生一个新的文件

意思就是检测到复制块正在滚动文件,结合源码看下:

private boolean shouldRotate() {
    boolean doRotate = false;

    if (writer.isUnderReplicated()) {
      this.isUnderReplicated = true;
      doRotate = true;
    } else {
      this.isUnderReplicated = false;
    }

    if ((rollCount > 0) && (rollCount <= eventCounter)) {
      LOG.debug("rolling: rollCount: {}, events: {}", rollCount, eventCounter);
      doRotate = true;
    }

    if ((rollSize > 0) && (rollSize <= processSize)) {
      LOG.debug("rolling: rollSize: {}, bytes: {}", rollSize, processSize);
      doRotate = true;
    }

    return doRotate;
  }

这是判断是否滚动文件,但是这里面的第一判断条件是判断是否当前的HDFSWriter正在复制块

public boolean isUnderReplicated() {
    try {
      int numBlocks = getNumCurrentReplicas();
      if (numBlocks == -1) {
        return false;
      }
      int desiredBlocks;
      if (configuredMinReplicas != null) {
        desiredBlocks = configuredMinReplicas;
      } else {
        desiredBlocks = getFsDesiredReplication();
      }
      return numBlocks < desiredBlocks;
    } catch (IllegalAccessException e) {
      logger.error("Unexpected error while checking replication factor", e);
    } catch (InvocationTargetException e) {
      logger.error("Unexpected error while checking replication factor", e);
    } catch (IllegalArgumentException e) {
      logger.error("Unexpected error while checking replication factor", e);
    }
    return false;
  }

通过读取的配置复制块数量和当前正在复制的块比较,判断是否正在被复制

if (shouldRotate()) {
      boolean doRotate = true;

      if (isUnderReplicated) {
        if (maxConsecUnderReplRotations > 0 &&
            consecutiveUnderReplRotateCount >= maxConsecUnderReplRotations) {
          doRotate = false;
          if (consecutiveUnderReplRotateCount == maxConsecUnderReplRotations) {
            LOG.error("Hit max consecutive under-replication rotations ({}); " +
                "will not continue rolling files under this path due to " +
                "under-replication", maxConsecUnderReplRotations);
          }
        } else {
          LOG.warn("Block Under-replication detected. Rotating file.");
        }
        consecutiveUnderReplRotateCount++;
      } else {
        consecutiveUnderReplRotateCount = 0;
      }

以上方法,入口是shouldRotate()方法,也就是如果你配置了rollcount,rollsize大于0,会按照你的配置来滚动的,但是在入口进来后,发现,又去判断了是否有块在复制;

里面就读取了一个固定变量maxConsecUnderReplRotations=30,也就是正在复制的块,最多之能滚动出30个文件,如果超过了30次,该数据块如果还在复制中,那么数据也不会滚动了,doRotate=false,不会滚动了,所以有的人发现自己一旦运行一段时间,会出现30个文件再结合上面的源码看一下:

如果你配置了10秒滚动一次,写了2秒,恰好这时候该文件内容所在的块在复制中,那么虽然没到10秒,依然会给你滚动文件的,文件大小,事件数量的配置同理了。

为了解决上述问题,我们只要让程序感知不到写的文件所在块正在复制就行了,怎么做呢??

只要让isUnderReplicated()方法始终返回false就行了

该方法是通过当前正在被复制的块和配置中读取的复制块数量比较的,我们能改的就只有配置项中复制块的数量,而官方给出的flume配置项中有该项

hdfs.minBlockReplicas
Specify minimum number of replicas per HDFS block. If not specified, it comes from the default Hadoop config in the classpath.

默认读的是hadoop中的dfs.replication属性,该属性默认值是3这里我们也不去该hadoop中的配置,在flume中添加上述属性为1即可配置如下:

a1.sinks.k1.type=hdfs
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.sinks.k1.hdfs.path=hdfs://192.168.11.177:9000/flume/events/%Y/%m/%d/%H/%M
a1.sinks.k1.hdfs.filePrefix=cmcc
a1.sinks.k1.hdfs.minBlockReplicas=1
#a1.sinks.k1.hdfs.fileType=DataStream
#a1.sinks.k1.hdfs.writeFormat=Text
a1.sinks.k1.hdfs.rollInterval=60
a1.sinks.k1.hdfs.rollSize=0
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.idleTimeout=0

这样程序就永远不会因为文件所在块的复制而滚动文件了,只会根据你的配置项来滚动文件了,试试吧!!

配置redis为自动启动的服务

本文是讲述, 如何在centos6.x 系统下如何配置 redis为一个自动启动的服务的过程

在操作过程中 始终需要 root权限

一. Redis安装

redis-service-install-002

 

特别注意 redis的安装目录 如上图

  1. 下载redis 安装程序

wget http://download.redis.io/releases/redis-2.8.19.tar.gz

  1. 解压缩redis安装源代码

tar xzvf redis-2.8.19.tar.gz

cd redis-2.8.19/

  1. 安装redis

make PREFIX=/usr/local/redis install

一定要安装到特定目录中

  1. 拷贝redis配置文件到安装目录

cp   redis.conf    /usr/local/redis/

vi /usr/local/redis/redis.conf

 

二.安装Redis启动服务的辅助程序

  1. 下载

wget http://developer.axis.com/download/distribution/apps-sys-utils-start-stop-daemon-IR1_9_18-2.tar.gz

  1. 解压缩

tar zxf apps-sys-utils-start-stop-daemon-IR1_9_18-2.tar.gz

  1. 编译安装

cd apps/sys-utils/start-stop-daemon-IR1_9_18-2/

gcc start-stop-daemon.c -o start-stop-daemon

cp start-stop-daemon /usr/local/bin/start-stop-daemon

也可以从这里下载, 然后上传, 最后用上面步骤安装

apps-sys-utils-start-stop-daemon-IR1_9_18-2.tar

三.Redis启动服务的安装

1. 下载 安装代码

redis-as-service.v0.1.tar

从上面地址下载源代码

2. 上传到服务器

3. 解压缩源代码

4. 运行安装命令安装

sh install.sh

如下图

redis-service-install-001

5. 配置启动自动运行

chkconfig --level 2 redis-server on && chkconfig --level 3 redis-server on && chkconfig --level 4 redis-server on && chkconfig --level 5  redis-server on

最后检查一下是否设置成功

chkconfig  --list

显示如下:

redis-server              0:off  1:off  2:on   3:on   4:on   5:on   6:off

 

安装完成

 

树莓派上screen安装

使用ssh和xrdp进行远程登录,当连接断开后当前运行的工作也会中断。很多时候需要后台长时间执行,怎么办呢? 使用screen这个小工具,问题就解决了。screen是一个虚拟终端管理器。我们可以用它在后台管理终端界面,这样SSH断开后就不用怕正在进行的操 作中断了。

 

    Screen是一个虚拟终端管理器。我们可以用它在后台管理终端界面,这样SSH断开后就不用怕正在进行的操作中断了。

关于screen 参考:
linux screen—-让您处理终端游刃有余
linux下screen命令的使用

一、安装

一定先执行下面的命令, 否则可能安装不成功
pai-apt-get-screen001
sudo apt-get update
执行完成更新后, 就可以安装了!
sudo apt-get install screen

 

 

二、使用

 

1、创建一个虚拟终端

使用putty登录树莓派后执行:

screen -S terminal1

样就创建好一个名为terminal1的终端了。此时我们可以随便执行操作了,比如执行sudo apt-get upgrade,或者其它消耗时间比较长的工作,像编译内核等等。

按ctrl+a后再按d这样就保存好一个虚拟终端了,系统会提示deatached。

SSH什么的可以完全断开不管了,让虚拟终端自己运行去吧。

 

2、访问已经创建好的终端

screen -ls    #可以列出已经创建的正在后台运行的终端
screen -r     #终端名称就可以了
#比如screen -r terminal1

3、彻底退出

如果一个虚拟终端中的程序执行完毕了,screen -r 进入这个终端后再执行exit就完全退出了。
这样以后通过SSH编译内核之类的长时间工作时,再也不怕因为断网造成的操作中断了。
在任何linux设备上都能安装Screen,操作也是一样的。

maven仓库

来源:http://www.oschina.net/question/698806_159140

1 . 仓库简介

没有 Maven 时,项目用到的 .jar 文件通常需要拷贝到 /lib 目录,项目多了,拷贝的文件副本就多了,占用磁盘空间,且难于管理。Maven 使用一个称之为仓库的目录,根据构件的坐标统一存储这些构件的唯一副本,在项目中通过依赖声明,可以方便的引用构件。

2 . 仓库的布局

构件都有唯一的坐标,Maven 根据坐标管理构件的存储。如以下对 spring-orm-3.2.0 的存储:

文件路径对应了:groupId/artifactId/version/artifactId-version.packaging

3 . 仓库的分类

Maven 仓库分为本地仓库和远程仓库,寻找构件时,首先从本地仓库找,找不到则到远程仓库找,再找不到就报错;在远程仓库中找到了,就下载到本地仓库再使用。中央 仓库是 Maven 核心自带的远程仓库,默认地址:http://repo1.maven.org/maven2。除了中央仓库,还有其它很多公共的远程仓库。私服是架设在 本机或局域网中的一种特殊的远程仓库,通过私服可以方便的管理其它所有的外部远程仓库。

3 . 1 . 本地仓库

Maven 本地仓库默认地址为:${user.home}/.m2/repository。

通过修改 %MAVEN_HOME%/conf/settings.xml (或者:${user.home}/.m2/settings.xml,针对当前用户(推荐))配置文件可以更改本地仓库的位置。

3 . 2 . 中央仓库

安装完 Maven ,本地仓库几乎是空的,这时需要从远程仓库下载所需构件。Maven 配置了一个默认的远程仓库,即中央仓库,找到 %MAVEN_HOME%/lib/maven-model-builder-3.2.1.jar,打开 org/apache/maven/model /pom-4.0.0.xml 超级POM:

3 . 3 . 在项目中添加其他远程仓库

当中央仓库找不到所需的构件时,我们可以配置 pom.xml ,添加其它的远程仓库。

复制代码
 1 <repositories>  2 <repository>  3 <id>Sonatype</id>  4 <name>Sonatype Repository</name>  5 <url>http://repository.sonatype.org/content/groups/public/</url>  6 <layout>default</layout>  7 <releases>  8 <enabled>true</enabled>  9 </releases> 10 <snapshots> 11 <enabled>false</enabled> 12 </snapshots> 13 </repository> 14 </repositories>
复制代码

其中 id 必须唯一,若不唯一,如设置为 central 将覆盖中央仓库的配置。

3 . 4 . 镜像仓库

镜像仓库可以理解为仓库的副本,从仓库中可以找到的构件,从镜像仓库中也可以找到。比如针对中央仓库 http://repo1.maven.org/maven2 ,在中国有它的镜像仓库,这样我们直接访问镜像仓库,更快更稳定。

复制代码
 1 <settings>  2  ...  3 <mirrors>  4 <mirror>  5 <id>maven.net.cn</id>  6 <name>central mirror in china</name>  7 <url>http://maven.net.cn/content/groups/public</url>  8 <mirrorOf>central</mirrorOf> <!--表明为central中央仓库配置镜像仓库-->  9 </mirror> 10 </mirrors> 11  ... 12 </settings>
复制代码

其中,<mirrorOf> 指明了为哪个仓库配置镜像,可以使用通配符如:<mirrorOf>*</mirrorOf>,或者 <mirrorOf>repo1,repo2</mirrorOf> 等进行匹配。一旦配置了镜像,所有针对原仓库的访问将转到镜像仓库的访问,原仓库将不再能直接访问,即使镜像仓库不稳定或停用。在搭建私服的时候,我们通 常为所有仓库设置镜像为私服地址,通过私服对所有仓库进行统一管理。

3 . 5 . 常用的仓库搜索地址

lyg945 lyg945
发帖于 2年前
0回/476阅