月度归档:2015年09月

HornetQ与Apache Artemis

最近研究amq发现了Apache Artemis, 感觉很好, 仔细看了些, 最后发现他来源于HornetQ

信息如下:
HornetQ Apache donation and Apache Artemis 1.0.0 release

The HornetQ code base was donated to the Apache ActiveMQ community late last year and now resides as a sub project under the ActiveMQ umbrella named 'Artemis'. Since the code donation, the developers have been working tirelessly to get an initial release of Artemis out the door; to allow folks to give it a whirl and to finalise the donation process. With the release of Apache Artemis 1.0.0, that process has come to a close and the code donation has now been completed.

The active developer community has migrated across to Artemis; all of the developers that were active on HornetQ are now committers to the Artemis project; working on the code base as part of the ActiveMQ umbrella. The hope is that the union of the two great communities HornetQ and ActiveMQ will provide a path for a next generation of message broker with more advanced features, better performance and greater stability. We feel we can achieve these goals using the Artemis core with it's superior performance in combination with the vast feature offering of ActiveMQ. As Aristotle once put it "The whole is greater than the sum of it's parts". Let's hope this holds true for this union and great things will happen.

The Artemis project is targeted to house this next generation of message broker, as such any new feature requests or contributions from the HornetQ community should now be placed into the Artemis stream of development. HornetQ will of course have bugs fixed on its active branches (2.3 and 2.4) but will be mostly in maintenance only mode. For those HornetQ users who are wishing to migrate to Artemis 1.0.0, your job should be easy, Artemis is already compatible with HornetQ clients and supports a number of other protocols such as AMQP, Stomp, ActiveMQ's native messaging protocol 'OpenWire' (at Alpha with support for ActiveMQ JMS clients and basic transport) and also JMS 2. In addition we have already started development on support for MQTT.

To find out more on the Artemis project and how to subscribe to the Artemis mailing lists and get involved, please visit the Artemis website here: http://activemq.apache.org/artemis.

We look forward to seeing you all very soon

HornetQ 参考手册:

http://wenku.baidu.com/view/2f19b1557fd5360cba1adbd9.html?from=search

HornetQ 是一个支持集群和多种协议,可嵌入、高性能的异步消息系统。HornetQ完全支持JMS,HornetQ不但支持JMS1.1 API同时也定义属于自己的消息API,这可以最大限度的提升HornetQ的性能和灵活性。在不久的将来更多的协议将被HornetQ支持。

异步消息系统
功    能    完全支持JMS,HornetQ
特    点    定义属于自己的消息API

HornetQ拥有超高的性能,HornetQ在持久化消息方面的性能可以轻易的超于其它常见的非持久化消息引擎的性能。当然,HornetQ的非持久化消息的性能会表现的更好!
HornetQ完全使用POJO,纯POJO的设计让HornetQ可以尽可能少的依赖第三方的包。从设计模式来说,HornetQ这样的设计入侵性也最小。HornetQ既可以独立运行,也可以与其它Java应用程序服务器集成使用。
HornetQ拥有完善的错误处理机制,HornetQ提供服务器复制和故障自动转移功能,该功能可以消除消息丢失或多个重复信息导致服务器出错。
HornetQ提供了灵活的集群功能,通过创建HornetQ集群,您可以享受到到消息的负载均衡带来的性能提升。您也可以通过集群,组成一个全球性的消息网络。您也可以灵活的配置消息路由。
HornetQ拥有强大的管理功能。HornetQ提供了大量的管理API和监控服务器。它可以无缝的与应用程序服务器整合,并共同工作在一个HA环境中。

这款JBoss的MQ相当出色。发现国内关于HornetQ的资料很少,国外有一部分,但是版本都很久了。

自己写了一个例子,环境如下:

1、HornetQ的版本是 hornetq-2.2.5.Final

2、JDK1.6.0_30-b12

3、HornetQ自带example里面的config/stand-alone/non-clustered的配置文件

4、HornetQ作为独立的服务器,运行在一台Dell1950(2CPU,16G内存)上

5、自己写了两个小例子,很简单,就是一个Producer和Consumer

6、发送Person类的实例(必须实现Serializable接口)

7、注意:Producer和Consumer用到的Person类,必须在各个项目的相同的package下面,具有相同的serialVersionUID

8、在classpath下面有jndi.properties文件,里面放置着寻找服务器上面JNDI Server必须的配置

9、在classpath下面有个log4j的配置文件,用来答应日志

代码如下:

public class Producer {
private static Logger logger = Logger.getLogger(Producer.class);

/**
* @param args
*/
public static void main(String[] args) {
try {
runExample();
} catch (Exception e) {
e.printStackTrace();
}

}

private static void runExample() throws NamingException, JMSException {
InitialContext ic = new InitialContext();
ConnectionFactory cf = (ConnectionFactory) ic
.lookup("/ConnectionFactory");
Queue orderQueue = (Queue) ic.lookup("/queue/ExpiryQueue");
Connection connection = cf.createConnection();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(orderQueue);
connection.start();
int count = 0;
try {
while (true) {
Person one = new Person(count++, "xuepeng_" + count);
ObjectMessage msg = session.createObjectMessage(one);
producer.send(msg);
logger.info(Producer.class.getName()
+ " start to sent message: " + one);
}
} finally {
session.close();
}
}

}

Java代码  收藏代码

public class Person implements Serializable {

private static final long serialVersionUID = 2670718766927459001L;
private Integer id;
private String name;
private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
private String time = format.format(new Date());

/**
* @param id
* @param name
*/
public Person(Integer id, String name) {
super();
this.id = id;
this.name = name;
}

/**
* @return the id
*/
public Integer getId() {
return id;
}

/**
* @param id
*            the id to set
*/
public void setId(Integer id) {
this.id = id;
}

/**
* @return the name
*/
public String getName() {
return name;
}

/**
* @param name
*            the name to set
*/
public void setName(String name) {
this.name = name;
}

/* (non-Javadoc)
* @see java.lang.Object#toString()
*/
@Override
public String toString() {
return "Person [id=" + id + ", name=" + name + ", time=" + time + "]";
}

/*
* (non-Javadoc)
*
* @see java.lang.Object#hashCode()
*/
@Override
public int hashCode() {
final int prime = 37;
int result = 17;
result = prime * result + ((id == null) ? 0 : id.hashCode());
result = prime * result + ((name == null) ? 0 : name.hashCode());
return result;
}

/*
* (non-Javadoc)
*
* @see java.lang.Object#equals(java.lang.Object)
*/
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
Person other = (Person) obj;
if (id == null) {
if (other.id != null)
return false;
} else if (!id.equals(other.id))
return false;
if (name == null) {
if (other.name != null)
return false;
} else if (!name.equals(other.name))
return false;
return true;
}

}

Java代码  收藏代码

public class Consumer {
private static Logger logger = Logger.getLogger(Consumer.class);

/**
* @param args
*/
public static void main(String[] args) {
try {
runExample();
} catch (Exception e) {
e.printStackTrace();
}

}

private static void runExample() throws NamingException, JMSException {
InitialContext ic = new InitialContext();
ConnectionFactory cf = (ConnectionFactory) ic
.lookup("/ConnectionFactory");
Queue orderQueue = (Queue) ic.lookup("/queue/ExpiryQueue");
Connection connection = cf.createConnection();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(orderQueue);
connection.start();
try {
while (true) {
ObjectMessage messageReceived = (ObjectMessage) consumer.receive();
org.hornetq.jms.example.Person one = (org.hornetq.jms.example.Person)messageReceived.getObject();
logger.info(Consumer.class.getName()
+ " start to receive message: " + one);
}
} finally {
session.close();
}
}

}

