月度归档:2016年06月

HTTPS≠安全 互联网金融产品要当心

来源:http://www.infoq.com/cn/articles/wooyunsummit2015-linpeng?utm_source=infoq&utm_medium=related_content_link&utm_campaign=relatedContent_articles_clk

2014年伊始,由于国家政策转向利好,互联网金融尤其是P2P网络借贷产品呈现爆发式增长。许多一直关注互联网金融而又害怕政策风险的企业和金融巨头纷纷推出自己的P2P网贷产品。

据《北京商报》2014年报道,几乎多数P2P平台都遭受过黑客的攻击,只不过不少攻击并未曝光,其统计至2013年末公开数据显示,有70家P2P平台因遭遇黑客袭击而关门。而国内众多银行P2P业务站点更是安全漏洞频发。如何保护用户资金以及数据安全,保障对外服务的高可用性,成了横亘在P2P网贷乃至互联网金融从业者心头的难题。

在当当网,林鹏是国内最早一批公司内部组建安全团队的领头人;在网信金融(旗下有P2P网贷、众筹等多项金融业务),他带领团队见证了国内互联网金融风生水起与安全危机并存的2014年。

互联网金融的安全挑战

互联网金融对安全的要求比电商要高得多,这从攻击者的水准就能很容易看出来。“我们碰到的攻击者大多有着专业级别的水准,相比之下之前在电商可能更 偏向传统的小黑客。”林鹏曾发现公司有一部分机器疑似被用0day(该漏洞2周才公布)入侵,攻击者给机器编号,完全是要做持久性的控制。所用 RootKit都是多进程守护,杀死一个进程会触发另一个进程继续监控,这并不像传统小黑客挂一个Webshell就完事的做法。

对于从电商跳到互联网金融领域的林鹏而言,最大挑战来自用户帐号丢失问题。当当的帐号上基本上不存钱或者不会有很多钱,而网信金融上的投资者帐号则是大量的真金白银,对用户而言意义完全不同。其次是DDoS,电商可能很少遇到,但互联网金融几乎家家都会遇到。

另外还有一些电商不太会碰到的问题,比如黄牛党、红包套现、人工打码平台的对抗、以及猪八戒等任务交易平台等。

互联网金融安全的原始手段

互联网金融是一个比较新的领域。国内众所周知的首推支付宝的“余额宝”,然后各种“宝”、P2P、众筹才开始慢慢火起来。此前的人人贷、宜信,都没有进入大众视野。余额宝之后,互联网金融安全才慢慢重要起来。

谈到银行安全,我们用到最多的是U盾和网银插件。那互联网金融用什么来保障安全呢?标配的是HTTPS加密和支付密码,不过这还不够。

对P2P网贷产品而言,保障资金安全的绝招是同卡进出。即不管你拿这张卡投了多少钱,最后的收益都回到投的卡上。如果说要中途换卡,你需要经过一些 很原始的手段,比如人工审核照片,照一张一手拿身份证、一手拿银行卡,还需要露出来脸的照片上传给平台。这对于用户而言可能有些麻烦,但它却是当下最有效 的措施之一。

同卡进出只能保障资金不丢失,如果帐号被盗用,攻击者恶搞式地投一些长期项目,变相”冻结“帐号资金。再加上某些公司业务激进,做了其它资金出口,比如电商业务,那同卡进出也无能为力。

“HTTPS是一种形同虚设的安全措施”

如上所述,互联网金融产品的安全标配是HTTPS加密。但HTTPS真的安全吗?林鹏表示:“对我们这类做安全的人来说,HTTPS已经等同于不安全。如果说要劫持一个HTTPS,尤其是局域网有问题或者小区运营商捣鬼,完全没什么问题,它形同虚设。”而这还没谈及OpenSSL组件近两年频频曝出可窃听漏洞的严峻现状。

林鹏认为比较适宜的一种宣传方式,是告知用户个人信息、资金以及借贷方风控三方面的安全措施。比如网贷的用户都拥有不少资金,那么身份证、银行卡、 手机号等私人信息可以部分马赛克,即使帐号被盗也不会泄露;资金则是同卡进出、用户行为监控体系等;借贷方风控要看这家公司的具体征信体系,技术范畴很难 解决。

还有令人大跌眼镜的案例,某网贷平台为了表示自身交易透明,把员工工作现场在公网上实况直播,后来某白帽子以为是漏洞报告到乌云网。

如果3秒前用户数据库被拖

数据库被拖的案例数不胜数,一旦发现常规做法是敦促或强制所有用户更改密码。林鹏的建议更粗暴激进,如果在电商公司,会对所有留有余额的帐号进行冻结,需电话人工解封。这样虽麻烦,但至少可以保障用户资金安全。

网贷产品稍好,由于同卡进出功能的存在,资金至少不会损失,但要求所有用户更改密码是必要的。当然这个过程会配合用户行为监控体系,以避免改密码时 的二次攻击。其次是对期间资金操作的可撤销,免除用户遭受不必要的资金“冻结”。再其次还有前面提到的私人信息加密和打马赛克。

林鹏在分享中多次提及,目前许多防护手段都还比较原始,需要人工确认。那真正现代化的手段是什么呢?他认为是“用户行为监控体系”。用户、时间、环 境、操作行为、操作对象等构成一位用户的一次行为指纹,用户换操作系统、浏览器、IP地址、非常见时间投资、非常见链接操作等等都会被记录下来。通过上述 用户画像,在P2P平台很常见的黄牛党,就可以在用户验真环节被有效鉴别。

如果你对如何打击黄牛党感兴趣,或者想了解用户行为监控体系如何确定宽与严的界限,2015年7月17~18日,台湾著名黑客大会HITCON将来到北京,与著名漏洞平台乌云网联合举办乌云第二届“白帽子大会”。 这是HITCON会议在创办十年来第一次走出台湾。另外,本次乌云白帽子大会还引入了HITCON的经典环节“算命摊”。台湾和大陆的两代知名黑客与资深 信息安全人士如HITCON创始人TT、乌云创始人剑心、Ucloud创始人季昕华、阿里巴巴副总裁杜跃进博士、IBM安全系统首席架构师李承达等,以及 知名自媒体人池建强将参与该活动