启动Linux上面的HorneQ服务之后,运行上面的Producer和Consumer均可以实现通讯。

AsyncTask使用解析和同步执行的问题

Android UI是线程不安全的,如果想要在子线程里进行UI操作,就需要借助Android的异步消息处理机制。不过为了更加方便我们在子线程中更新UI元素,Android从1.5版本就引入了一个AsyncTask类,使用它就可以非常灵活方便地从子线程切换到UI线程。 AsyncTask 很早就出现在Android的API里了,所以我相信大多数朋友对它的用法都已经非常熟悉。不过今天我还是准备从AsyncTask的基本用法开始讲起, 然后我们再来一起分析下AsyncTask的并行执行情况

AsyncTask的基本用法

首先来看一下AsyncTask的基本用法,由于AsyncTask是一个抽象类,所以如果我们想使用它,就必须要创建一个子类去继承它。使用时需要继承这个类,然后调用execute()方法。注意继承时需要设定三个泛型Params,Progress和Result的类型,如AsyncTask<Void,Inetger,Void>:

  • Params是指调用execute()方法时传入的参数类型和doInBackgound()的参数类型
  • Progress是指更新进度时传递的参数类型,即publishProgress()和onProgressUpdate()的参数类型
  • Result是指doInBackground()的返回值类型
上面的说明涉及到几个方法:
  • doInBackgound() 这个方法是继承AsyncTask必须要实现的,运行于后台,耗时的操作可以在这里做
  • publishProgress() 更新进度,给onProgressUpdate()传递进度参数
  • onProgressUpdate() 在publishProgress()调用完被调用,更新进度

布局文件如下:

<RelativeLayout xmlns:android="http://schemas.android.com/apk/res/android"
xmlns:tools="http://schemas.android.com/tools"
android:layout_width="match_parent"
android:layout_height="match_parent"
android:paddingLeft="@dimen/activity_horizontal_margin"
android:paddingRight="@dimen/activity_horizontal_margin"
android:paddingTop="@dimen/activity_vertical_margin"
android:paddingBottom="@dimen/activity_vertical_margin"
tools:context=".MainActivity">

<TextView
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:text="显示异步的执行情况" android:id="@+id/tv_show"/>
<Button
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:text="启动一个异步任务"
android:id="@+id/bt_start_AsyncTask" android:layout_below="@+id/tv_show" android:layout_alignLeft="@+id/tv_show"
android:layout_marginTop="12dp" android:focusableInTouchMode="true"/>

<Button
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:text="启动一个异步 长耗时 任务"
android:id="@+id/bt_start_long_AsyncTask"
android:focusableInTouchMode="true"
android:layout_above="@+id/bt_toast" android:layout_alignLeft="@+id/tv_show2"
android:layout_marginBottom="13dp"/>
<Button android:layout_width="wrap_content" android:layout_height="wrap_content" android:text="弹出对话框"
android:id="@+id/bt_toast"
android:focusableInTouchMode="true"
android:layout_centerVertical="true" android:layout_alignLeft="@+id/bt_start_long_AsyncTask"/>
<TextView android:layout_width="wrap_content" android:layout_height="wrap_content" android:text="显示长耗时情况"
android:id="@+id/tv_show2"
android:layout_above="@+id/bt_start_long_AsyncTask" android:layout_alignLeft="@+id/bt_start_AsyncTask"
android:layout_marginBottom="11dp"/>

</RelativeLayout>

源代码如下:

 package cn.iigrowing.android.MyAsyncTask;

import android.content.Context;
import android.os.AsyncTask;
import android.os.Bundle;
import android.app.Activity;
import android.view.Menu;
import android.view.View;
import android.widget.Button;
import android.widget.TextView;
import android.widget.Toast;

public class MainActivity extends Activity {

private Button bt_start_AsyncTask = null;
private Button bt_start_long_AsyncTask = null;
private Button bt_toast = null;

private TextView tv = null;
private TextView tv2 = null;

@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);

bt_start_AsyncTask = (Button) findViewById(R.id.bt_start_AsyncTask);
bt_start_long_AsyncTask = (Button) findViewById(R.id.bt_start_long_AsyncTask);
bt_toast = (Button) findViewById(R.id.bt_toast);

tv = (TextView) findViewById(R.id.tv_show);
tv2 = (TextView) findViewById(R.id.tv_show2);

bt_start_AsyncTask.setOnClickListener(new Button.OnClickListener() {
public void onClick(View v) {
update();
}
});
bt_start_long_AsyncTask.setOnClickListener(new Button.OnClickListener() {
public void onClick(View v) {
update2();
}
});
bt_toast.setOnClickListener(new Button.OnClickListener() {
public void onClick(View v) {
Toast.makeText(MainActivity.this, "点击了按钮", Toast.LENGTH_LONG).show();
}
});

}

private void update() {
UpdateTextTask updateTextTask = new UpdateTextTask(this, 1, tv);
updateTextTask.execute();
}

private void update2() {
UpdateTextTask updateTextTask = new UpdateTextTask(this, 10, tv2);
updateTextTask.execute();
}

@Override
public boolean onCreateOptionsMenu(Menu menu) {
// Inflate the menu; this adds items to the action bar if it is present.
getMenuInflater().inflate(R.menu.main, menu);
return true;
}

class UpdateTextTask extends AsyncTask<Void, Integer, Integer> {
private Context context;
private TextView mytv = null;

private int sleepTime = 1;

UpdateTextTask(Context context, int sleepSecond, TextView tv) {
this.context = context;
sleepTime = sleepSecond;
this.mytv = tv;
}

/**
* 运行在UI线程中,在调用doInBackground()之前执行
*/
@Override
protected void onPreExecute() {
Toast.makeText(context, "开始执行", Toast.LENGTH_SHORT).show();
}

/**
* 后台运行的方法,可以运行非UI线程,可以执行耗时的方法
*/
@Override
protected Integer doInBackground(Void... params) {
int i = 0;
while (i < 10) {
i++;
publishProgress(i);
try {
Thread.sleep(1000 * sleepTime);
} catch (Exception e) {
e.printStackTrace();
}
}
return null;
}

/**
* 运行在ui线程中,在doInBackground()执行完毕后执行
*/
@Override
protected void onPostExecute(Integer integer) {
Toast.makeText(context, "执行完毕", Toast.LENGTH_SHORT).show();
}

/**
* 在publishProgress()被调用以后执行,publishProgress()用于更新进度
*/
@Override
protected void onProgressUpdate(Integer... values) {
this.mytv.setText(this.mytv.getText() + " " + values[0]);
}
}
}

通过上面的例子发现, 在同时点击两个测试按钮时, 两个文本框不能同时更新, 证明两个异步任务没有并行执行!!

查找网络资料有如下:研 究过Android系统源码的同学会发现:AsyncTask在android2.3的时候线程池是一个核心数为5线程,队列可容纳10线程,最大执行 128个任务,这存在一个问题,当你真的有138个并发时,即使手机没被你撑爆,那么超出这个指标应用绝对crash掉。 后来升级到3.0,为了避免并发带来的一些列问题,AsyncTask竟然成为序列执行器了,也就是你即使你同时execute N个AsyncTask,它也是挨个排队执行的。 这一点请同学们一定注意,AsyncTask在3.0以后,是异步的没错,但不是并发的。关于这一点的改进办法,我之前写过一篇《Thread并发请求封装——深入理解AsyncTask类》没有看过的同学可以看这里,本文是在这个基础上对AsyncTask做进一步的优化。参考:http://my.oschina.net/kymjs/blog/350565

程序源代码

链接: http://pan.baidu.com/s/1Eqw3O 密码: 6kll

Flume(NG)架构设计要点及配置实践

Flume NG是一个分布式、可靠、可用的系统,它能够将不同数据源的海量日志数据进行高效收集、聚合、移动,最后存储到一个中心化数据存储系统中。由原来的 Flume OG到现在的Flume NG,进行了架构重构,并且现在NG版本完全不兼容原来的OG版本。经过架构重构后,Flume NG更像是一个轻量的小工具,非常简单,容易适应各种方式日志收集,并支持failover和负载均衡。

架构设计要点

Flume的架构主要有一下几个核心概念:

  • Event:一个数据单元,带有一个可选的消息头
  • Flow:Event从源点到达目的点的迁移的抽象
  • Client:操作位于源点处的Event,将其发送到Flume Agent
  • Agent:一个独立的Flume进程,包含组件Source、Channel、Sink
  • Source:用来消费传递到该组件的Event
  • Channel:中转Event的一个临时存储,保存有Source组件传递过来的Event
  • Sink:从Channel中读取并移除Event,将Event传递到Flow Pipeline中的下一个Agent(如果有的话)

Flume NG架构,如图所示:
flume-ng-architecture
外部系统产生日志,直接通过Flume的Agent的Source组件将事件(如日志行)发送到中间临时的channel组件,最后传递给Sink组件,HDFS Sink组件可以直接把数据存储到HDFS集群上。
一个最基本Flow的配置,格式如下:
# list the sources, sinks and channels for the agent
<Agent>.sources = <Source1> <Source2>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set channel for source
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2> ...
<Agent>.sources.<Source2>.channels = <Channel1> <Channel2> ...

# set channel for sink
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>

尖括号里面的,我们可以根据实际需求或业务来修改名称。下面详细说明:
表示配置一个Agent的名称,一个Agent肯定有一个名称。与是Agent的Source组件的名称,消费传递过来的Event。与是Agent的Channel组件的名称。与是Agent的Sink组件的名称,从Channel中消费(移除)Event。
上面配置内容中,第一组中配置Source、Sink、Channel,它们的值可以有1个或者多个;第二组中配置Source将把数据存储(Put)到 哪一个Channel中,可以存储到1个或多个Channel中,同一个Source将数据存储到多个Channel中,实际上是 Replication;第三组中配置Sink从哪一个Channel中取(Task)数据,一个Sink只能从一个Channel中取数据。
下面,根据官网文档,我们展示几种Flow Pipeline,各自适应于什么样的应用场景:

  • 多个Agent顺序连接

flume-multiseq-agents
可以将多个Agent顺序连接起来,将最初的数据源经过收集,存储到最终的存储系统中。这是最简单的情况,一般情况下,应该控制这种顺序连接的Agent 的数量,因为数据流经的路径变长了,如果不考虑failover的话,出现故障将影响整个Flow上的Agent收集服务。

  • 多个Agent的数据汇聚到同一个Agent

flume-join-agent
这种情况应用的场景比较多,比如要收集Web网站的用户行为日志,Web网站为了可用性使用的负载均衡的集群模式,每个节点都产生用户行为日志,可以为每 个节点都配置一个Agent来单独收集日志数据,然后多个Agent将数据最终汇聚到一个用来存储数据存储系统,如HDFS上。

  • 多路(Multiplexing)Agent

flume-multiplexing-agent
这种模式,有两种方式,一种是用来复制(Replication),另一种是用来分流(Multiplexing)。Replication方式,可以将 最前端的数据源复制多份,分别传递到多

个channel中,每个channel接收到的数据都是相同的,配置格式,如下所示:
# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>

# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>

<Agent>.sources.<Source1>.selector.type = replicating

上面指定了selector的type的值为replication,其他的配置没有指定,使用的Replication方式,Source1会将 数据分别存储到Channel1和Channel2,这两个channel里面存储的数据是相同的,然后数据被传递到Sink1和Sink2。
Multiplexing方式,selector可以根据header的值来确定数据传递到哪一个channel,配置格式,如下所示:
# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#...

<Agent>.sources.<Source1>.selector.default = <Channel2>

上面selector的type的值为multiplexing,同时配置selector的header信息,还配置了多个selector的 mapping的值,即header的值:如果header的值为Value1、Value2,数据从Source1路由到Channel1;如果 header的值为Value2、Value3,数据从Source1路由到Channel2。

  • 实现load balance功能

flume-load-balance-agents
Load balancing Sink Processor能够实现load balance功能,上图Agent1是一个路由节点,负责将Channel暂存的Event均衡到对应的多个Sink组件上,而每个Sink组件分别

连 接到一个独立的Agent上,示例配置,如下所示:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000

实现failover能

Failover Sink Processor能够实现failover功能,具体流程类似load balance,但是内部处理机制与load balance完全不同:Failover Sink Processor维护一个优先级Sink组件列表,只要有一个Sink组件可用,Event就被传递到下一个组件。如果一个Sink能够成功处理 Event,则会加入到一个Pool中,否则会被移出Pool并计算失败次数,设置一个惩罚因子,示例配置如下所示:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 7
a1.sinkgroups.g1.processor.priority.k3 = 6
a1.sinkgroups.g1.processor.maxpenalty = 20000

基本功能

我们看一下,Flume NG都支持哪些功能(目前最新版本是1.5.0.1),了解它的功能集合,能够让我们在应用中更好地选择使用哪一种方案。说明Flume NG的功能,实际还是围绕着Agent的三个组件Source、Channel、Sink来看它能够支持哪些技术或协议。我们不再对各个组件支持的协议详 细配置进行说明,通过列表的方式分别对三个组件进行概要说明:

  • Flume Source
Source类型说明
Avro Source支持Avro协议(实际上是Avro RPC),内置支持
Thrift Source支持Thrift协议,内置支持
Exec Source基于Unix的command在标准输出上生产数据
JMS Source从JMS系统(消息、主题)中读取数据,ActiveMQ已经测试过
Spooling Directory Source监控指定目录内数据变更
Twitter 1% firehose Source通过API持续下载Twitter数据,试验性质
Netcat Source监控某个端口,将流经端口的每一个文本行数据作为Event输入
Sequence Generator Source序列生成器数据源,生产序列数据
Syslog Sources读取syslog数据,产生Event,支持UDP和TCP两种协议
HTTP Source基于HTTP POST或GET方式的数据源,支持JSON、BLOB表示形式
Legacy Sources兼容老的Flume OG中Source(0.9.x版本)
  • Flume Channel
Channel类型说明
Memory ChannelEvent数据存储在内存中
JDBC ChannelEvent数据存储在持久化存储中,当前Flume Channel内置支持Derby
File ChannelEvent数据存储在磁盘文件中
Spillable Memory ChannelEvent数据存储在内存中和磁盘上,当内存队列满了,会持久化到磁盘文件(当前试验性的,不建议生产环境使用)
Pseudo Transaction Channel测试用途
Custom Channel自定义Channel实现
  • Flume Sink