Linux内核中的IO调度器

来源:http://blog.chinaunix.net/uid-16979052-id-3484476.html

Linux内核2.6开始引入了全新的IO调度子系统。Linux内核提供了CFQ(默认), deadlinenoop三种IO调度器。本文首先介绍三种IO调度器各自的特点和应用场景,之后会介绍Linux内核提供的为每一个块设备指定IO调度器和调整IO调度器参数的接口。

  1. CFQ(Complete Fair Queuing)完全公平的排队

CFQ实现了一种QoSIO调度算法。该算法为每一个进程分配一个时间窗口,在该时间窗口内,允许进程发出IO请求。通过时间窗口在不同进程间的移动,保证了对于所有进程而言都有公平的发出IO请求的机会。同时CFQ也实现了进程的优先级控制,可保证高优先级进程可以获得更长的时间窗口。

CFQ适用于系统中存在多任务I/O请求的情况,通过在多进程中轮换,保证了系统I/O请求整体的低延迟。但是,对于只有少数进程存在大量密集的I/O请求的情况,会出现明显的I/O性能下降。

Linux系统中可以通过cat /sys/block/.../queue/scheduler进行查看。

root@src-yinzh:~$cat /sys/block/sda/queue/scheduler
noop deadline [cfq]

可以使用echo 调度算法 > /sys/block/磁盘名/queue/scheduler进行修改磁盘IO调度算法时。

root@src-yinzh:~$echo "noop" > /sys/block/sda/queue/schedulerroot@src-yinzh:~$cat /sys/block/sda/queue/scheduler
[noop] deadline cfq
  1. CFQ调度器主要提供如下参数

root@src-yinzh:~$ls /sys/block/sda/queue/iosched/
back_seek_max fifo_expire_async group_idle quantum slice_async_rq slice_syncback_seek_penalty fifo_expire_sync low_latency slice_async slice_idle

slice_idle:如果一个进程在自己的时间窗口里,经过slice_idle时间都没有发射I/O请求,则调度选择下一个程序。

Quantum:该参数控制在一个时间窗口内可以发射的I/O请求的最大数目。

low_latency:对于I/O请求延时非常重要的任务,可以打开低延迟模式来降低I/O请求的延时。

  1. NOOP调度算法

NOOP调度器十分简单,其只拥有一个等待队列,每当来一个新的请求,仅仅是按先来先处理的思路将请求插入到等待队列的尾部。

其应用环境主要有以下两种:一是物理设备中包含了自己的I/O调度程序,比如SCSITCQ;二是寻道时间可以忽略不计的设备,比如SSD等。

  1. DEADLINE调度算法

DEADLINE调度算法主要针对I/O请求的延时而设计,每个I/O请求都被附加一个最后执行期限。该算法维护两类队列,一是按照扇区排序的读写请求队列;二是按照过期时间排序的读写请求队列。如果当前没有I/O请求过期,则会按照扇区顺序执行I/O请求;如果发现过期的I/O请求,则会处理按照过期时间排序的队列,直到所有过期请求都被发射为止。在处理请求时,该算法会优先考虑读请求。

当系统中存在的I/O请求进程数量比较少时,与CFQ算法相比,DEADLINE算法可以提供较高的I/O吞吐率。

  1. DEADLINE调度算法提供如下参数
root@src-yinzh:~$ls /sys/block/sda/queue/iosched/
fifo_batch front_merges read_expire write_expire writes_starved

writes_starved:该参数控制当读写队列均不为空时,发射多少个读请求后,允许发射写请求。

read_expire:参数控制读请求的过期时间,单位毫秒。

write_expire:参数控制写请求的过期时间,单位毫秒。

 

Linux I/O调度 整理

来源:http://www.cnblogs.com/sopc-mc/archive/2011/10/09/2204858.html

一) I/O调度程序的总结

1) 当向设备写入数据块或是从设备读出数据块时,请求都被安置在一个队列中等待完成.
2) 每个块设备都有它自己的队列.
3) I/O调度程序负责维护这些队列的顺序,以更有效地利用介质.I/O调度程序将无序的I/O操作变为有序的I/O操作.
4) 内核必须首先确定队列中一共有多少个请求,然后才开始进行调度.

 

二) I/O调度的4种算法

1) CFQ(Completely Fair Queuing, 完全公平排队)

特点:
在最新的内核版本和发行版中,都选择CFQ做为默认的I/O调度器,对于通用的服务器也是最好的选择.
CFQ试图均匀地分布对I/O带宽的访问,避免进程被饿死并实现较低的延迟,是deadline和as调度器的折中.
CFQ对于多媒体应用(video,audio)和桌面系统是最好的选择.
CFQ赋予I/O请求一个优先级,而I/O优先级请求独立于进程优先级,高优先级进程的读写不能自动地继承高的I/O优先级.

工作原理:
CFQ为每个进程/线程单独创建一个队列来管理该进程所产生的请求,也就是说每个进程一个队列,各队列之间的调度使用时间片来调度,以此来保证每个进程都能被很好的分配到I/O带宽.I/O调度器每次执行一个进程的4次请求.
2) NOOP(电梯式调度程序)

特点:
在Linux2.4或更早的版本的调度程序,那时只有这一种I/O调度算法.
NOOP实现了一个FIFO队列,它像电梯的工作主法一样对I/O请求进行组织,当有一个新的请求到来时,它将请求合并到最近的请求之后,以此来保证请求同一介质.
NOOP倾向饿死读而利于写.
NOOP对于闪存设备,RAM,嵌入式系统是最好的选择.