Sink类型说明
HDFS Sink数据写入HDFS
Logger Sink数据写入日志文件
Avro Sink数据被转换成Avro Event,然后发送到配置的RPC端口上
Thrift Sink数据被转换成Thrift Event,然后发送到配置的RPC端口上
IRC Sink数据在IRC上进行回放
File Roll Sink存储数据到本地文件系统
Null Sink丢弃到所有数据
HBase Sink数据写入HBase数据库
Morphline Solr Sink数据发送到Solr搜索服务器(集群)
ElasticSearch Sink数据发送到Elastic Search搜索服务器(集群)
Kite Dataset Sink写数据到Kite Dataset,试验性质的
Custom Sink自定义Sink实现

另外还有Channel Selector、Sink Processor、Event Serializer、Interceptor等组件,可以参考官网提供的用户手册。

应用实践

安装Flume NG非常简单,我们使用最新的1.5.0.1版本,执行如下命令:
cd /usr/local
wget http://mirror.bit.edu.cn/apache/flume/1.5.0.1/apache-flume-1.5.0.1-bin.tar.gz
tar xvzf apache-flume-1.5.0.1-bin.tar.gz
cd apache-flume-1.5.0.1-bin

如果需要使用到Hadoop集群,保证Hadoop相关的环境变量都已经正确配置,并且Hadoop集群可用。下面,通过一些实际的配置实例,来了解Flume的使用。为了简单期间,channel我们使用Memory类型的channel。

Avro Source+Memory Channel+Logger Sink

使用apache-flume-1.5.0.1自带的例子,使用Avro Source接收外部数据源,Logger作为sink,即通过Avro RPC调用,将数据缓存在channel中,然后通过Logger打印出调用发送的数据。
配置Agent,修改配置文件conf/flume-conf.properties,内容如下:
# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory

# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414

# Define a logger sink that simply logs all events it receives
# and connect it to the other end of the same channel.
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = logger

# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.channels.ch1.capacity = 1000
agent1.sources = avro-source1
agent1.sinks = log-sink1

首先,启动Agent进程:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console -n agent1

然后,启动Avro Client,发送数据:
bin/flume-ng avro-client -c ./conf/ -H 0.0.0.0 -p 41414 -F /usr/local/programs/logs/sync.log -Dflume.root.logger=DEBUG,console

Avro Source+Memory Channel+HDFS Sink

配置Agent,修改配置文件conf/flume-conf-hdfs.properties,内容如下:
# Define a source, channel, sink
agent1.sources = avro-source1
agent1.channels = ch1
agent1.sinks = hdfs-sink

# Configure channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 500000

# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414

# Define a logger sink that simply logs all events it receives
# and connect it to the other end of the same channel.
agent1.sinks.hdfs-sink1.channel = ch1
agent1.sinks.hdfs-sink1.type = hdfs
agent1.sinks.hdfs-sink1.hdfs.path = hdfs://h1:8020/data/flume/
agent1.sinks.hdfs-sink1.hdfs.filePrefix = sync_file
agent1.sinks.hdfs-sink1.hdfs.fileSuffix = .log
agent1.sinks.hdfs-sink1.hdfs.rollSize = 1048576
agent1.sinks.hdfs-sink1.rollInterval = 0
agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
agent1.sinks.hdfs-sink1.hdfs.batchSize = 1500
agent1.sinks.hdfs-sink1.hdfs.round = true
agent1.sinks.hdfs-sink1.hdfs.roundUnit = minute
agent1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 25
agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.hdfs-sink1.hdfs.minBlockReplicas = 1
agent1.sinks.hdfs-sink1.fileType = SequenceFile
agent1.sinks.hdfs-sink1.writeFormat = TEXT

首先,启动Agent:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf-hdfs.properties -Dflume.root.logger=INFO,console -n agent1

然后,启动Avro Client,发送数据:
bin/flume-ng avro-client -c ./conf/ -H 0.0.0.0 -p 41414 -F /usr/local/programs/logs/sync.log -Dflume.root.logger=DEBUG,console

可以查看同步到HDFS上的数据:
hdfs dfs -ls /data/flume

结果示例,如下所示:
-rw-r--r--   3 shirdrn supergroup    1377617 2014-09-16 14:35 /data/flume/sync_file.1410849320761.log
-rw-r--r--   3 shirdrn supergroup    1378137 2014-09-16 14:35 /data/flume/sync_file.1410849320762.log
-rw-r--r--   3 shirdrn supergroup     259148 2014-09-16 14:35 /data/flume/sync_file.1410849320763.log

Spooling Directory Source+Memory Channel+HDFS Sink

配置Agent,修改配置文件flume-conf-spool.properties,内容如下:
# Define source, channel, sink
agent1.sources = spool-source1
agent1.channels = ch1
agent1.sinks = hdfs-sink1

# Configure channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 500000

# Define and configure an Spool directory source
agent1.sources.spool-source1.channels = ch1
agent1.sources.spool-source1.type = spooldir
agent1.sources.spool-source1.spoolDir = /home/shirdrn/data/
agent1.sources.spool-source1.ignorePattern = event(_\d{4}\-\d{2}\-\d{2}_\d{2}_\d{2})?\.log(\.COMPLETED)?
agent1.sources.spool-source1.batchSize = 50
agent1.sources.spool-source1.inputCharset = UTF-8

# Define and configure a hdfs sink
agent1.sinks.hdfs-sink1.channel = ch1
agent1.sinks.hdfs-sink1.type = hdfs
agent1.sinks.hdfs-sink1.hdfs.path = hdfs://h1:8020/data/flume/
agent1.sinks.hdfs-sink1.hdfs.filePrefix = event_%y-%m-%d_%H_%M_%S
agent1.sinks.hdfs-sink1.hdfs.fileSuffix = .log
agent1.sinks.hdfs-sink1.hdfs.rollSize = 1048576
agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
agent1.sinks.hdfs-sink1.hdfs.batchSize = 1500
agent1.sinks.hdfs-sink1.hdfs.round = true
agent1.sinks.hdfs-sink1.hdfs.roundUnit = minute
agent1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 25
agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.hdfs-sink1.hdfs.minBlockReplicas = 1
agent1.sinks.hdfs-sink1.fileType = SequenceFile
agent1.sinks.hdfs-sink1.writeFormat = TEXT
agent1.sinks.hdfs-sink1.rollInterval = 0

启动Agent进程,执行如下命令:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf-spool.properties -Dflume.root.logger=INFO,console -n agent1

可以查看HDFS上同步过来的数据:
hdfs dfs -ls /data/flume

结果示例,如下所示:
-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355094.log
-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355095.log
-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355096.log
-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355097.log
-rw-r--r--   3 shirdrn supergroup       1530 2014-09-17 10:53 /data/flume/event_14-09-17_10_52_00.1410922355098.log
-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380386.log
-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380387.log
-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380388.log
-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380389.log
-rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380390.log

Exec Source+Memory Channel+File Roll Sink

配置Agent,修改配置文件flume-conf-file.properties,内容如下:
# Define source, channel, sink
agent1.sources = tail-source1
agent1.channels = ch1
agent1.sinks = file-sink1

# Configure channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 500000

# Define and configure an Exec source
agent1.sources.tail-source1.channels = ch1
agent1.sources.tail-source1.type = exec
agent1.sources.tail-source1.command = tail -F /home/shirdrn/data/event.log
agent1.sources.tail-source1.shell = /bin/sh -c
agent1.sources.tail-source1.batchSize = 50

# Define and configure a File roll sink
# and connect it to the other end of the same channel.
agent1.sinks.file-sink1.channel = ch1
agent1.sinks.file-sink1.type = file_roll
agent1.sinks.file-sink1.batchSize = 100
agent1.sinks.file-sink1.serializer = TEXT
agent1.sinks.file-sink1.sink.directory = /home/shirdrn/sink_data