电梯算法饿死读请求的解释:
因为写请求比读请求更容易.
写请求通过文件系统cache,不需要等一次写完成,就可以开始下一次写操作,写请求通过合并,堆积到I/O队列中.
读请求需要等到它前面所有的读操作完成,才能进行下一次读操作.在读操作之间有几毫秒时间,而写请求在这之间就到来,饿死了后面的读请求.

 

3) Deadline(截止时间调度程序)

特点:
通过时间以及硬盘区域进行分类,这个分类和合并要求类似于noop的调度程序.
Deadline确保了在一个截止时间内服务请求,这个截止时间是可调整的,而默认读期限短于写期限.这样就防止了写操作因为不能被读取而饿死的现象.
Deadline对数据库环境(ORACLE RAC,MYSQL等)是最好的选择.
4) AS(预料I/O调度程序)

特点:
本质上与Deadline一样,但在最后一次读操作后,要等待6ms,才能继续进行对其它I/O请求进行调度.
可以从应用程序中预订一个新的读请求,改进读操作的执行,但以一些写操作为代价.
它会在每个6ms中插入新的I/O操作,而会将一些小写入流合并成一个大写入流,用写入延时换取最大的写入吞吐量.
AS适合于写入较多的环境,比如文件服务器
AS对数据库环境表现很差.

 

三) I/O调度方法的查看与设置

1) 查看当前系统的I/O调度

[root@test1 tmp]# cat /sys/block/sda/queue/scheduler
noop anticipatory deadline [cfq]

2) 临时更改I/O调度
例如:想更改到noop电梯调度算法:
echo noop > /sys/block/sda/queue/scheduler

3) 永久更改I/O调度
修改内核引导参数,加入elevator=调度程序名
[root@test1 tmp]# vi /boot/grub/menu.lst
更改到如下内容:
kernel /boot/vmlinuz-2.6.18-8.el5 ro root=LABEL=/ elevator=deadline rhgb quiet

重启之后,查看调度方法:
[root@test1 ~]# cat /sys/block/sda/queue/scheduler
noop anticipatory [deadline] cfq
已经是deadline了
四) I/O调度程序的测试

本次测试分为只读,只写,读写同时进行,分别对单个文件600MB,每次读写2M,共读写300次.

1) 测试磁盘读
[root@test1 tmp]# echo deadline > /sys/block/sda/queue/scheduler
[root@test1 tmp]# time dd if=/dev/sda1 f=/dev/null bs=2M count=300
300+0 records in
300+0 records out
629145600 bytes (629 MB) copied, 6.81189 seconds, 92.4 MB/s

real    0m6.833s
user    0m0.001s
sys     0m4.556s
[root@test1 tmp]# echo noop > /sys/block/sda/queue/scheduler
[root@test1 tmp]# time dd if=/dev/sda1 f=/dev/null bs=2M count=300
300+0 records in
300+0 records out
629145600 bytes (629 MB) copied, 6.61902 seconds, 95.1 MB/s

real    0m6.645s
user    0m0.002s
sys     0m4.540s
[root@test1 tmp]# echo anticipatory > /sys/block/sda/queue/scheduler
[root@test1 tmp]# time dd if=/dev/sda1 f=/dev/null bs=2M count=300
300+0 records in
300+0 records out
629145600 bytes (629 MB) copied, 8.00389 seconds, 78.6 MB/s

real    0m8.021s
user    0m0.002s
sys     0m4.586s
[root@test1 tmp]# echo cfq > /sys/block/sda/queue/scheduler
[root@test1 tmp]# time dd if=/dev/sda1 f=/dev/null bs=2M count=300
300+0 records in
300+0 records out
629145600 bytes (629 MB) copied, 29.8 seconds, 21.1 MB/s

real    0m29.826s
user    0m0.002s
sys     0m28.606s

结果:
第一 noop:用了6.61902秒,速度为95.1MB/s
第二 deadline:用了6.81189秒,速度为92.4MB/s
第三 anticipatory:用了8.00389秒,速度为78.6MB/s
第四 cfq:用了29.8秒,速度为21.1MB/s
2) 测试写磁盘
[root@test1 tmp]# echo cfq > /sys/block/sda/queue/scheduler
[root@test1 tmp]# time dd if=/dev/zero f=/tmp/test bs=2M count=300
300+0 records in
300+0 records out
629145600 bytes (629 MB) copied, 6.93058 seconds, 90.8 MB/s

real    0m7.002s
user    0m0.001s
sys     0m3.525s

[root@test1 tmp]# echo anticipatory > /sys/block/sda/queue/scheduler
[root@test1 tmp]# time dd if=/dev/zero f=/tmp/test bs=2M count=300
300+0 records in
300+0 records out
629145600 bytes (629 MB) copied, 6.79441 seconds, 92.6 MB/s

real    0m6.964s
user    0m0.003s
sys     0m3.489s

[root@test1 tmp]# echo noop > /sys/block/sda/queue/scheduler
[root@test1 tmp]# time dd if=/dev/zero f=/tmp/test bs=2M count=300
300+0 records in
300+0 records out
629145600 bytes (629 MB) copied, 9.49418 seconds, 66.3 MB/s

real    0m9.855s
user    0m0.002s
sys     0m4.075s

[root@test1 tmp]# echo deadline > /sys/block/sda/queue/scheduler
[root@test1 tmp]# time dd if=/dev/zero f=/tmp/test bs=2M count=300
300+0 records in
300+0 records out
629145600 bytes (629 MB) copied, 6.84128 seconds, 92.0 MB/s

real    0m6.937s
user    0m0.002s
sys     0m3.447s

测试结果:
第一 anticipatory,用了6.79441秒,速度为92.6MB/s
第二 deadline,用了6.84128秒,速度为92.0MB/s
第三 cfq,用了6.93058秒,速度为90.8MB/s
第四 noop,用了9.49418秒,速度为66.3MB/s
3) 测试同时读/写

[root@test1 tmp]# echo deadline > /sys/block/sda/queue/scheduler
[root@test1 tmp]# dd if=/dev/sda1 f=/tmp/test bs=2M count=300
300+0 records in
300+0 records out
629145600 bytes (629 MB) copied, 15.1331 seconds, 41.6 MB/s