启动Agent进程,执行如下命令:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf-file.properties -Dflume.root.logger=INFO,console -n agent1

可以查看File Roll Sink对应的本地文件系统目录/home/shirdrn/sink_data下,示例如下所示:
-rw-rw-r-- 1 shirdrn shirdrn 13944825 Sep 17 11:36 1410924990039-1
-rw-rw-r-- 1 shirdrn shirdrn 11288870 Sep 17 11:37 1410924990039-2
-rw-rw-r-- 1 shirdrn shirdrn        0 Sep 17 11:37 1410924990039-3
-rw-rw-r-- 1 shirdrn shirdrn 20517500 Sep 17 11:38 1410924990039-4
-rw-rw-r-- 1 shirdrn shirdrn 16343250 Sep 17 11:38 1410924990039-5

有关Flume NG更多配置及其说明,请参考官方用户手册,非常详细。
参考链接

Kafka+Spark Streaming+Redis实时计算整合实践

基于Spark通用计算平台,可以很好地扩展各种计算类型的应用,尤其是Spark提供了内建的计算库支持,像Spark Streaming、Spark SQL、MLlib、GraphX,这些内建库都提供了高级抽象,可以用非常简洁的代码实现复杂的计算逻辑、这也得益于Scala编程语言的简洁性。这 里,我们基于1.3.0版本的Spark搭建了计算平台,实现基于Spark Streaming的实时计算。
我们的应用场景是分析用户使用手机App的行为,描述如下所示:

  • 手机客户端会收集用户的行为事件(我们以点击事件为例),将数据发送到数据服务器,我们假设这里直接进入到Kafka消息队列
  • 后端的实时服务会从Kafka消费数据,将数据读出来并进行实时分析,这里选择Spark Streaming,因为Spark Streaming提供了与Kafka整合的内置支持
  • 经过Spark Streaming实时计算程序分析,将结果写入Redis,可以实时获取用户的行为数据,并可以导出进行离线综合统计分析

Spark Streaming介绍

Spark Streaming提供了一个叫做DStream(Discretized Stream)的高级抽象,DStream表示一个持续不断输入的数据流,可以基于Kafka、TCP Socket、Flume等输入数据流创建。在内部,一个DStream实际上是由一个RDD序列组成的。Sparking Streaming是基于Spark平台的,也就继承了Spark平台的各种特性,如容错(Fault-tolerant)、可扩展 (Scalable)、高吞吐(High-throughput)等。
在Spark Streaming中,每个DStream包含了一个时间间隔之内的数据项的集合,我们可以理解为指定时间间隔之内的一个batch,每一个batch就 构成一个RDD数据集,所以DStream就是一个个batch的有序序列,时间是连续的,按照时间间隔将数据流分割成一个个离散的RDD数据集,如图所 示(来自官网):
spark-cluster-overview-streaming-dstream
我们都知道,Spark支持两种类型操作:Transformations和Actions。Transformation从一个已知的RDD数据集经过 转换得到一个新的RDD数据集,这些Transformation操作包括map、filter、flatMap、union、join等,而且 Transformation具有lazy的特性,调用这些操作并没有立刻执行对已知RDD数据集的计算操作,而是在调用了另一类型的Action操作才 会真正地执行。Action执行,会真正地对RDD数据集进行操作,返回一个计算结果给Driver程序,或者没有返回结果,如将计算结果数据进行持久 化,Action操作包括reduceByKey、count、foreach、collect等。关于Transformations和Actions 更详细内容,可以查看官网文档。
同样、Spark Streaming提供了类似Spark的两种操作类型,分别为Transformations和Output操作,它们的操作对象是DStream,作 用也和Spark类似:Transformation从一个已知的DStream经过转换得到一个新的DStream,而且Spark Streaming还额外增加了一类针对Window的操作,当然它也是Transformation,但是可以更灵活地控制DStream的大小(时间 间隔大小、数据元素个数),例如window(windowLength, slideInterval)、countByWindow(windowLength, slideInterval)、reduceByWindow(func, windowLength, slideInterval)等。Spark Streaming的Output操作允许我们将DStream数据输出到一个外部的存储系统,如数据库或文件系统等,执行Output操作类似执行 Spark的Action操作,使得该操作之前lazy的Transformation操作序列真正地执行。

Kafka+Spark Streaming+Redis编程实践

下面,我们根据上面提到的应用场景,来编程实现这个实时计算应用。首先,写了一个Kafka Producer模拟程序,用来模拟向Kafka实时写入用户行为的事件数据,数据是JSON格式,示例如下:
{"uid":"068b746ed4620d25e26055a9f804385f","event_time":"1430204612405","os_type":"Android","click_count":6}

一个事件包含4个字段:

  • uid:用户编号
  • event_time:事件发生时间戳
  • os_type:手机App操作系统类型
  • click_count:点击次数

下面是我们实现的代码,如下所示:
package org.shirdrn.spark.streaming.utils

import java.util.Properties
import scala.util.Properties
import org.codehaus.jettison.json.JSONObject
import kafka.javaapi.producer.Producer
import kafka.producer.KeyedMessage
import kafka.producer.KeyedMessage
import kafka.producer.ProducerConfig
import scala.util.Random

object KafkaEventProducer {

private val users = Array(
"4A4D769EB9679C054DE81B973ED5D768", "8dfeb5aaafc027d89349ac9a20b3930f",
"011BBF43B89BFBF266C865DF0397AA71", "f2a8474bf7bd94f0aabbd4cdd2c06dcf",
"068b746ed4620d25e26055a9f804385f", "97edfc08311c70143401745a03a50706",
"d7f141563005d1b5d0d3dd30138f3f62", "c8ee90aade1671a21336c721512b817a",
"6b67c8c700427dee7552f81f3228c927", "a95f22eabc4fd4b580c011a3161a9d9d")

private val random = new Random()

private var pointer = -1

def getUserID() : String = {
pointer = pointer + 1
if(pointer >= users.length) {
pointer = 0
users(pointer)
} else {
users(pointer)
}
}

def click() : Double = {
random.nextInt(10)
}

// bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --create --topic user_events --replication-factor 2 --partitions 2
// bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --list
// bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --describe user_events
// bin/kafka-console-consumer.sh --zookeeper zk1:2181,zk2:2181,zk3:22181/kafka --topic test_json_basis_event --from-beginning
def main(args: Array[String]): Unit = {
val topic = "user_events"
val brokers = "10.10.4.126:9092,10.10.4.127:9092"
val props = new Properties()
props.put("metadata.broker.list", brokers)
props.put("serializer.class", "kafka.serializer.StringEncoder")

val kafkaConfig = new ProducerConfig(props)
val producer = new Producer[String, String](kafkaConfig)

while(true) {
// prepare event data
val event = new JSONObject()
event
.put("uid", getUserID)
.put("event_time", System.currentTimeMillis.toString)
.put("os_type", "Android")
.put("click_count", click)

// produce event message
producer.send(new KeyedMessage[String, String](topic, event.toString))
println("Message sent: " + event)

Thread.sleep(200)
}
}
}