[root@test1 tmp]# echo cfq > /sys/block/sda/queue/scheduler
[root@test1 tmp]# dd if=/dev/sda1 f=/tmp/test bs=2M count=300
300+0 records in
300+0 records out
629145600 bytes (629 MB) copied, 36.9544 seconds, 17.0 MB/s

[root@test1 tmp]# echo anticipatory > /sys/block/sda/queue/scheduler
[root@test1 tmp]# dd if=/dev/sda1 f=/tmp/test bs=2M count=300
300+0 records in
300+0 records out
629145600 bytes (629 MB) copied, 23.3617 seconds, 26.9 MB/s

[root@test1 tmp]# echo noop > /sys/block/sda/queue/scheduler
[root@test1 tmp]# dd if=/dev/sda1 f=/tmp/test bs=2M count=300
300+0 records in
300+0 records out
629145600 bytes (629 MB) copied, 17.508 seconds, 35.9 MB/s

测试结果:
第一 deadline,用了15.1331秒,速度为41.6MB/s
第二 noop,用了17.508秒,速度为35.9MB/s
第三 anticipatory,用了23.3617秒,速度为26.9MS/s
第四 cfq,用了36.9544秒,速度为17.0MB/s

 

五) ionice

ionice可以更改任务的类型和优先级,不过只有cfq调度程序可以用ionice.
有三个例子说明ionice的功能:
采用cfq的实时调度,优先级为7
ionice -c1 -n7  -ptime dd if=/dev/sda1 f=/tmp/test bs=2M count=300&
采用缺省的磁盘I/O调度,优先级为3
ionice -c2 -n3  -ptime dd if=/dev/sda1 f=/tmp/test bs=2M count=300&
采用空闲的磁盘调度,优先级为0
ionice -c3 -n0  -ptime dd if=/dev/sda1 f=/tmp/test bs=2M count=300&

ionice的三种调度方法,实时调度最高,其次是缺省的I/O调度,最后是空闲的磁盘调度.
ionice的磁盘调度优先级有8种,最高是0,最低是7.
注意,磁盘调度的优先级与进程nice的优先级没有关系.
一个是针对进程I/O的优先级,一个是针对进程CPU的优先级.

Bootstrap后台管理模板集合

Bootstrap是Twitter推出的一个用于前端开发的开源工具包。是目前最受欢迎的前端框架之一。下面为大家推荐17个免费的Bootstrap后台管理界面模板。

1. Admin Lite

bootstrap模板 bootstrap后台模板 Bootstrap教程

AdminLTE - 是一个完全响应式管理模板。基于Bootstrap3的框架。高度可定制的,易于使用。支持很多的屏幕分辨率适合从小型移动设备到大型台式机。

参考地址: https://www.almsaeedstudio.com/themes/AdminLTE/index2.html

体验了,一下感觉很好!!!

https://github.com/almasaeed2010/AdminLTE

 

2. Dashboard Sidebar

bootstrap模板 bootstrap后台模板 Bootstrap教程

这是一个管理后台模板,拥有控制面板或仪表板。这个模板有一个可折叠的工具栏菜单,并且有可以作为一个数据网格的表格。

参考地址:http://www.bootstrapzero.com/bootstrap-template/dashboard-sidebar

没太看懂!

3. DevOOPS

bootstrap模板 bootstrap后台模板 Bootstrap教程

DevOOPS是一个自适应免费管理仪表板主题,可以用在你的web项目中。

参考下载地址:https://github.com/devoopsme/devoops/archive/master.zip

体验一下, 很好, 比较轻量级, 很好很好!

4. Blocks

bootstrap模板 bootstrap后台模板 Bootstrap教程

Blocks 是一个轻量级的管理仪表板模板,源于 Cyfe 。 Blocks实质上是一个单页模板,所有你需要的,配置都并排放在一个页面中。

参考地址:http://www.bootstrapzero.com/bootstrap-template/blocks

5. DashGum

bootstrap模板 bootstrap后台模板 Bootstrap教程

DashGum 一个很棒15页的Bootstrap管理主题或仪表板。它拥有图表,表格,很多面板,日历,通知,待办列表等功能。

 

6. Metro Dashboard

bootstrap模板 bootstrap后台模板 Bootstrap教程

Metro风格的管理员仪表板模板

7. Bcore

bootstrap模板 bootstrap后台模板 Bootstrap教程

BCORE是一个完全自适应的管理后台模板。它配备了40个集成的插件包括图表,数据表格,地图,表单元素,通知等等。

还是停留在 2014年

8. Metis

bootstrap模板 bootstrap后台模板 Bootstrap教程

基于Twitter Bootstrap 3.x的免费管理模板

9. Clean

bootstrap模板 bootstrap后台模板 Bootstrap教程

Bootstrap主题拥有示例和不在bootstrap包中的附加UI组件。固定的页脚,管理员面板,shadow box等。

10. Charisma

bootstrap模板 bootstrap后台模板 Bootstrap教程

Charisma 是一个功能齐全,免费,高质量,响应式,HTML5管理后面模板,基于Bootstrap 3。拥有9种不同的主题。

 

11. Responsive Dashboard

bootstrap模板 bootstrap后台模板 Bootstrap教程

这个模板演示了 AngularJS 与 Bootstrap 3 如何集成使用。这个漂亮免费仪表板模板采用了可折叠的垂直侧边栏,字体图标,警报和滚动表格。

12. Bootstrap Admin Theme

bootstrap模板 bootstrap后台模板 Bootstrap教程

一个通用的管理Bootstrap 主题,个人可免费使用和商业用途。

13. SB Admin 2

bootstrap模板 bootstrap后台模板 Bootstrap教程

SB Admin 2 Bootstrap管理主题,仪表板模板或Web应用程序的用户界面启动。主题提供各种定制的jQuery插件为内置的Bootstrap UI添加扩展功能。