通过控制上面程序最后一行的时间间隔来控制模拟写入速度。下面我们来讨论实现实时统计每个用户的点击次数,它是按照用户分组进行累加次数,逻辑比较简单,关键是在实现过程中要注意一些问题,如对象序列化等。先看实现代码,稍后我们再详细讨论,代码实现如下所示:
object UserClickCountAnalytics {

def main(args: Array[String]): Unit = {
var masterUrl = "local[1]"
if (args.length > 0) {
masterUrl = args(0)
}

// Create a StreamingContext with the given master URL
val conf = new SparkConf().setMaster(masterUrl).setAppName("UserClickCountStat")
val ssc = new StreamingContext(conf, Seconds(5))

// Kafka configurations
val topics = Set("user_events")
val brokers = "10.10.4.126:9092,10.10.4.127:9092"
val kafkaParams = Map[String, String](
"metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")

val dbIndex = 1
val clickHashKey = "app::users::click"

// Create a direct stream
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

val events = kafkaStream.flatMap(line => {
val data = JSONObject.fromObject(line._2)
Some(data)
})

// Compute user click times
val userClicks = events.map(x => (x.getString("uid"), x.getInt("click_count"))).reduceByKey(_ + _)
userClicks.foreachRDD(rdd => {
rdd.foreachPartition(partitionOfRecords => {
partitionOfRecords.foreach(pair => {
val uid = pair._1
val clickCount = pair._2
val jedis = RedisClient.pool.getResource
jedis.select(dbIndex)
jedis.hincrBy(clickHashKey, uid, clickCount)
RedisClient.pool.returnResource(jedis)
})
})
})

ssc.start()
ssc.awaitTermination()

}
}

上面代码使用了Jedis客户端来操作Redis,将分组计数结果数据累加写入Redis存储,如果其他系统需要实时获取该数据,直接从Redis实时读取即可。RedisClient实现代码如下所示:
object RedisClient extends Serializable {
val redisHost = "10.10.4.130"
val redisPort = 6379
val redisTimeout = 30000
lazy val pool = new JedisPool(new GenericObjectPoolConfig(), redisHost, redisPort, redisTimeout)

lazy val hook = new Thread {
override def run = {
println("Execute hook thread: " + this)
pool.destroy()
}
}
sys.addShutdownHook(hook.run)
}

上面代码我们分别在local[K]和Spark Standalone集群模式下运行通过。
如果我们是在开发环境进行调试的时候,也就是使用local[K]部署模式,在本地启动K个Worker线程来计算,这K个Worker在同一个JVM实 例里,上面的代码默认情况是,如果没有传参数则是local[K]模式,所以如果使用这种方式在创建Redis连接池或连接的时候,可能非常容易调试通 过,但是在使用Spark Standalone、YARN Client(YARN Cluster)或Mesos集群部署模式的时候,就会报错,主要是由于在处理Redis连接池或连接的时候出错了。我们可以看一下Spark架构,如图 所示(来自官网):
spark-cluster-overview-0001无论是在本地模式、Standalone模式,还是在Mesos或YARN模式下,整个Spark集群的结构都可以用上图抽象表示,只是各个组件的运行环 境不同,导致组件可能是分布式的,或本地的,或单个JVM实例的。如在本地模式,则上图表现为在同一节点上的单个进程之内的多个组件;而在YARN Client模式下,Driver程序是在YARN集群之外的一个节点上提交Spark Application,其他的组件都运行在YARN集群管理的节点上。
在Spark集群环境部署Application后,在进行计算的时候会将作用于RDD数据集上的函数(Functions)发送到集群中Worker上 的Executor上(在Spark Streaming中是作用于DStream的操作),那么这些函数操作所作用的对象(Elements)必须是可序列化的,通过Scala也可以使用 lazy引用来解决,否则这些对象(Elements)在跨节点序列化传输后,无法正确地执行反序列化重构成实际可用的对象。上面代码我们使用lazy引 用(Lazy Reference)来实现的,代码如下所示:
// lazy pool reference
lazy val pool = new JedisPool(new GenericObjectPoolConfig(), redisHost, redisPort, redisTimeout)
...
partitionOfRecords.foreach(pair => {
val uid = pair._1
val clickCount = pair._2
val jedis = RedisClient.pool.getResource
jedis.select(dbIndex)
jedis.hincrBy(clickHashKey, uid, clickCount)
RedisClient.pool.returnResource(jedis)
})

另一种方式,我们将代码修改为,把对Redis连接的管理放在操作DStream的Output操作范围之内,因为我们知道它是在特定的Executor中进行初始化的,使用一个单例的对象来管理,如下所示:
package org.shirdrn.spark.streaming

import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.streaming.kafka.KafkaUtils

import kafka.serializer.StringDecoder
import net.sf.json.JSONObject
import redis.clients.jedis.JedisPool

object UserClickCountAnalytics {

def main(args: Array[String]): Unit = {
var masterUrl = "local[1]"
if (args.length > 0) {
masterUrl = args(0)
}

// Create a StreamingContext with the given master URL
val conf = new SparkConf().setMaster(masterUrl).setAppName("UserClickCountStat")
val ssc = new StreamingContext(conf, Seconds(5))

// Kafka configurations
val topics = Set("user_events")
val brokers = "10.10.4.126:9092,10.10.4.127:9092"
val kafkaParams = Map[String, String](
"metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")

val dbIndex = 1
val clickHashKey = "app::users::click"

// Create a direct stream
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

val events = kafkaStream.flatMap(line => {
val data = JSONObject.fromObject(line._2)
Some(data)
})

// Compute user click times
val userClicks = events.map(x => (x.getString("uid"), x.getInt("click_count"))).reduceByKey(_ + _)
userClicks.foreachRDD(rdd => {
rdd.foreachPartition(partitionOfRecords => {
partitionOfRecords.foreach(pair => {

/**
* Internal Redis client for managing Redis connection {@link Jedis} based on {@link RedisPool}
*/
object InternalRedisClient extends Serializable {

@transient private var pool: JedisPool = null

def makePool(redisHost: String, redisPort: Int, redisTimeout: Int,
maxTotal: Int, maxIdle: Int, minIdle: Int): Unit = {
makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle, true, false, 10000)
}

def makePool(redisHost: String, redisPort: Int, redisTimeout: Int,
maxTotal: Int, maxIdle: Int, minIdle: Int, testOnBorrow: Boolean,
testOnReturn: Boolean, maxWaitMillis: Long): Unit = {
if(pool == null) {
val poolConfig = new GenericObjectPoolConfig()
poolConfig.setMaxTotal(maxTotal)
poolConfig.setMaxIdle(maxIdle)
poolConfig.setMinIdle(minIdle)
poolConfig.setTestOnBorrow(testOnBorrow)
poolConfig.setTestOnReturn(testOnReturn)
poolConfig.setMaxWaitMillis(maxWaitMillis)
pool = new JedisPool(poolConfig, redisHost, redisPort, redisTimeout)

val hook = new Thread{
override def run = pool.destroy()
}
sys.addShutdownHook(hook.run)
}
}

def getPool: JedisPool = {
assert(pool != null)
pool
}
}

// Redis configurations
val maxTotal = 10
val maxIdle = 10
val minIdle = 1
val redisHost = "10.10.4.130"
val redisPort = 6379
val redisTimeout = 30000
val dbIndex = 1
InternalRedisClient.makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle)

val uid = pair._1
val clickCount = pair._2
val jedis =InternalRedisClient.getPool.getResource
jedis.select(dbIndex)
jedis.hincrBy(clickHashKey, uid, clickCount)
InternalRedisClient.getPool.returnResource(jedis)
})
})
})

ssc.start()
ssc.awaitTermination()

}
}

上面代码实现,得益于Scala语言的特性,可以在代码中任何位置进行class或object的定义,我们将用来管理Redis连接的代码放在了 特定操作的内部,就避免了瞬态(Transient)对象跨节点序列化的问题。这样做还要求我们能够了解Spark内部是如何操作RDD数据集的,更多可 以参考RDD或Spark相关文档。
在集群上,以Standalone模式运行,执行如下命令:
1    cd /usr/local/spark
2    ./bin/spark-submit --class org.shirdrn.spark.streaming.UserClickCountAnalytics --master spark://hadoop1:7077 --executor-memory 1G --total-executor-cores 2 ~/spark-0.0.SNAPSHOT.jar spark://hadoop1:7077

可以查看集群中各个Worker节点执行计算任务的状态,也可以非常方便地通过Web页面查看。
下面,看一下我们存储到Redis中的计算结果,如下所示:
127.0.0.1:6379[1]> HGETALL app::users::click
1) "4A4D769EB9679C054DE81B973ED5D768"
2) "7037"
3) "8dfeb5aaafc027d89349ac9a20b3930f"
4) "6992"
5) "011BBF43B89BFBF266C865DF0397AA71"
6) "7021"
7) "97edfc08311c70143401745a03a50706"
8) "6874"
9) "d7f141563005d1b5d0d3dd30138f3f62"
10) "7057"
11) "a95f22eabc4fd4b580c011a3161a9d9d"
12) "7092"
13) "6b67c8c700427dee7552f81f3228c927"
14) "7266"
15) "f2a8474bf7bd94f0aabbd4cdd2c06dcf"
16) "7188"
17) "c8ee90aade1671a21336c721512b817a"
18) "6950"
19) "068b746ed4620d25e26055a9f804385f"

有关更多关于Spark Streaming的详细内容,可以参考官方文档。

附录

这里,附上前面开发的应用所对应的依赖,以及打包Spark Streaming应用程序的Maven配置,以供参考。如果使用maven-shade-plugin插件,配置有问题的话,打包后在Spark集群上 提交Application时候可能会报错Invalid signature file digest for Manifest main attributes。参考的Maven配置,如下所示:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.shirdrn.spark</groupId>
<artifactId>spark</artifactId>
<version>0.0.1-SNAPSHOT</version>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<version>2.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.2</version>
</dependency>
</dependencies>

<build>
<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
<resources>
<resource>
<directory>${basedir}/src/main/resources</directory>
</resource>
</resources>
<testResources>
<testResource>
<directory>${basedir}/src/test/resources</directory>
</testResource>
</testResources>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.2</version>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<includes>
<include>*:*</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
<resource>log4j.properties</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

来源:http://shiyanjun.cn/archives/1097.html

虚拟路由器冗余协议VRRP详解

来源:http://www.it165.net/network/html/201303/1026.html

为什么要使用VRRP技术我们知道,为了实现不同子网之间的设备通信,需要配置路由。目前常用的指定路由方法有两种:
第一种是通过路由协议 :RIP、OSPF动态学习
第二种是通过静态路由: 对终端PC机配置静态路由。
这两种路由各有各的优缺点:
第一种路由可以自动寻找最优路径,邻居路由也可以通过学习来获得路由表,但是动态路由占用线路带宽和CPU处理时间。
第二种路由是不需要CPU处理时间同样也不占用线路带宽,但是这个路由需要对终端的PC机进行配置网关来实现,工作量是比较大的。
对于以上的两种路由在现在来说都是广泛应用的。
我们现在只来分析静态路由的缺点,因为VRRP技术就是使用在静态路由上,而不是动态路由上。
对于静态路由来说,对终端PC机配置默认网关。如果作为默认网关的路由器出现故障,所有使用该网关为下一跳的主机的通信是要中断的。如下图所示;

 

在上图中,主机A—D都配置了一个默认的网关:10.1.1.1,网关路由的下一跳指向主机所在网段内的一个路由器RouterA,RouterA将报文发送到外网,但是如果现在RouterA坏掉了,那么所有的主机将无法与其他网段进行通信了。
为了解决以上的问题,我们可以加一个路由器RouterB,如下图所示:

 当RouterA坏掉时,所有的PC机将网关切换到RouterB上的网关。这样就实现了路由器的备份。这个技术就是VRRP技术---虚拟路由器冗余协议
VRRP简介(转发机制)1. VRRP(Virtual Router Redundancy Protocol,虚拟路由器冗余协议)将可以承担网关功能的路由器加入到备份组中,形成一台虚拟路由器,由VRRP的选举机制决定哪台路由器承担转发任 务,局域网内的主机只需将虚拟路由器配置为缺省网关
2. VRRP是一种容错协议,在提高可靠性的同时,简化了主机的配置。在具有多播或广播能力的局域网(如以太网)中,借助VRRP 能在某台设备出现故障时仍然提供高可靠的缺省链路,有效避免单一链路发生故障后网络中断的问题,而无需修改动态路由协议、路由发现协议等配置信息
3. VRRP协议的实现有VRRPv2和VRRPv3两个版本,VRRPv2于IPv4,VRRPv3基     于IPv6
4. VRRP路由器: 所有运行VRRP协议的路由器就叫做VRRP路由器
5. VRRP备份组: 多台路由器被分到一个组中,在这个组中选举出一台主路由器,其他作为备份路由器。平常时候都是主路由器一个工作,备份路由器空闲,当主路由器故障后,从多台备份路由器中选举出一台替代故障的主路由器工作。这一组中的路由器构成了一个备份组。
如下图所示: 有两个路由器,两个网关,从两个路由器中选举出一个路由器作为主路由器,其他的都是备份路由器,主路由器负责发转发数据报,而备份路由器处于空闲状态,当主路由器出现故障后,备份路由器会成为主路由器,代替主路由器实现转发功能。

6.  虚拟路由器:虚拟路由器是VRRP备份组中所有路由器的集合,它是一个逻辑概念,并不是正真存在的。从备份组外面看备份组中的路由器,感觉组中的所有路由 器就像一个 一样,你可以理解为  在一个组中: 主路由器+所有备份路由器=虚拟路由器。虚拟路由器有一个虚拟的IP地址和MAC地址。如果虚拟IP和备份组中的某台路由器的IP相同的话,那么这台路由 器称为IP地址拥有者,并且作为备份组中的主路由器。
如下图所示:  RA、RB和RC都是VRRP路由器,他们构成了一个VRRP备份组,RA为主路由器,RB和RC为备份路由器,这三台路由器从外界来看就像一台一样,这 样构成一个虚拟路由器Router Group,虚拟路由器有一个虚拟的IP地址为10.1.1.1(RA主路由器的IP)。RA是IP地址拥有者,也是主路由器。
7. 虚拟IP地址和MAC地址:VRRP组(备份组)中的虚拟路由器对外表现为唯一的虚拟MAC地址,地址格式为00-00-5E-00-01-【VRID】,VRID为VRRP组的编号,范围是0~255.
上图中,三台路由器在一个组中,这个组可以起一个0~255之间的编号。

注意:
1.虚拟路由器具有IP地址。局域网内的主机仅需要知道这个虚拟路由器的IP地址,并将其设置为缺省路由的下一跳地址
2.虚拟路由器的 IP 地址可以是备份组所在网段中未被分配的IP 地址,也可以和备份组内的某个路由器的接口IP 地址相同
3.接口 IP 地址与虚拟IP 地址相同的路由器被称为“IP 地址拥有者” 在同一个 VRRP 备份组中,只允许配置一个IP 地址拥有者

VRRP状态