14. Hierapolis

bootstrap模板 bootstrap后台模板 Bootstrap教程

Bootstrap 3 是一个扁平风格的管理主题

 

15. Binary

bootstrap模板 bootstrap后台模板 Bootstrap教程

Binary Admin 可定制管理后面或仪表板模板。这响应模板包括可折叠侧栏,工作图表,数据表,多层级的下拉。

16. TemplateVamp

bootstrap模板 bootstrap后台模板 Bootstrap教程

TemplateVamp是一个功能强大和可定制的管理后台模板。用到了HTML5 & CSS3的强大和灵活性,它可以作为一个前端工具包,用于快速开发Web应用程序,轻量级,速度快,而且适合于移动网站;

https://www.egrappler.com/bootstrap-responsive-admin-template/index.html

17. Meritoo  无法连接了

bootstrap模板 bootstrap后台模板 Bootstrap教程

一个利用 Bootstrap 3.x 构建的管理后台主题,免费个人和商用。

 

来源: http://www.chinaz.com/web/2015/0428/401898_4.shtml

Kafka、Storm、HDFS整合实践

在基于Hadoop平台的很多应用场景中,我们需要对数据进行离线和实时分析,离线分析可以很容易地借助于Hive来实现统计分析,但是对于实 时的需求Hive就不合适了。实时应用场景可以使用Storm,它是一个实时处理系统,它为实时处理类应用提供了一个计算模型,可以很容易地进行编程处 理。为了统一离线和实时计算,一般情况下,我们都希望将离线和实时计算的数据源的集合统一起来作为输入,然后将数据的流向分别经由实时系统和离线分析系 统,分别进行分析处理,这时我们可以考虑将数据源(如使用Flume收集日志)直接连接一个消息中间件,如Kafka,可以整合 Flume+Kafka,Flume作为消息的Producer,生产的消息数据(日志数据、业务请求数据等等)发布到Kafka中,然后通过订阅的方 式,使用Storm的Topology作为消息的Consumer,在Storm集群中分别进行如下两个需求场景的处理:

  • 直接使用Storm的Topology对数据进行实时分析处理
  • 整合Storm+HDFS,将消息处理后写入HDFS进行离线分析处理

实时处理,只要开发满足业务需要的Topology即可,不做过多说明。这里,我们主要从安装配置Kafka、Storm,以及整合 Kafka+Storm、整合Storm+HDFS、整合Kafka+Storm+HDFS这几点来配置实践,满足上面提出的一些需求。配置实践使用的软 件包如下所示:

  • zookeeper-3.4.5.tar.gz
  • kafka_2.9.2-0.8.1.1.tgz
  • apache-storm-0.9.2-incubating.tar.gz
  • hadoop-2.2.0.tar.gz

程序配置运行所基于的操作系统为CentOS 5.11。

Kafka安装配置

我们使用3台机器搭建Kafka集群:

192.168.4.142   h1
192.168.4.143   h2
192.168.4.144   h3

在安装Kafka集群之前,这里没有使用Kafka自带的Zookeeper,而是独立安装了一个Zookeeper集群,也是使用这3台机器,保证Zookeeper集群正常运行。
首先,在h1上准备Kafka安装文件,执行如下命令
cd /usr/local/
wget http://mirror.bit.edu.cn/apache/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz
tar xvzf kafka_2.9.2-0.8.1.1.tgz
ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka

修改配置文件/usr/local/kafka/config/server.properties,修改如下内容:
broker.id=0
zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka

这里需要说明的是,默认Kafka会使用ZooKeeper默认的/路径,这样有关Kafka的ZooKeeper配置就会散落在根路径下面,如果 你有其他的应用也在使用ZooKeeper集群,查看ZooKeeper中数据可能会不直观,所以强烈建议指定一个chroot路径,直接在 zookeeper.connect配置项中指定:

zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka

而且,需要手动在ZooKeeper中创建路径/kafka,使用如下命令连接到任意一台ZooKeeper服务器:
cd /usr/local/zookeeper
bin/zkCli.sh
在ZooKeeper执行如下命令创建chroot路径:
create /kafka ''
这样,每次连接Kafka集群的时候(使用--zookeeper选项),也必须使用带chroot路径的连接字符串,后面会看到。
然后,将配置好的安装文件同步到其他的h2、h3节点上:

scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h2:/usr/local/
scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h3:/usr/local/
最后,在h2、h3节点上配置,执行如下命令:
cd /usr/local/
ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
并修改配置文件/usr/local/kafka/config/server.properties内容如下所示:
broker.id=1  # 在h1修改
broker.id=2  # 在h2修改
因为Kafka集群需要保证各个Broker的id在整个集群中必须唯一,需要调整这个配置项的值(如果在单机上,可以通过建立多个Broker进程来模拟分布式的Kafka集群,也需要Broker的id唯一,还需要修改一些配置目录的信息)。
在集群中的h1、h2、h3这三个节点上分别启动Kafka,分别执行如下命令:
bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &

可以通过查看日志,或者检查进程状态,保证Kafka集群启动成功。
我们创建一个名称为my-replicated-topic5的Topic,5个分区,并且复制因子为3,执行如下命令:
bin/kafka-topics.sh --create --zookeeper h1:2181,h2:2181,h3:2181/kafka --replication-factor 3 --partitions 5 --topic my-replicated-topic5
查看创建的Topic,执行如下命令:
bin/kafka-topics.sh --describe --zookeeper h1:2181,h2:2181,h3:2181/kafka --topic my-replicated-topic5
结果信息如下所示:
Topic:my-replicated-topic5     PartitionCount:5     ReplicationFactor:3     Configs:
Topic: my-replicated-topic5     Partition: 0     Leader: 0     Replicas: 0,2,1     Isr: 0,2,1
Topic: my-replicated-topic5     Partition: 1     Leader: 0     Replicas: 1,0,2     Isr: 0,2,1
Topic: my-replicated-topic5     Partition: 2     Leader: 2     Replicas: 2,1,0     Isr: 2,0,1
Topic: my-replicated-topic5     Partition: 3     Leader: 0     Replicas: 0,1,2     Isr: 0,2,1
Topic: my-replicated-topic5     Partition: 4     Leader: 2     Replicas: 1,2,0     Isr: 2,0,1
上面Leader、Replicas、Isr的含义如下:
Partition: 分区
Leader   : 负责读写指定分区的节点
Replicas : 复制该分区log的节点列表
Isr      : "in-sync" replicas,当前活跃的副本列表(是一个子集),并且可能成为Leader
我们可以通过Kafka自带的bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh脚本,来验证演示如果发布消息、消费消息。
在一个终端,启动Producer,并向我们上面创建的名称为my-replicated-topic5的Topic中生产消息,执行如下脚本:
bin/kafka-console-producer.sh --broker-list h1:9092,h2:9092,h3:9092 --topic my-replicated-topic5
在另一个终端,启动Consumer,并订阅我们上面创建的名称为my-replicated-topic5的Topic中生产的消息,执行如下脚本:
bin/kafka-console-consumer.sh --zookeeper h1:2181,h2:2181,h3:2181/kafka --from-beginning --topic my-replicated-topic5
可以在Producer终端上输入字符串消息行,然后回车,就可以在Consumer终端上看到消费者消费的消息内容。
也可以参考Kafka的Producer和Consumer的Java API,通过API编码的方式来实现消息生产和消费的处理逻辑。

 

 

Storm安装配置

Storm集群也依赖Zookeeper集群,要保证Zookeeper集群正常运行。Storm的安装配置比较简单,我们仍然使用下面3台机器搭建:

192.168.4.142 h1
192.168.4.143 h2
192.168.4.144 h3

首先,在h1节点上,执行如下命令安装:
cd /usr/local/
wget http://mirror.bit.edu.cn/apache/incubator/storm/apache-storm-0.9.2-incubating/apache-storm-0.9.2-incubating.tar.gz
tar xvzf apache-storm-0.9.2-incubating.tar.gz
ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm

然后,修改配置文件conf/storm.yaml,内容如下所示:
storm.zookeeper.servers:
- "h1"
- "h2"
- "h3"
storm.zookeeper.port: 2181
#
nimbus.host: "h1"

supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703

storm.local.dir: "/tmp/storm"

将配置好的安装文件,分发到其他节点上:
scp -r /usr/local/apache-storm-0.9.2-incubating/ h2:/usr/local/
scp -r /usr/local/apache-storm-0.9.2-incubating/ h3:/usr/local/

最后,在h2、h3节点上配置,执行如下命令:
cd /usr/local/
ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm

Storm集群的主节点为Nimbus,从节点为Supervisor,我们需要在h1上启动Nimbus服务,在从节点h2、h3上启动Supervisor服务:
bin/storm nimbus &
bin/storm supervisor &

为了方便监控,可以启动Storm UI,可以从Web页面上监控Storm Topology的运行状态,例如在h2上启动:
bin/storm ui &

这样可以通过访问http://h2:8080/来查看Topology的运行状况。

整合Kafka+Storm

消息通过各种方式进入到Kafka消息中间件,比如可以通过使用Flume来收集日志数据,然后在Kafka中路由暂存,然后再由实时计算程序 Storm做实时分析,这时我们就需要将在Storm的Spout中读取Kafka中的消息,然后交由具体的Spot组件去分析处理。实际 上,apache-storm-0.9.2-incubating这个版本的Storm已经自带了一个集成Kafka的外部插件程序storm- kafka,可以直接使用,例如我使用的Maven依赖配置,如下所示:
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.2-incubating</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>0.9.2-incubating</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.1.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>下面,我们开发了一个简单WordCount示例程序,从Kafka读取订阅的消息行,通过空格拆分出单个单词,然后再做词频统计计算,实现的Topology的代码,如下所示:
package org.shirdrn.storm.examples;import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class MyKafkaTopology {

public static class KafkaWordSplitter extends BaseRichBolt {

private static final Log LOG = LogFactory.getLog(KafkaWordSplitter.class);
private static final long serialVersionUID = 886149197481637894L;
private OutputCollector collector;

@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}

@Override
public void execute(Tuple input) {
String line = input.getString(0);
LOG.info("RECV[kafka -> splitter] " + line);
String[] words = line.split("\\s+");
for(String word : words) {
LOG.info("EMIT[splitter -> counter] " + word);
collector.emit(input, new Values(word, 1));
}
collector.ack(input);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}

}

public static class WordCounter extends BaseRichBolt {

private static final Log LOG = LogFactory.getLog(WordCounter.class);
private static final long serialVersionUID = 886149197481637894L;
private OutputCollector collector;
private Map<String, AtomicInteger> counterMap;

@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
this.counterMap = new HashMap<String, AtomicInteger>();
}

@Override
public void execute(Tuple input) {
String word = input.getString(0);
int count = input.getInteger(1);
LOG.info("RECV[splitter -> counter] " + word + " : " + count);
AtomicInteger ai = this.counterMap.get(word);
if(ai == null) {
ai = new AtomicInteger();
this.counterMap.put(word, ai);
}
ai.addAndGet(count);
collector.ack(input);
LOG.info("CHECK statistics map: " + this.counterMap);
}

@Override
public void cleanup() {
LOG.info("The final result:");
Iterator<Entry<String, AtomicInteger>> iter = this.counterMap.entrySet().iterator();
while(iter.hasNext()) {
Entry<String, AtomicInteger> entry = iter.next();
LOG.info(entry.getKey() + "\t:\t" + entry.getValue().get());
}

}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}

public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
String zks = "h1:2181,h2:2181,h3:2181";
String topic = "my-replicated-topic5";
String zkRoot = "/storm"; // default zookeeper root configuration for storm
String id = "word";

BrokerHosts brokerHosts = new ZkHosts(zks);
SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConf.forceFromStart = false;
spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
spoutConf.zkPort = 2181;

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5); // Kafka我们创建了一个5分区的Topic,这里并行度设置为5
builder.setBolt("word-splitter", new KafkaWordSplitter(), 2).shuffleGrouping("kafka-reader");
builder.setBolt("word-counter", new WordCounter()).fieldsGrouping("word-splitter", new Fields("word"));

Config conf = new Config();

String name = MyKafkaTopology.class.getSimpleName();
if (args != null && args.length > 0) {
// Nimbus host name passed from command line
conf.put(Config.NIMBUS_HOST, args[0]);
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
} else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(name, conf, builder.createTopology());
Thread.sleep(60000);
cluster.shutdown();
}
}
}

上面程序,在本地调试(使用LocalCluster)不需要输入任何参数,提交到实际集群中运行时,需要传递一个参数,该参数为Nimbus的主机名称。
通过Maven构建,生成一个包含依赖的single jar文件(不要把Storm的依赖包添加进去),例如storm-examples-0.0.1-SNAPSHOT.jar,在提交Topology程 序到Storm集群之前,因为用到了Kafka,需要拷贝一下依赖jar文件到Storm集群中的lib目录下面:
cp /usr/local/kafka/libs/kafka_2.9.2-0.8.1.1.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/scala-library-2.9.2.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/metrics-core-2.2.0.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/snappy-java-1.0.5.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/zkclient-0.3.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/log4j-1.2.15.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/slf4j-api-1.7.2.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/jopt-simple-3.2.jar /usr/local/storm/lib/

然后,就可以提交我们开发的Topology程序了:
bin/storm jar /home/storm/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.MyKafkaTopology h1

可以通过查看日志文件(logs/目录下)或者Storm UI来监控Topology的运行状况。如果程序没有错误,可以使用前面我们使用的Kafka Producer来生成消息,就能看到我们开发的Storm Topology能够实时接收到并进行处理。
上面Topology实现代码中,有一个很关键的配置对象SpoutConfig,配置属性如下所示:
spoutConf.forceFromStart = false;

该配置是指,如果该Topology因故障停止处理,下次正常运行时是否从Spout对应数据源Kafka中的该订阅Topic的起始位置开始读 取,如果forceFromStart=true,则之前处理过的Tuple还要重新处理一遍,否则会从上次处理的位置继续处理,保证Kafka中的 Topic数据不被重复处理,是在数据源的位置进行状态记录。

整合Storm+HDFS

Storm实时计算集群从Kafka消息中间件中消费消息,有实时处理需求的可以走实时处理程序,还有需要进行离线分析的需求,如写入到HDFS进行分析。下面实现了一个Topology,代码如下所示:
package org.shirdrn.storm.examples;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class StormToHDFSTopology {

public static class EventSpout extends BaseRichSpout {

private static final Log LOG = LogFactory.getLog(EventSpout.class);
private static final long serialVersionUID = 886149197481637894L;
private SpoutOutputCollector collector;
private Random rand;
private String[] records;

@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
rand = new Random();
records = new String[] {
"10001 ef2da82d4c8b49c44199655dc14f39f6 4.2.1 HUAWEI G610-U00 HUAWEI 2 70:72:3c:73:8b:22 2014-10-13 12:36:35",
"10001 ffb52739a29348a67952e47c12da54ef 4.3 GT-I9300 samsung 2 50:CC:F8:E4:22:E2 2014-10-13 12:36:02",
"10001 ef2da82d4c8b49c44199655dc14f39f6 4.2.1 HUAWEI G610-U00 HUAWEI 2 70:72:3c:73:8b:22 2014-10-13 12:36:35"
};
}

@Override
public void nextTuple() {
Utils.sleep(1000);
DateFormat df = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
Date d = new Date(System.currentTimeMillis());
String minute = df.format(d);
String record = records[rand.nextInt(records.length)];
LOG.info("EMIT[spout -> hdfs] " + minute + " : " + record);
collector.emit(new Values(minute, record));
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("minute", "record"));
}

}

public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
// use "|" instead of "," for field delimiter
RecordFormat format = new DelimitedRecordFormat()
.withFieldDelimiter(" : ");

// sync the filesystem after every 1k tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);

// rotate files
FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES);

FileNameFormat fileNameFormat = new DefaultFileNameFormat()
.withPath("/storm/").withPrefix("app_").withExtension(".log");

HdfsBolt hdfsBolt = new HdfsBolt()
.withFsUrl("hdfs://h1:8020")
.withFileNameFormat(fileNameFormat)
.withRecordFormat(format)
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(syncPolicy);

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("event-spout", new EventSpout(), 3);
builder.setBolt("hdfs-bolt", hdfsBolt, 2).fieldsGrouping("event-spout", new Fields("minute"));

Config conf = new Config();

String name = StormToHDFSTopology.class.getSimpleName();
if (args != null && args.length > 0) {
conf.put(Config.NIMBUS_HOST, args[0]);
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
} else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(name, conf, builder.createTopology());
Thread.sleep(60000);
cluster.shutdown();
}
}

}

上面的处理逻辑,可以对HdfsBolt进行更加详细的配置,如FileNameFormat、SyncPolicy、 FileRotationPolicy(可以设置在满足什么条件下,切出一个新的日志,如可以指定多长时间切出一个新的日志文件,可以指定一个日志文件大 小达到设置值后,再写一个新日志文件),更多设置可以参考storm-hdfs,。
上面代码在打包的时候,需要注意,使用storm-starter自带的Maven打包配置,可能在将Topology部署运行的时候,会报错,可以使用maven-shade-plugin这个插件,如下配置所示:
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>1.4</version>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>

 

 

整合Kafka+Storm+HDFS