VRRP路由器在运行过程中有三种状态:
1. Initialize状态: 系统启动后就进入Initialize,此状态下路由器不对VRRP报文做任何处
处理,可以理解为初始化
2. Master状态: 路由器会发送VRRP通告,发送免费ARP报文。
3. Backup状态: 接受VRRP通告。
一般主路由器处于Master状态,备份路由器处于Backup状态。
VRRP选举机制VRRP使用选举机制来确定路由器的状态,运行VRRP的一组路由器对外构成了一个虚拟路由器,其中一台路由器处于Master状态,其他处于Backup状态。所以主路由器又叫做Master路由器,备份路由器又叫做Backup路由器。
优先级选举:
1.VRRP组中IP拥有者。如果虚拟IP地址与VRRP组中的某台VRRP路由器IP地址相同,则此路由器为IP地址拥有者,这台路由器将被定位主路由器。
2.比较优先级。如果没有IP地址拥有者,则比较路由器的优先级,优先级的范围是0~255,大的作为主路由器
3.比较IP地址。在没有Ip地址拥有者和优先级相同的情况下,IP地址大的作为主路由器。
如下图所示: 虚拟IP为10.1.1.254,在VRRP组中没有IP地址拥有者,则比较优先级,很明显RB和RA的优先级要大于RC,则比较RA和RB的IP地址,RB的IP地址大。所以RB为组中的主路由器。

VRRP定时器

① VRRP通告报文时间间隔定时器
1> VRRP备份组中的Master路由器会定时发送VRRP通告报文,通知备份组内的
路由器自己工作正常
2> 用户可以通过设置VRRP定时器来调整Master路由器发送VRRP 通告报文的
时间间隔
3> 如果Backup路由器在等待了3个间隔时间后,依然没有收到VRRP 通告报文,则认为自己是Master路由器,并对外发送VRRP通告报文,重新进行Master路由器的选举
② VRRP抢占延迟时间定时器
1> 为了避免备份组内的成员频繁进行主备状态转换,让Backup路由器有足够的
时间搜集必要的信息(如路由信息),Backup 路由器接收到优先级低于本地优 先级的通告报文后,不会立即抢占成为Master
2> 而是等待一定时间——抢占延迟时间后,才会对外发送VRRP通告报文取代原 来的Master路由器
VRRP报文格式

VRRP只使用VRRP通告报文。
VRRP通告报文使用Ip组播数据包进行封装,组播地址为223.0.0.18,IANA给其分配的协议号为112。
VRRP通告报文的TTL值必须是255,如果VRRP路由器接受到TTL值不为255的VRRP通告报文,必须丢弃。
VRRP组中的主路由器会定期发送通告报文,备份路由器接受,他们通过这种方式来交流选举

VRRP工作过程总结:

1. 路由器使能VRRP 功能后,会根据优先级确定自己在备份组中的角色。优先级高的路由器成为Master 路由器,优先级低的成为Backup 路由器。Master 路由器定期发送VRRP
通告报文,通知备份组内的其他设备自己工作正常;Backup 路由器则启动定时器等待通告报文的到来。
2. 在抢占方式下,当Backup 路由器收到VRRP 通告报文后,会将自己的优先级与通告报 文中的优先级进行比较。如果大于通告报文中的优先级,则成为Master 路由器;否则将保持Backup状态
3. 在非抢占方式下,只要Master 路由器没有出现故障,备份组中的路由器始终保持Master 或Backup 状态,Backup 路由器即使随后被配置了更高的优先级也不会成为Master 路由器
4. 如果Backup 路由器的定时器超时后仍未收到Master 路由器发送来的VRRP 通告报文,则认为Master 路由器已经无法正常工作,此时Backup 路由器会认为自己是Master 路由器,并对外发送VRRP 通告报文。备份组内的路由器根据优先级选举出Master 路由 器,承担报文的转发功能

VRRP基本配置

配置VRRP组  要启用VRRP,最基本的配置就是要创建VRRP组,并为VRRP组配置虚拟IP地址
Vrrp group-number ip ip-address [secondary]
group-number 为VRRP组的编号。即VRID 范围1~255,
ip-address    虚拟IP地址。
Secondary    辅助IP地址
需要在主路由器和备份路由器上配置

 

 配置VRRP优先级

如果希望指定某台路由器称为主路由器,可以手工调整其优先级
Vrrp group-number priority number
Group-number  VRRP组号 VIRD
Priority   表示优先级
Number  表示优先级 范围,0~255,默认为100,但是0被保留为特殊用途,255表示IP地址拥有者。
优先级的配置在没有IP地址拥有者的情况下。想让哪台路由器成为主路由器就在哪台路由器上配置

 配置VRRP接口跟踪

如上图所示:RA为主路由器,RB为备份路由器,但是当RA上的接口S0发生故障时,RA依然从接口E0发送通告报文,声明自己为主路由器,但是RA实际上已经不能进行转发了。也就是说路由器网路中不能判定路由器接口是否发生了故障。
VRRP接口跟踪机制就是检测接口故障的一种机制。配置了接口跟踪机制的路由器,当自己的接口发生故障时会将自己的路由器优先级降低,从而使自己从主路由器变为备份路由器,然后原来的备份路由器此时将成为主路由器。

Vrrp group-nunmber track interface [priority-decrement]

priority-decrement为降低的优先级数
注意: priority-decrement是降低了多少而不是降低到多少,比如priority-decrement为30,那么此路由器的优先级在原来基础上降低30.


配置VRRP抢占模式

抢占模式: 指当原来的路由器从故障中回复并接入到网络层后,配置了VRRP抢占模式的路由器将夺回原来属于自己的角色(主路由器),如果没有配置,回复之后将保持备份路由器的状态。
推荐使用启用抢占模式

vrrp group-number preempt  {delay [Delay-time] }

Delay  取值范围为1~255之间,如果不配置delay时间,那么其默认值为0秒。
delay-time 为延迟抢占的时间即从该路由器发现自己的优先级大于MASTER的优先级开始 经过delay-time这样长的一段时间之后才允许抢占。
在主路由器中配置该命令

配置VRRP定时器

VRRP定时器可以修改通告报文的发送时间

vrrp group-number timers advertise vrrp-advertise-interval

adver_interval为设置定时器adver_timer的时间间隔。MASTER每隔这样一个时间间隔,就会发送一个advertisement报文以通知组内其他路由器自己工作正常,
vrrp-advertise-interval的取值范围为0~254。
在主路由器上配置

配置VRRP定时学习功能

配置此命令的路由器会学习发送通告报文时间,进而计算出失效间隔,否则默认3s,
这条命令对于上面的配置VRRP定时器,在主路由器中配置了发送时间间隔,那么在备份路由器上就需要配置定时学习功能来计算失效间隔,因为失效间隔是发送时间的3倍
vrrp group-number times learn
VRRP负载均衡在一组VRRP组中,主路由器承担数据转发任务的同时,备份路由器的链路将处于空闲状态,这必然造成了带宽资源的浪费。为了避免这种浪费,使用VRRP负载均衡。

VRRP负载均衡

是通过实现将路由器加入到多个VRRP组实现的,使VRRP路由器在不同的组中担任不同的角色。
如下图所示:RA为组35的主路由器,同时又是组36的备份路由器
RB为组36的主路由器,同时又是组35的备份路由器。
在正常状态下,PC1、PC2走RA,PC3和PC4走RB,但是两个路由器一旦出现故障,就将网关切换到备份路由器。RA和RB可以说是相辅相成的。
VRRP并不具备对流量进行监控的机制,它的负载均衡只是通过使用多个VRRP组来实现的。