上面分别对整合Kafka+Storm和Storm+HDFS做了实践,可以将后者的Spout改成前者的Spout,从Kafka中消费消息,在 Storm中可以做简单处理,然后将数据写入HDFS,最后可以在Hadoop平台上对数据进行离线分析处理。下面,写了一个简单的例子,从Kafka消 费消息,然后经由Storm处理,写入到HDFS存储,代码如下所示:
package org.shirdrn.storm.examples;

import java.util.Arrays;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class DistributeWordTopology {

public static class KafkaWordToUpperCase extends BaseRichBolt {

private static final Log LOG = LogFactory.getLog(KafkaWordToUpperCase.class);
private static final long serialVersionUID = -5207232012035109026L;
private OutputCollector collector;

@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}

@Override
public void execute(Tuple input) {
String line = input.getString(0).trim();
LOG.info("RECV[kafka -> splitter] " + line);
if(!line.isEmpty()) {
String upperLine = line.toUpperCase();
LOG.info("EMIT[splitter -> counter] " + upperLine);
collector.emit(input, new Values(upperLine, upperLine.length()));
}
collector.ack(input);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line", "len"));
}

}

public static class RealtimeBolt extends BaseRichBolt {

private static final Log LOG = LogFactory.getLog(KafkaWordToUpperCase.class);
private static final long serialVersionUID = -4115132557403913367L;
private OutputCollector collector;

@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}

@Override
public void execute(Tuple input) {
String line = input.getString(0).trim();
LOG.info("REALTIME: " + line);
collector.ack(input);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {

}

}

public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {

// Configure Kafka
String zks = "h1:2181,h2:2181,h3:2181";
String topic = "my-replicated-topic5";
String zkRoot = "/storm"; // default zookeeper root configuration for storm
String id = "word";
BrokerHosts brokerHosts = new ZkHosts(zks);
SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConf.forceFromStart = false;
spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
spoutConf.zkPort = 2181;

// Configure HDFS bolt
RecordFormat format = new DelimitedRecordFormat()
.withFieldDelimiter("\t"); // use "\t" instead of "," for field delimiter
SyncPolicy syncPolicy = new CountSyncPolicy(1000); // sync the filesystem after every 1k tuples
FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES); // rotate files
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
.withPath("/storm/").withPrefix("app_").withExtension(".log"); // set file name format
HdfsBolt hdfsBolt = new HdfsBolt()
.withFsUrl("hdfs://h1:8020")
.withFileNameFormat(fileNameFormat)
.withRecordFormat(format)
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(syncPolicy);

// configure & build topology
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5);
builder.setBolt("to-upper", new KafkaWordToUpperCase(), 3).shuffleGrouping("kafka-reader");
builder.setBolt("hdfs-bolt", hdfsBolt, 2).shuffleGrouping("to-upper");
builder.setBolt("realtime", new RealtimeBolt(), 2).shuffleGrouping("to-upper");

// submit topology
Config conf = new Config();
String name = DistributeWordTopology.class.getSimpleName();
if (args != null && args.length > 0) {
String nimbus = args[0];
conf.put(Config.NIMBUS_HOST, nimbus);
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
} else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(name, conf, builder.createTopology());
Thread.sleep(60000);
cluster.shutdown();
}
}

}

上面代码中,名称为to-upper的Bolt将接收到的字符串行转换成大写以后,会将处理过的数据向后面的hdfs-bolt、realtime这两个Bolt各发一份拷贝,然后由这两个Bolt分别根据实际需要(实时/离线)单独处理。
打包后,在Storm集群上部署并运行这个Topology:
bin/storm jar ~/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.DistributeWordTopology h1

可以通过Storm UI查看Topology运行情况,可以查看HDFS上生成的数据。

什么是java的内存泄漏

来源:互联网

Java最显著的优势之一就是它的内存管理机制。你只需简单创建对象,然后Java垃圾回收机制便会小心的分配和释放内存。然而,事实并非如此简单,因为在Java应用程序中经常发生内存泄漏。

本教程说明了什么是内存泄漏,为什么会发生,以及如何防止它们。

1.什么是内存泄漏?

内存泄漏的定义: 对象不再被应用程序使用,但是垃圾回收器却不能移除它们,因为它们正在被引用。

要理解这个定义,我们需要理解对象在内存中的状态,下图说明了哪些是未被使用的以及哪些是未被引用的。

从图中可以看到被引用的对象和未被引用的对象。未被引用的对象将会被垃圾回收器回收,而被引用对象则不会被回收。未被引用的对象理所当然是未被使用的,因为没有其他的对象引用它。然而,未被使用的对象并不一定是未被引用的,其中一些是被引用的。这就是内存泄漏的起因。

2.为什么会发生内存泄漏?

让我们来看看下面这个例子,看看为什么内存泄漏会发生。在如下例子中,对象A引用了对象B。A的生命周期(t1-t4)要比B的生命周期(t2- t3)长很多。当B不再用于应用中时,A仍然持有对它的引用。在这种方式下,垃圾回收器就不能将B从内存中移除。这将可能导致出现内存不足的问题,因为如 果A对更多的对象做同样的事情,那么内存中将会有很多无法被回收的对象,这将极度耗费内存空间。

也有可能B持有大量对其他对象的引用,这些被B引用的对象也不能够被回收。所有这些未被使用的对象将会耗费宝贵的内存空间。

3.如何阻止内存泄漏?

以下是一些阻止内存泄漏的快速动手技巧。

(1)注意集合类,例如HashMap,ArrayList,等等。因为它们是内存泄漏经常发生的地方。当它们被声明为静态时,它们的生命周期就同应用程序的生命周期一般长。

(2)注意事件监听器和回调,如果一个监听器已经注册,但是当这个类不再被使用时却未被注销,就会发生内存泄漏。

(3)"如果一个类管理它自己的内存,程序员应该对内存泄漏保持警惕。"[1] 很多时候当一个对象的成员变量指向其他对象时,不再使用时需要被置为null。

4.一个小测验:为什么在JDK6中substring()方法会引起内存泄漏?

为了回答这个问题,您可能需要阅读JDK6和7中的substring()