月度归档:2017年07月

基于Docker搭建多节点Hadoop集群

GitHub地址:

  • kiwenlau/hadoop-cluster-docker

本文介绍了基于Docker在单机上搭建多节点Hadopp集群方法,Hadoop的Master和Slave分别运行在不同容器中。

可以直接进入第三部分,快速在本机搭建一个3个节点的Hadoop集群

一. 项目简介

直接用机器搭建Hadoop集群是一个相当痛苦的过程,尤其对初学者来说。他们还没开始跑wordcount,可能就被这个问题折腾的体无完肤了。

我的目标是将Hadoop集群运行在Docker容器中,使Hadoop开发者能够快速便捷地在本机搭建多节点的Hadoop集群。其实这个想法已经有了不少实现,但是都不是很理想,他们或者镜像太大,或者使用太慢,或者使用了第三方工具使得使用起来过于复杂…下表为一些已知的Hadoop on Docker项目以及其存在的问题。

项目镜像大小问题
sequenceiq/hadoop-docker:latest1.491GB镜像太大,只支持单个节点
sequenceiq/hadoop-docker:2.7.01.76 GB同上
sequenceiq/hadoop-docker:2.6.01.624GB同上
sequenceiq/ambari:latest1.782GB镜像太大,使用太慢
sequenceiq/ambari:2.0.04.804GB同上
sequenceiq/ambari:latest:1.704.761GB同上
alvinhenrick/hadoop-mutinode4.331GB镜像太大,构建太慢,增加节点麻烦,有bug

我的项目参考了alvinhenrick/hadoop-mutinode项目,不过我做了大量的优化和重构。alvinhenrick/hadoop-mutinode项目的Github主页以及作者所写的博客地址:

下面两个表是alvinhenrick/hadoop-mutinode项目与我的kiwenlau/hadoop-cluster-docker项目的参数对比

镜像名称构建时间镜像层数镜像大小
alvinhenrick/serf258.213s21239.4MB
alvinhenrick/hadoop-base2236.055s584.328GB
alvinhenrick/hadoop-dn51.959s744.331GB
alvinhenrick/hadoop-nn-dn49.548s844.331GB
镜像名称构建时间镜像层数镜像大小
kiwenlau/serf-dnsmasq509.46s8206.6 MB
kiwenlau/hadoop-base400.29s7775.4 MB
kiwenlau/hadoop-master5.41s9775.4 MB
kiwenlau/hadoop-slave2.41s8775.4 MB

可知,我主要优化了这样几点

  • 更小的镜像大小
  • 更快的构造时间
  • 更少的镜像层数

更快更方便地改变Hadoop集群节点数目

另外,alvinhenrick/hadoop-mutinode项目增加节点时需要手动修改Hadoop配置文件然后重新构建hadoop-nn-dn镜像,然后修改容器启动脚本,才能实现增加节点的功能。而我通过shell脚本实现自动话,不到1分钟可以重新构建hadoop-master镜像,然后立即运行!!!本项目默认启动3个节点的Hadoop集群,支持任意节点数的hadoop集群。

另外,启动hadoop, 运行wordcount以及重新构建镜像都采用了shell脚本实现自动化。这样使得整个项目的使用以及开发都变得非常方便快捷:)

开发测试环境

  • 操作系统:ubuntu 14.04 和 ubuntu 12.04
  • 内核版本: 3.13.0-32-generic
  • Docker版本:1.5.0 和1.6.2

硬盘不够,内存不够,尤其是内核版本过低会导致运行失败:(

二. 镜像简介

本项目一共开发了4个镜像

  • serf-dnsmasq
  • hadoop-base
  • hadoop-master
  • hadoop-slave

serf-dnsmasq镜像

  • 基于ubuntu:15.04 (选它是因为它最小,不是因为它最新…)
  • 安装serf: serf是一个分布式的机器节点管理工具。它可以动态地发现所有hadoop集群节点。
  • 安装dnsmasq: dnsmasq作为轻量级的dns服务器。它可以为hadoop集群提供域名解析服务。

容器启动时,master节点的IP会传给所有slave节点。serf会在container启动后立即启动。slave节点上的serf agent会马上发现master节点(master IP它们都知道嘛),master节点就马上发现了所有slave节点。然后它们之间通过互相交换信息,所有节点就能知道其他所有节点的存在了!(Everyone will know Everyone). serf发现新的节点时,就会重新配置dnsmasq,然后重启dnsmasq. 所以dnsmasq就能够解析集群的所有节点的域名啦。这个过程随着节点的增加会耗时更久,因此,若配置的Hadoop节点比较多,则在启动容器后需要测试serf是否发现了所有节点,dns是否能够解析所有节点域名。稍等片刻才能启动Hadoop。这个解决方案是由SequenceIQ公司提出的,该公司专注于将Hadoop运行在Docker中。请参考这个PPT:Docker-based Hadoop Provisioning

hadoop-base镜像

  • 基于serf-dnsmasq镜像
  • 安装JDK(openjdk)
  • 安装openssh-server, 配置无密码ssh
  • 安装vim:介样就可以愉快地在容器中敲代码了:)
  • 安装Hadoop 2.3.0: 安装编译过的hadoop (2.5.2, 2.6.0, 2.7.0 都比2.3.0大,所以我懒得升级了)

编译Hadoop的步骤请参考我的博客:Hadoop 2.30 在Ubuntu 14.04 中编译

如果需要重新开发我的hadoop-base, 需要下载编译过的hadoop-2.3.0安装包,放到hadoop-cluster-docker/hadoop-base/files目录内。我编译的64位hadoop-2.3.0下载地址:hadoop-2.3.0

另外,我还编译了64位的hadoop 2.5.2, 2.6.0, 2.7.0, 其下载地址如下:

  • hadoop-2.3.0
  • hadoop-2.5.2
  • hadoop-2.6.0
  • hadoop-2.7.0

hadoop-master镜像

  • 基于hadoop-base镜像
  • 配置hadoop的master节点
  • 格式化namenode

这一步需要配置slaves文件,而slaves文件需要列出所有节点的域名或者IP。因此,Hadoop节点数目不同时,slaves文件自然也不一样。因此,更改Hadoop集群节点数目时,需要修改slaves文件然后重新构建hadoop-master镜像。我编写了一个resize-cluster.sh脚本自动化这一过程。仅需给定节点数目作为脚本参数就可以轻松实现Hadoop集群节点数目的更改。由于hadoop-master镜像仅仅做一些配置工作,也无需下载任何文件,整个过程非常快,1分钟就足够了。

hadoop-slave镜像

  • 基于hadoop-base镜像
  • 配置hadoop的slave节点

镜像大小分析

下表为sudo docker images的运行结果

REPOSITORYTAGIMAGE IDCREATEDVIRTUAL SIZE
index.alauda.cn/kiwenlau/hadoop-slave0.1.0d63869855c0317 hours ago777.4 MB
index.alauda.cn/kiwenlau/hadoop-master0.1.07c9d32ede45017 hours ago777.4 MB
index.alauda.cn/kiwenlau/hadoop-base0.1.05571bd5de58e17 hours ago777.4 MB
index.alauda.cn/kiwenlau/serf-dnsmasq0.1.009ed89c24ee817 hours ago206.7 MB
ubuntu15.04bd94ae5874833 weeks ago131.3 MB

易知以下几个结论:

  • serf-dnsmasq镜像在ubuntu:15.04镜像的基础上增加了75.4MB
  • hadoop-base镜像在serf-dnsmasq镜像的基础上增加了570.7MB
  • hadoop-master和hadoop-slave镜像在hadoop-base镜像的基础上大小几乎没有增加

下表为docker history index.alauda.cn/kiwenlau/hadoop-base:0.1.0命令的部分运行结果

IMAGECREATEDCREATED BYSIZE
2039b9b8114644 hours ago/bin/sh -c #(nop) ADD multi:a93c971a49514e787158.5 MB
cdb620312f3044 hours ago/bin/sh -c apt-get install -y openjdk-7-jdk324.6 MB
da7d10c790c144 hours ago/bin/sh -c apt-get install -y openssh-server87.58 MB
c65cb568defc44 hours ago/bin/sh -c curl -Lso serf.zip https://dl.bint14.46 MB
3e22b3d72e3344 hours ago/bin/sh -c apt-get update && apt-get install60.89 MB
b68f8c8d21403 weeks ago/bin/sh -c #(nop) ADD file:d90f7467c470bfa9a3131.3 MB

可知

  • 基础镜像ubuntu:15.04为131.3MB
  • 安装openjdk需要324.6MB
  • 安装hadoop需要158.5MB
  • ubuntu,openjdk与hadoop均为镜像所必须,三者一共占了:614.4MB

因此,我所开发的hadoop镜像以及接近最小,优化空间已经很小了

下图显示了项目的Docker镜像结构:

hadoop-docker

三. 3节点Hadoop集群搭建步骤

1. 拉取镜像

sudo docker pull index.alauda.cn/kiwenlau/hadoop-master:0.1.0
sudo docker pull index.alauda.cn/kiwenlau/hadoop-slave:0.1.0
sudo docker pull index.alauda.cn/kiwenlau/hadoop-base:0.1.0
sudo docker pull index.alauda.cn/kiwenlau/serf-dnsmasq:0.1.0
  • 3~5分钟OK~

查看下载的镜像

sudo docker images

运行结果

REPOSITORYTAGIMAGE IDCREATEDVIRTUAL SIZE
index.alauda.cn/kiwenlau/hadoop-slave0.1.0d63869855c0317 hours ago777.4 MB
index.alauda.cn/kiwenlau/hadoop-master0.1.07c9d32ede45017 hours ago777.4 MB
index.alauda.cn/kiwenlau/hadoop-base0.1.05571bd5de58e17 hours ago777.4 MB
index.alauda.cn/kiwenlau/serf-dnsmasq0.1.009ed89c24ee817 hours ago206.7 MB
  • hadoop-base镜像是基于serf-dnsmasq镜像的,hadoop-slave镜像和hadoop-master镜像都是基于hadoop-base镜像
  • 所以其实4个镜像一共也就777.4MB:)

2. 修改镜像tag

sudo docker tag d63869855c03 kiwenlau/hadoop-slave:0.1.0
sudo docker tag 7c9d32ede450 kiwenlau/hadoop-master:0.1.0
sudo docker tag 5571bd5de58e kiwenlau/hadoop-base:0.1.0
sudo docker tag 09ed89c24ee8 kiwenlau/serf-dnsmasq:0.1.0

查看修改tag后镜像

sudo docker images

运行结果

REPOSITORYTAGIMAGE IDCREATEDVIRTUAL SIZE
index.alauda.cn/kiwenlau/hadoop-slave0.1.0d63869855c0317 hours ago777.4 MB
kiwenlau/hadoop-slave0.1.0d63869855c0317 hours ago777.4 MB
index.alauda.cn/kiwenlau/hadoop-master0.1.07c9d32ede45017 hours ago777.4 MB
kiwenlau/hadoop-master0.1.07c9d32ede45017 hours ago777.4 MB
kiwenlau/hadoop-base0.1.05571bd5de58e17 hours ago777.4 MB
index.alauda.cn/kiwenlau/hadoop-base0.1.05571bd5de58e17 hours ago777.4 MB
kiwenlau/serf-dnsmasq0.1.009ed89c24ee817 hours ago206.7 MB
index.alauda.cn/kiwenlau/serf-dnsmasq0.1.009ed89c24ee817 hours ago206.7 MB
  • 之所以要修改镜像,是因为我默认是将镜像上传到Dockerhub, 因此Dokerfile以及shell脚本中得镜像名称都是没有alauada前缀的,sorry for this….不过改tag还是很快滴
  • 若直接下载我在DockerHub中的镜像,自然就不需要修改tag…不过Alauda镜像下载速度很快的哈~

3.下载源代码

git clone https://github.com/kiwenlau/hadoop-cluster-docker
  • 为了防止Github被XX, 我把代码导入到了开源中国的git仓库
git clone http://git.oschina.net/kiwenlau/hadoop-cluster-docker

4. 运行容器

cd hadoop-cluster-docker
./start-container.sh

运行结果

start master container...
start slave1 container...
start slave2 container...
root@master:~#
  • 一共开启了3个容器,1个master, 2个slave
  • 开启容器后就进入了master容器root用户的家目录(/root)

查看master的root用户家目录的文件

ls

运行结果

hdfs run-wordcount.sh serf_log start-hadoop.sh start-ssh-serf.sh
  • start-hadoop.sh是开启hadoop的shell脚本
  • run-wordcount.sh是运行wordcount的shell脚本,可以测试镜像是否正常工作

5.测试容器是否正常启动(此时已进入master容器)

查看hadoop集群成员

serf members

运行结果

master.kiwenlau.com 172.17.0.65:7946 alive
slave1.kiwenlau.com 172.17.0.66:7946 alive
slave2.kiwenlau.com 172.17.0.67:7946 alive
  • 若结果缺少节点,可以稍等片刻,再执行“serf members”命令。因为serf agent需要时间发现所有节点。

测试ssh

ssh slave2.kiwenlau.com

运行结果

Warning: Permanently added 'slave2.kiwenlau.com,172.17.0.67' (ECDSA) to the list of known hosts.
Welcome to Ubuntu 15.04 (GNU/Linux 3.13.0-53-generic x86_64)
* Documentation: https://help.ubuntu.com/
The programs included with the Ubuntu system are free software;
the exact distribution terms for each program are described in the
individual files in /usr/share/doc/*/copyright.
Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by
applicable law.
root@slave2:~#

退出slave2

exit

运行结果

logout
Connection to slave2.kiwenlau.com closed.
  • 若ssh失败,请稍等片刻再测试,因为dnsmasq的dns服务器启动需要时间。
  • 测试成功后,就可以开启Hadoop集群了!其实你也可以不进行测试,开启容器后耐心等待一分钟即可!

6. 开启hadoop

./start-hadoop.sh
  • 上一步ssh到slave2之后,请记得回到master啊!!!
  • 运行结果太多,忽略….
  • hadoop的启动速度取决于机器性能….

7. 运行wordcount

./run-wordcount.sh

运行结果

input file1.txt:
Hello Hadoop
input file2.txt:
Hello Docker
wordcount output:
Docker 1
Hadoop 1
Hello 2
  • wordcount的执行速度取决于机器性能….

四. N节点Hadoop集群搭建步骤

1. 准备工作

  • 参考第二部分1~3:下载镜像,修改tag,下载源代码
  • 注意,你可以不下载serf-dnsmasq, 但是请最好下载hadoop-base,因为hadoop-master是基于hadoop-base构建的

2. 重新构建hadoop-master镜像

./resize-cluster.sh 5
  • 不要担心,1分钟就能搞定
  • 你可以为resize-cluster.sh脚本设不同的正整数作为参数数1, 2, 3, 4, 5, 6…

3. 启动容器

./start-container.sh 5
  • 你可以为resize-cluster.sh脚本设不同的正整数作为参数数1, 2, 3, 4, 5, 6…
  • 这个参数呢,最好还是得和上一步的参数一致:)
  • 这个参数如果比上一步的参数大,你多启动的节点,Hadoop不认识它们..
  • 这个参数如果比上一步的参数小,Hadoop觉得少启动的节点挂掉了..

4. 测试工作

  • 参考第三部分5~7:测试容器,开启Hadoop,运行wordcount
  • 请注意,若节点增加,请务必先测试容器,然后再开启Hadoop, 因为serf可能还没有发现所有节点,而dnsmasq的DNS服务器表示还没有配置好服务
  • 测试等待时间取决于机器性能….

UEFI引导系统

现在的电脑大多数使用了UEFI引导系统(原来都是使用BIOS),从而加快启动速度,但同时也给用惯BIOS的用户带来很多困惑!为啥电脑不能识别制作好的u盘PE系统?
810a19d8bc3eb135591dc3dca51ea8d3fc1f44fa

  • 传统的电脑通常都是使用BIOS引导,开机BIOS初始化,然后BIOS自检,再引导操作系统→进入系统,显示桌面。

    UEFI引导系统
  • 2光盘启动:

    在BIOS引导的情况下,通常使用安装光盘维护、安装操作性系统,只要找到CD-ROM(或者DVD,主板菜单不一样)使用上下键,选中此项,按回车键(Enter)予以确认,再按F10键,重启电脑,就能认光驱里面的安装光盘了。

    UEFI引导系统
  • 3u盘启动:

    在BIOS引导下,u盘PE系统的操作和光驱引导设置方法类似,只要找到USB选项(有的主板是显示u盘的型号),设置为第一启动项即可,重启电脑就会读u盘PE系统了。

    UEFI引导系统
  • 4【UEFI引导】:

    UEFI引导的流程是开机初始化UEFI,然后,直接引导操作系统,进入系统。和传统的BIOS引导相比,UEFI引导少了一道BIOS自检的过程,所以开机就会更快一些,这也成为了电脑的新宠。

    UEFI引导系统
  • 5比如,宏碁的新电脑默认是UEFI引导,如果需要使用u盘PE系统,有可能会不认u盘,此时一点要做些调整。插好已经制作过的u盘PE系统,开机快速(慢了就会直接读取硬盘)按F12键,进入设置界面。下图是各品牌机或者主板进入BIOS启动设置的键值(仅供参考)。
    UEFI引导系统
  • 6进入设置主界面,继续点击Boot(引导)菜单,Boot mode(引导模式)右边的就是当前的引导模式,这里默认的是UEFI引导。
    UEFI引导系统
  • 7选中该项回车,打开设置项之后,这里有两个选项:UEFI Legacy(使用上下键选中),引导模式选择Legacy
    UEFI引导系统
  • 8点击OK,确认修改为Legacy引导模式。
    UEFI引导系统
  • 9提示询问是否现在重启电脑?点击 yes 即可重启电脑,就会显示启动菜单。
    UEFI引导系统
  •  进入这个界面,相信很多熟悉BIOS的老用户就如释重负,第一项(HDD)为硬盘启动,第二项(USB)为u盘启动,第三项(Network)为网络启动。
    UEFI引导系统
  •  设置好u盘启动(USB),重启电脑就会认已经制作好的u盘PE系统了。接下来都是图形界面,又有中文说明,不必赘述了吧。
    UEFI引导系统
  •  但是,因为早期出品的 Windows 7 操作系统还没见过UEFI引导系统,所以它并不支持GPT Boot on UEFI引导(适合Windows 8 操作系统),并且MBR与GPT之间是不兼容的,如果要安装Windows7操作系统,必须将磁盘转换为MBR引导。现在u盘PE系统中都有这类维护工具。
    UEFI引导系统
  •  注意:转换磁盘格式需要清空磁盘中的所有分区和数据,所以在操作之前,一定要保存好磁盘中所有重要数据。进入Windows7安装界面之后,也可以不使用维护工具,使用命令转换磁盘引导模式:按Shift + F10组合键,打开命令提示符。输入:Diskpart 按回车,进入操作界面。
    UEFI引导系统
  •  输入:list disk 按回车执行命令,可以查看磁盘的相关信息。此时要注意查看磁盘的容量大小来判断、选择。下图中465GB的Disk 0是硬盘,3852MB的Disk 1是用于Win7安装的U盘(别选错了!)。
    UEFI引导系统
  •  输入:select disk 0 按回车,选择disk 0为当前操作的磁盘。
    UEFI引导系统
  •  输入:Clean 按回车,清空当前磁盘分区。
    UEFI引导系统
  •  输入:convert mbr 按回车,将硬盘引导模式转换为MBR分区。至此,安装Windows7操作系统就没有引导异常的问题了。
    UEFI引导系统
  •  操作完成之后,关闭此命令提示符窗口,继续按照正常的方法安装Windows 7系统即可。

    扩展知识:convert命令的其它用法:

    convert gpt 将磁盘从MBR转换为GPT。

    convert basic 将磁盘从动态转换为基本。

    convert dynamic 将磁盘从基本转换为动态。

    UEFI引导系统
  •  最后要注意:装好Windows7操作系统,此时的硬盘引导模式已经转成legacy模式,可能无法正常启动系统,如要正常启动,还需要将引导模式变回UEFI
    UEFI引导系统

    在真正学习如何安装UEFI之前,我们还是先来了解一下UEFI的优势。作为传统BIOS(Basic Input/Output System)的继任者,UEFI拥有前辈所不具备的诸多功能,比如图形化界面、多种多样的操作方式、允许植入硬件驱动等等。这些特性让UEFI相比于传统BIOS更加易用、更加多功能、更加方便。而Windows 8在发布之初就对外宣布全面支持UEFI,这也促使了众多主板厂商纷纷转投UEFI,并将此作为主板的标准配置之一。

    UEFI
    UEFI启动流程

    UEFI抛去了传统BIOS需要长时间自检的问题,让硬件初始化以及引导系统变得简洁快速。换种方式说,UEFI已经把电脑的BIOS变得不像是BIOS,而是一个小型固化在主板上的操作系统一样,加上UEFI本身的开发语言已经从汇编转变成C语言,高级语言的加入让厂商深度开发UEFI变为可能。

    UEFI
    目前UEFI是主板的标准配置

    目前我们能够看到的主流主板产品或者是笔记本、品牌机等产品均支持UEFI,而Surface Pro其实也是将UEFI实用化的最好范例。大家能够看到的最直观的的现象,就是现在的BIOS都可以用鼠标进行操作,并且可以内置声音、图像、动画、甚至是一个小应用软件。所以单从硬件上来看,我们的硬件对于UEFI的支持早就已经做好的准备。不信?你可以看看你最近购买的主板的包装盒上,肯定会标有支持UEFI的Logo,这就是主板支持UEFI的有力证明。

    UEFI
    大部分用户使用UEFI还仍然停留在界面上

    说到这里,或许会有人提问:“既然现在主板已经使用了UEFI,那么我们现在就可以直接享受UEFI的诸多功能么?”很遗憾,目前我们接触UEFI更多地只是停留在BIOS界面上,在系统的启动运行机制上,我们并没有真正使用到UEFI的特性。想要真的让操作系统与UEFI无缝连接,就需要我们自己进行一些操作,这也是本文笔者要向大家介绍的最重要的内容。

     

    那么要在UEFI平台上安装Windows 8到底需要什么东西呢?笔者首先和大家一块儿来整理一下:

    1.一个有足够容量的U盘,推荐8GB以上

    2.Windows 8 x64官方原版镜像(MSDN可以下载)

    3.一台支持UEFI的主机

    U盘笔者相信大家都有,只要容量足够即可。目前微软在国内不发售实体版Windows 8,所以我们想要制作系统启动盘,就必须通过MSDN下载原版镜像然后制作一个系统安装U盘。关于Win 8镜像,我们在网络上可以找到各种各样的版本。但是为了能够顺利安装成功,且在今后的使用中不会出现异常情况,笔者强烈建议大家在微软MSDN上下载原版镜像。如果你的镜像是通过其他渠道下载的,笔者也希望你能够在制作系统安装U盘之前对镜像进行校验并于MSDN上提供SHA1检验码进行比对,这样才能够保证镜像的安全以及完整。

    UEFI
    强烈建议登录MSDN下载官方原版Windows 8镜像

    UEFI
    用UltraISO制作镜像非常方便

    与其他的系统安装U盘的制作方法相同,我们使用专用的制作软件就可以将镜像写入到U盘当中。这里笔者选择的是较为知名的UltraISO。我们先将U盘插在电脑主机上,然后使用UltraISO打开需要写入的Windows 8安装镜像,然后在软件菜单中选择“启动->选择硬盘映像”,接着在对话框中选择需要写入的U盘盘符后点击“写入”静候U盘写入完毕即可。

    UEFI
    系统安装U盘中efi文件夹是关键

    U盘写入完毕后,我们可以打开U盘,检查一下里面是否有“efi”文件夹,这也是我们是否能够通过UEFI进行引导安装的关键。另外还有一个需要注意的事项,那就是在系统安装U盘不要插在USB3.0接口上,否则会出现不可预知的错误。

     

    我们的电脑想要快速开机,需要具备三个条件:第一是主板支持UEFI,二是系统支持UEFI(Win8),最后就硬盘需要采用GPT分区。

    GPT分区全名为Globally Unique Identifier Partition Table Format,即全局唯一标示磁盘分区表格式。GPT还有另一个名字叫做GUID分区表格式,我们在许多磁盘管理软件中能够看到这个名字。而GPT也是UEFI所使用的磁盘分区格式。

    UEFI
    GUID分区表模型

    GPT分区的一大优势就是针对不同的数据建立不同的分区,同时为不同的分区创建不同的权限。就如其名字一样,GPT能够保证磁盘分区的GUID唯一性,所以GPT不允许将整个硬盘进行复制,从而保证了磁盘内数据的安全性。想比与我们目前通常使用的MBR分区,GPT本身就有着得天独厚的优势。

    UEFI
    使用磁盘工具可以很方便地将磁盘转换成GPT(GUID)格式

    GPT分区的创建或者更改其实并不麻烦,但是一块硬盘如果想从MBR分区转换成GPT分区的话,就会丢失硬盘内的所有数据。所以我们在更改硬盘分区格式之前需要先将硬盘备份,然后使用Windows自带的磁盘管理功能或者使用DiskGenius等磁盘管理软件就可以轻松地将硬盘转换成GPT(GUID)格式,转换完成后,我们就可以真正开始系统的安装过程了。

     

    U盘制作好了,硬盘分区也转换完成了,剩下的就是要搭建好平台,然后将U盘插好,开机进入BIOS选项。在BIOS选项中,我们找到启动项选择菜单,并且将第一启动项选择为U盘,但是大家注意,这里我们选择的时候就要选择UEFI启动方式启动U盘了,这样UEFI对自动寻找U盘里的EFI引导文件,然后使用UEFI正确的引导方式来进行安装。

    UEFI
    在BIOS中使用UEFI方式引导系统安装U盘

    其实UEFI引导安装Windows 8和我们用老方式安装没有什么太大的区别,我们仍然是通过安装向导来一步一步安装操作系统。唯一与传统方式安装操作系统方式不同的是,UEFI安装在磁盘分区的时候会有所变化。除了主分区,我们还可以看到恢复分区、系统分区以及MSR分区,这也是GPT分区格式的标志。对于这三个分区的做用笔者不多做介绍。因为在系统安装完成后,这三个分区是会被隐藏起来的,我们一般情况下也是不会接触到这三种分区的。

    UEFI
    UEFI在开机Logo的地方就开始加载系统

    剩下的工作就是交给安装程序自己处理,我们可以静候系统安装完成,安装完成后,系统将会进行驱动匹配以及初始化等很多工作。最后就可以直接进入到Windows 8的主界面了。如果你是一直在电脑前面盯着显示器的话,就可以发现一些变化了。通常在系统启动的时候,都是在显示完主板自检时的Logo之后才会引导系统。而采用UEFI方式之后,我们可以发现系统在开机Logo画面上就开始加载,并且加载速度非常快速。相比传统的引导系统方法,UEFI在效率上的确要高不少。

    UEFI
    系统中可以看到多个隐藏分区

    进入系统后,我们可以进入到磁盘管理,来仔细观察一下磁盘的分区结构结构。当然,除了三个隐藏分区,剩下的硬盘容量,我们还是可以向普通分区一样进行管理。目前主流的64位Windows操作系统都对GPT分区有良好的支持,所以我们也不用担心安装多系统的问题。

     

    系统安装完成了,我们就来真正测试一下UEFI下系统的启动有多快吧。其实在系统刚安装完自动进行系统配置的时候,我们就应该能够观察到系统的启动速度已经变得非常快速了。下面笔者就用很直接的方法来记录UEFI下Windows 8启动到底有多快。

    记录方法很简单,笔者先将主机打开并进入系统,然后将系统重启并准备计时。在系统重新启动出现开机Logo的时候开始记录时间,直到最后进入到桌面停止计时。这期间的时间就记录为开机时间。那么我们马上来看看这套平台的实测成绩吧。

    UEFI
    UEFI+GPT系统开机速度

    从最终记录的时间来看,UEFI+GPT的威力的确不容小觑,整个开机速度相比于传统启动方式有了质的提升。

    这样的开机速度虽然快,但是我们可能对这样的开机速度没有一个非常明确的概念。那么笔者再请出另一位主角Surface Pro来做一个开机速度的对比。

    UEFI
    Surface Pro启动速度

    经过测试,Surface Pro的开机速度要达到11秒,这样的成绩虽然也算是非常出色,但是仍然和我们的测试平台有着一定的差距。抛开硬件之间的性能差距不谈,就UEFI与GPT的组合来看,的确会为我们平台的开机速度的提升做出了不小的贡献。

spring多数据源一致性事务方法

在一个业务中,在一个事务中处理时候将切换多个数据源,需要保证同一事务多个数据源数据的一致性。下面是spring中的两种配置方法。
本文内容从网络收集整理而成
 spring 多数据源配置

spring 多数据源配置一般有两种方案:

1、在spring项目启动的时候直接配置两个不同的数据源,不同的sessionFactory。在dao 层根据不同业务自行选择使用哪个数据源的session来操作。

2、配置多个不同的数据源,使用一个sessionFactory,在业务逻辑使用的时候自动切换到不同的数据源,有一个种是在拦截器里面根据不同的业务现切换到不同的datasource;有的会在业务层根据业务来自动切换。但这种方案在多线程并发的时候会出现一些问题,需要使用threadlocal等技术来实现多线程竞争切换数据源的问题。

 

【本文暂时只讨论第一种方案】

spring多事务配置主要体现在db配置这块,配置不同的数据源和不同的session

1、一下贴出 spring-db.xml配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:aop="http://www.springframework.org/schema/aop" 
    xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="

http://www.springframework.org/schema/beans


http://www.springframework.org/schema/beans/spring-beans-3.0.xsd


http://www.springframework.org/schema/context


http://www.springframework.org/schema/context/spring-context-3.0.xsd

    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd 
    http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.1.xsd">

    <bean id="test1DataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close" >
        <property name="driverClassName" value="${database.test1.driverClassName}" />
        <property name="url" value="${database.test1.url}" />
        <property name="username" value="${database.test1.username}" />
        <property name="password" value="${database.test1.password}" />
    </bean>
    
    <bean id="test2DataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close" >
        <property name="driverClassName" value="${database.test2.driverClassName}" />
        <property name="url" value="${database.test2.url}" />
        <property name="username" value="${database.test2.username}" />
        <property name="password" value="${database.test2.password}" />
    </bean>
    
    <bean id="test1SqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">  
        <property name="dataSource" ref="test1DataSource" />
        <property name="configLocation" value="classpath:mybatis/mybatis-config.xml"></property>
        <property name="mapperLocations" value="classpath*:mybatis/mapper/*.xml" />
    </bean>  
    
    <bean id="test2SqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">  
        <property name="dataSource" ref="test2DataSource" />
        <property name="configLocation" value="classpath:mybatis/mybatis-config.xml"></property>
        <property name="mapperLocations" value="classpath*:mybatis/mapper/*.xml" />
    </bean>  
    
    <bean id="test1TxManager"   class="org.springframework.jdbc.datasource.DataSourceTransactionManager">  
        <property name="dataSource" ref="test1DataSource"></property>  
    </bean>  
    <bean id="test2TxManager"   class="org.springframework.jdbc.datasource.DataSourceTransactionManager">  
        <property name="dataSource" ref="test2DataSource"></property>  
    </bean>  
  
    <tx:annotation-driven transaction-manager="test2TxManager" />
    <tx:annotation-driven transaction-manager="test1TxManager" />
    
</beans>

2、dao层做了一个小的封装,将不同的SqlSessionFactory 注入到 SessionFactory,通过BaseDao来做简单的封装,封装不同库的基本增删改。dao实现层都集成于Basedao 这样的话,实现可以根据自己需要来选择不同的库来操作不同的内容。

session工厂

package com.neo.dao;

import com.neo.entity.Entity;

public class BaseDao extends SessionFactory{
    
    public void test1Update(Entity entity) {
    this.getTest1Session().update(entity.getClass().getSimpleName()+".update", entity);
    }
    
    public void test2Update(Entity entity) {
       this.getTest2Session().update(entity.getClass().getSimpleName()+".update", entity);
       }
}

BaseDao

package com.neo.dao;

import com.neo.entity.Entity;

public class BaseDao extends SessionFactory{
    
    public void test1Update(Entity entity) {
    this.getTest1Session().update(entity.getClass().getSimpleName()+".update", entity);
    }
    
    public void test2Update(Entity entity) {
       this.getTest2Session().update(entity.getClass().getSimpleName()+".update", entity);
       }
}

 

代码地址:https://github.com/ityouknow/spring-mybatis-mulidatasource/tree/master/spring-mybatis

 

以上的配置在多数据源连接,正常的增删改都是没有问题的,但是遇到分布式的事务是就出问题:

测试代码:

package com.neo.service.impl;

import javax.annotation.Resource;

import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import com.neo.dao.UserDao;
import com.neo.dao.UserInformationsDao;
import com.neo.entity.UserEntity;
import com.neo.entity.UserInformationsEntity;
import com.neo.service.UserService;

@Service
public class UserServiceImpl implements UserService {
    
    @Resource UserDao userDao;
    @Resource UserInformationsDao userInformationsDao;

    @Override
    @Transactional
    public void updateUserinfo() {
    
    UserEntity user=new UserEntity();
    user.setId(1);
    user.setUserName("李四4");
    
    UserInformationsEntity userInfo=new UserInformationsEntity();
    userInfo.setUserId(1);
    userInfo.setAddress("陕西4");
    
    userDao.updateUser(user);
    userInformationsDao.updateUserInformations(userInfo);
    
    if(true){
        throw new RuntimeException("test tx ");
    }
    }
}

在service添加事务后,更新完毕抛出异常,test2更新进行了回滚,test1 数据更新没有回滚。

解决方案添加分布式的事务,Atomikos和spring结合来处理。

Atomikos多数据源的配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:aop="http://www.springframework.org/schema/aop" 
    xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="

http://www.springframework.org/schema/beans


http://www.springframework.org/schema/beans/spring-beans-3.0.xsd


http://www.springframework.org/schema/context


http://www.springframework.org/schema/context/spring-context-3.0.xsd

    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd 
    http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.1.xsd">

     <bean id="test1DataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean" init-method="init" destroy-method="close">
        <property name="uniqueResourceName" value="test1"/>
        <property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource"/>
        <property name="xaProperties">
            <props>
                <prop key="url">${database.test1.url}</prop>
                <prop key="user">${database.test1.username}</prop>
                <prop key="password">${database.test1.password}</prop>
            </props>
        </property>
        <property name="minPoolSize" value="10" />
        <property name="maxPoolSize" value="100" />
        <property name="borrowConnectionTimeout" value="30" />
        <property name="testQuery" value="select 1" />
        <property name="maintenanceInterval" value="60" />
    </bean>
    
     <bean id="test2DataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean" init-method="init" destroy-method="close">
        <property name="uniqueResourceName" value="test2"/>
        <property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource"/>
        <property name="xaProperties">
            <props>
                <prop key="url">${database.test2.url}</prop>
                <prop key="user">${database.test2.username}</prop>
                <prop key="password">${database.test2.password}</prop>
            </props>
        </property>
        <property name="minPoolSize" value="10" />
        <property name="maxPoolSize" value="100" />
        <property name="borrowConnectionTimeout" value="30" />
        <property name="testQuery" value="select 1" />
        <property name="maintenanceInterval" value="60" />
    </bean>
    
    <bean id="test1SqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">  
        <property name="dataSource" ref="test1DataSource" />
        <property name="configLocation" value="classpath:mybatis/mybatis-config.xml"></property>
        <property name="mapperLocations" value="classpath*:mybatis/mapper/*.xml" />
    </bean>  
    
    <bean id="test2SqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">  
        <property name="dataSource" ref="test2DataSource" />
        <property name="configLocation" value="classpath:mybatis/mybatis-config.xml"></property>
        <property name="mapperLocations" value="classpath*:mybatis/mapper/*.xml" />
    </bean>  
    
    <!-- 分布式事务 -->
    <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close">
        <property name="forceShutdown" value="true"/>
    </bean>
    
    <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
        <property name="transactionTimeout" value="300"/>
    </bean>
    
    <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="transactionManager" ref="atomikosTransactionManager"/>
        <property name="userTransaction" ref="atomikosUserTransaction"/>
    </bean>
    
    <tx:annotation-driven/>
    
</beans>

添加完分布式的项目地址:

https://github.com/ityouknow/spring-mybatis-mulidatasource/tree/master/spring-mybatis-atomikos

上述资料来源: http://www.cnblogs.com/ityouknow/p/4977136.html

 

其中, 文章中的: atomikos 是一个开源的java分布式xa事物项目, 百度百科介绍如下:

Atomikos 是一个为Java平台提供增值服务的并且开源类事务管理器。
Atomikos TransactionsEssentials 是一个为Java平台提供增值服务的并且开源类事务管理器,以下是包括在这个开源版本中的一些功能:
l 全面崩溃 / 重启恢复
l 兼容标准的SUN公司JTA API
l 嵌套事务
l 为XA和非XA提供内置的JDBC适配器
注释:XA:XA协议由Tuxedo首先提出的,并交给X/Open组织,作为资源管理器(数据库)与事务管理器的接口标准。目前,Oracle、Informix、DB2和Sybase等各大数据库厂家都提供对XA的支持。XA协议采用两阶段提交方式来管理分布式事务。XA接口提供资源管理器与事务管理器之间进行通信的标准接口。XA协议包括两套函数,以xa_开头的及以ax_开头的。
以下的函数使事务管理器可以对资源管理器进行的操作:
1)xa_open,xa_close:建立和关闭与资源管理器的连接。
2)xa_start,xa_end:开始和结束一个本地事务。
3)xa_prepare,xa_commit,xa_rollback:预提交、提交和回滚一个本地事务。
4)xa_recover:回滚一个已进行预提交的事务。
5)ax_开头的函数使资源管理器可以动态地在事务管理器中进行注册,并可以对XID(TRANSACTION IDS)进行操作。
6)ax_reg,ax_unreg;允许一个资源管理器在一个TMS(TRANSACTION MANAGER SERVER)中动态注册或撤消注册。
l 内置的JMS适配器XA-capable JMS队列连接器
注释:JMS:jms即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
l 通过XA API兼容第三方适配器
l 更好的整合您的项目
l 集成Hibernate
如何使用Atomikos TransactionsEssentials
Atomikos TransactionsEssentials 是一个可靠的库,可以加入到您的Java应用程序,也就是说为了使用这个产品,您必须添加一些jar文件(包括在dist和lib文件夹下)到您的应用程序或者应用程序服务器。
请注意:Atomikos TransactionsEssentials是一个非常快速的嵌入式事务管理器,这就意味着,您不需要另外启动一个单独的事务管理器进程(不要查找任何的bin文件夹)。相反,您的应用服务器将有它自己的intra-VM事务管理器。
配置需求:至少Java1.5 jdk,并且最少128M的内存
性能优化:尽管这个软件有着很大的优势,但是想要更好的发挥其作用,可以按以下的方法优化:
l 更高的内存,意味着更高的吞吐量(每秒的事务数目)
l 使连接池尽可能的大
l 一旦你不需要的连接请马上关闭它们。不要把你的应用程序放在缓存里,让内部连接池为你做这些,这将促使更高效的连接使用
l 不要让活动的事务闲置:终止所有情况下的事务,尤其是在异常报错情况下的事务。这将减少数据库的锁定时间,并且最大效率的处理启用的使用。
如果想获取这些细节的更多信息,也要参阅文档说明部分。
值得注意的是,在我们所有的压力测试中,Atomikos TransactionsEssentials比J2EE的web容器更高效的吞吐量。这些测量值包括日志记录的高效的事务状态,同样,在我们所有的测量中,包括XA和non-XA,高效的效率是一样的。
在J2SE中使用Atomikos Transactions Essentials,只需要按以下步骤
  1. 将idst和lib中的jar包全部放入的项目中
  2. 创建或者自定义你应用的transactions.properties(或者jta.properties)文件(事务管理器的配置),然后将它放入到classpath中,安装文件夹中包涵一个实例文件;在properties文件中注释(#)后面的是默认值,取消一行并且改变默认值。

相关地址:

https://www.atomikos.com/Main/SupportOverview

下面是, 另外的 相关配置, 可以参考一下

注释:XA:XA协议由Tuxedo首先提出的,并交给X/Open组织,作为资源管理器(数据库)与事务管理器的接口标准。目前,Oracle、Informix、DB2和Sybase等各大数据库厂家都提供对XA的支持。XA协议采用两阶段提交方式来管理分布式事务。XA接口提供资源管理器与事务管理器之间进行通信的标准接口。XA协议包括两套函数,以xa_开头的及以ax_开头的。

atomikos配合spring的使用方法:1、依赖包
Atomikos的:
transactions-jdbc
transactions-jta
transactions-api
transactions
atomikos-utils
还有一个不要忘了,是jta的包。
用maven要简单一点,只需要加入两个依赖:

Xml代码
  1. <dependency>
  2.     <groupId>com.atomikos</groupId>
  3.     <artifactId>transactions-jdbc</artifactId>
  4.     <version>3.7.0</version>
  5. </dependency>
  6. <dependency>
  7.     <groupId>javax.transaction</groupId>
  8.     <artifactId>jta</artifactId>
  9.     <version>1.1</version>
  10. </dependency>

2、配置数据源
这一步是比较重要的。要用AtomikosDataSourceBean,而不是以前用的连接池如dbcp。最好也用XA(这东西我还不太懂),注意jdbc的链接地址和登陆账号与普通连接池的配置的格式不一样。下面是一个mysql数据库的配置举例:

Xml代码
  1. <bean id="dataSource1" class="com.atomikos.jdbc.AtomikosDataSourceBean" init-method="init" destroy-method="close">
  2.     <property name="uniqueResourceName" value="ds1"/>
  3.     <property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource"/>
  4.     <property name="xaProperties">
  5.         <props>
  6.             <prop key="url">jdbc:mysql://localhost/test</prop>
  7.             <prop key="user">test</prop>
  8.             <prop key="password">test</prop>
  9.         </props>
  10.     </property>
  11.     <property name="minPoolSize" value="10" />
  12.     <property name="maxPoolSize" value="100" />
  13.     <property name="borrowConnectionTimeout" value="30" />
  14.     <property name="testQuery" value="select 1" />
  15.     <property name="maintenanceInterval" value="60" />
  16. </bean>

再来一个sybase的配置举例:

  1. <bean id="dataSource2" class="com.atomikos.jdbc.AtomikosDataSourceBean" init-method="init" destroy-method="close">
  2.     <property name="uniqueResourceName" value="ds2"/>
  3.     <property name="xaDataSourceClassName" value="com.sybase.jdbc3.jdbc.SybXADataSource"/>
  4.     <property name="xaProperties">
  5.         <props>
  6.             <prop key="serverName">192.168.1.10</prop>
  7.                         <prop key="portNumber">2638</prop>
  8.                         <prop key="databaseName">test</prop>
  9.             <prop key="user">test</prop>
  10.             <prop key="password">test</prop>
  11.         </props>
  12.     </property>
  13.     <property name="minPoolSize" value="10" />
  14.     <property name="maxPoolSize" value="100" />
  15.     <property name="borrowConnectionTimeout" value="30" />
  16.     <property name="testQuery" value="select 1" />
  17.     <property name="maintenanceInterval" value="60" />
  18. </bean>

3、使用数据源
这一步与平时好像没什么不一样。我做例子的时候是用mybatis,配置如下:

Xml代码
  1. <bean id="sqlSessionFactory1" class="org.mybatis.spring.SqlSessionFactoryBean">
  2.     <property name="dataSource" ref="dataSource1"/>
  3. </bean>
  4. <bean id="sqlSessionFactory2" class="org.mybatis.spring.SqlSessionFactoryBean">
  5.     <property name="dataSource" ref="dataSource2"/>
  6. </bean>

当然,mybatis还要配置一下映射文件的自动扫描,这里与atomikos无关:

  1. <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
  2.     <property name="basePackage" value="xx.xx;" />
  3.     <property name="sqlSessionFactory" ref="sqlSessionFactory1"/>
  4. </bean>
  5. <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
  6.     <property name="basePackage" value="yy.yy;" />
  7.     <property name="sqlSessionFactory" ref="sqlSessionFactory2"/>
  8. </bean>

用spring JdbcTemplate应该与普通使用没什么不同,用hibernate可能会有点不一样,没测试过。

4、配置jta事务管理
这是很关键的一步。原理我不太懂,例子如下:

Xml代码
  1. <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
  2.     <property name="transactionManager">
  3.         <bean class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close">
  4.             <property name="forceShutdown" value="true"/>
  5.         </bean>
  6.     </property>
  7.     <property name="userTransaction">
  8.         <bean class="com.atomikos.icatch.jta.UserTransactionImp"/>
  9.     </property>
  10. </bean>

当然,用spring的声明式事务配置,再加上一行:

  1. <tx:annotation-driven/>

(注意,本来要配置transaction-manager属性,如:<tx:annotation-driven transaction-manager="transactionManager"/>。这里没有配置是因为它的默认值是transactionManager)

5、atomikos的配置文件jta.properties
这个文件一般放在根路径吧,与log4j.properties类似。jta.properties也可命名为transactions.properties。如果不配置这个文件,项目也能启动,因为几乎所有配置项都有默认值。最好还是配置了,详细配置信息请查看:http://www.atomikos.com/Documentation/JtaProperties。

6、不管是用JdbcTemplate、mybatis还是hibernate,应该都可以写代码来测试了。。。 

 

另外, 常用xa事物项目信息

本地事务和分布式事务。

本地事务:只处理单一数据源,比如单个数据库下,事务进行控制。
分布式事务:处理多种异构的数据源, 比如某个业务操作中同时包含JDBC和JMS或者某个操作需要访问多个不同的数据库,在不同数据库之间进行事务控制。

在Java中,分布式事务主要的规范是JTA/XA。其中:JTA是Java的事务管理器规范, XA是工业标准的X/Open CAE规范,可被两阶段提交及回滚的事务资源定义。比如某数据库实现了XA规范,则不管是JTA,还是MSDTC,都可以基于同样的行为对该数据库进行事务处理。

JTA全称为Java Transaction API,顾名思义JTA定义了一组统一的事务编程的接口,这些接口如下:
XAResource :XAResource接口是对实现了X/Open CAE规范的资源管理器 (Resource Manager,数据库就是典型的资源管理器) 的抽象,它由资源适配器 (Resource Apdater) 提供实现。XAResource是支持事务控制的核心。
Transaction:Transaction接口是一个事务实例的抽象,通过它可以控制事务内多个资源的提交或者回滚。二阶段提交过程也是由Transaction接口的实现者来完成的。
TransactionManager:托管模式 (managed mode) 下,TransactionManager接口是被应用服务器调用,以控制事务的边界的。
UserTransaction:非托管模式 (non-managed mode) 下,应用程序可以通过UserTransaction接口控制事务的边界

在tomcat下是没有分布式事务的,可以借助于第三方Jotm和Automikos实现,在spring中分布式事务是通过jta(jotm,atomikos)来进行实现 。即:通过代码的方式来决定是否是分布式事务。

注:推荐用服务器自己的数据源(也就是 lookup JNDI),这样的话,是不是 XA 事务就由服务器的配置来定制,代码就不需要任何配置来决定是不是 XA 了 。事务本身是不是 XA (分布式的)是服务器的事,服务器来管理“资源” (包括数据源,JMS 连接等,一个资源(JDBC连接)如何参与事务是“资源管理器”(驱动程序)的职责,跟程序无关),服务器提供事务管理并作为“事务协调者”来处理多个“资源管理器”(不同的数据库连接)之间的事务一致性。

jotm和automikos网址:
1、http://jotm.objectweb.org/
2、http://www.atomikos.com/Main/TransactionsEssentials

JTA的实现框架有:
GeronimoTM/Jencks  官方文档比较少,不适合学习和维护。
SimpleJTA 没有实现JTS (Java Transaction Service)而且不是活跃的。
Atomikos  是一个另人钦佩的产品。有丰富的文档,而且有很好的支持。
JBossTS  是一个应用在JBOSS服务器上的,肯定是一个成熟的产品,也有好的支持,详细信息可以看这里:http://www.theserverside.com/news  /thread.tss?thread_id=37941

最常见的二个如下:
JOTM
JOTM(Java Open Transaction Manager)是ObjectWeb的一个开源JTA实现,它本身也是开源应用程序服务器JOnAS(Java Open Application Server)的一部分,为其提供JTA分布式事务的功能。
存在的问题:使用中不能自动rollback,无论什么情况都commit。注:spring3开始已经不再支持jotm

Atomikos
大家推荐最多的。和JOTM相比Atomikos Transactions Essentials更加稳定,它原来是商业项目,现在开源了。象MySQL一样卖服务支持的。而且论坛页比较活跃,有问题很快可以解决。

分布式开放消息系统(RocketMQ)的原理与实践

分布式消息系统作为实现分布式系统可扩展、可伸缩性的关键组件,需要具有高吞吐量、高可用等特点。而谈到消息系统的设计,就回避不了两个问题:

  1. 消息的顺序问题
  2. 消息的重复问题

RocketMQ作为阿里开源的一款高性能、高吞吐量的消息中间件,它是怎样来解决这两个问题的?RocketMQ 有哪些关键特性?其实现原理是怎样的?

关键特性以及其实现原理

一、顺序消息

消息有序指的是可以按照消息的发送顺序来消费。例如:一笔订单产生了 3 条消息,分别是订单创建、订单付款、订单完成。消费时,要按照顺序依次消费才有意义。与此同时多笔订单之间又是可以并行消费的。首先来看如下示例:

假如生产者产生了2条消息:M1、M2,要保证这两条消息的顺序,应该怎样做?你脑中想到的可能是这样:

你可能会采用这种方式保证消息顺序

假定M1发送到S1,M2发送到S2,如果要保证M1先于M2被消费,那么需要M1到达消费端被消费后,通知S2,然后S2再将M2发送到消费端。

这个模型存在的问题是,如果M1和M2分别发送到两台Server上,就不能保证M1先达到MQ集群,也不能保证M1被先消费。换个角度看,如果M2先于M1达到MQ集群,甚至M2被消费后,M1才达到消费端,这时消息也就乱序了,说明以上模型是不能保证消息的顺序的。如何才能在MQ集群保证消息的顺序?一种简单的方式就是将M1、M2发送到同一个Server上:

保证消息顺序,你改进后的方法

这样可以保证M1先于M2到达MQServer(生产者等待M1发送成功后再发送M2),根据先达到先被消费的原则,M1会先于M2被消费,这样就保证了消息的顺序。

这个模型也仅仅是理论上可以保证消息的顺序,在实际场景中可能会遇到下面的问题:

网络延迟问题

只要将消息从一台服务器发往另一台服务器,就会存在网络延迟问题。如上图所示,如果发送M1耗时大于发送M2的耗时,那么M2就仍将被先消费,仍然不能保证消息的顺序。即使M1和M2同时到达消费端,由于不清楚消费端1和消费端2的负载情况,仍然有可能出现M2先于M1被消费的情况。

那如何解决这个问题?将M1和M2发往同一个消费者,且发送M1后,需要消费端响应成功后才能发送M2。

聪明的你可能已经想到另外的问题:如果M1被发送到消费端后,消费端1没有响应,那是继续发送M2呢,还是重新发送M1?一般为了保证消息一定被消费,肯定会选择重发M1到另外一个消费端2,就如下图所示。

保证消息顺序的正确姿势

这样的模型就严格保证消息的顺序,细心的你仍然会发现问题,消费端1没有响应Server时有两种情况,一种是M1确实没有到达(数据在网络传送中丢失),另外一种消费端已经消费M1且已经发送响应消息,只是MQ Server端没有收到。如果是第二种情况,重发M1,就会造成M1被重复消费。也就引入了我们要说的第二个问题,消息重复问题,这个后文会详细讲解。

回过头来看消息顺序问题,严格的顺序消息非常容易理解,也可以通过文中所描述的方式来简单处理。总结起来,要实现严格的顺序消息,简单且可行的办法就是:

保证生产者 - MQServer - 消费者是一对一对一的关系

这样的设计虽然简单易行,但也会存在一些很严重的问题,比如:

  1. 并行度就会成为消息系统的瓶颈(吞吐量不够)
  2. 更多的异常处理,比如:只要消费端出现问题,就会导致整个处理流程阻塞,我们不得不花费更多的精力来解决阻塞的问题。

但我们的最终目标是要集群的高容错性和高吞吐量。这似乎是一对不可调和的矛盾,那么阿里是如何解决的?

世界上解决一个计算机问题最简单的方法:“恰好”不需要解决它!—— 沈询

有些问题,看起来很重要,但实际上我们可以通过合理的设计或者将问题分解来规避。如果硬要把时间花在解决问题本身,实际上不仅效率低下,而且也是一种浪费。从这个角度来看消息的顺序问题,我们可以得出两个结论:

  1. 不关注乱序的应用实际大量存在
  2. 队列无序并不意味着消息无序

所以从业务层面来保证消息的顺序而不仅仅是依赖于消息系统,是不是我们应该寻求的一种更合理的方式?

最后我们从源码角度分析RocketMQ怎么实现发送顺序消息。

RocketMQ通过轮询所有队列的方式来确定消息被发送到哪一个队列(负载均衡策略)。比如下面的示例中,订单号相同的消息会被先后发送到同一个队列中:

// RocketMQ通过MessageQueueSelector中实现的算法来确定消息发送到哪一个队列上
// RocketMQ默认提供了两种MessageQueueSelector实现:随机/Hash
// 当然你可以根据业务实现自己的MessageQueueSelector来决定消息按照何种策略发送到消息队列中
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
}, orderId);

在获取到路由信息以后,会根据MessageQueueSelector实现的算法来选择一个队列,同一个OrderId获取到的肯定是同一个队列。

private SendResult send()  {
    // 获取topic路由信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null;
        // 根据我们的算法,选择一个发送队列
        // 这里的arg = orderId
        mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
        if (mq != null) {
            return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
        }
    }
}
二、消息重复

上面在解决消息顺序问题时,引入了一个新的问题,就是消息重复。那么RocketMQ是怎样解决消息重复的问题呢?还是“恰好”不解决。

造成消息重复的根本原因是:网络不可达。只要通过网络交换数据,就无法避免这个问题。所以解决这个问题的办法就是绕过这个问题。那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理?

  1. 消费端处理消息的业务逻辑保持幂等性
  2. 保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现

第1条很好理解,只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。第2条原理就是利用一张日志表来记录已经处理成功的消息的ID,如果新到的消息ID已经在日志表中,那么就不再处理这条消息。

第1条解决方案,很明显应该在消费端实现,不属于消息系统要实现的功能。第2条可以消息系统实现,也可以业务端实现。正常情况下出现重复消息的概率其实很小,如果由消息系统来实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务端自己处理消息重复的问题,这也是RocketMQ不解决消息重复的问题的原因。

RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重。

三、事务消息

RocketMQ除了支持普通消息,顺序消息,另外还支持事务消息。首先讨论一下什么是事务消息以及支持事务消息的必要性。我们以一个转帐的场景为例来说明这个问题:Bob向Smith转账100块。

在单机环境下,执行事务的情况,大概是下面这个样子:

单机环境下转账事务示意图

当用户增长到一定程度,Bob和Smith的账户及余额信息已经不在同一台服务器上了,那么上面的流程就变成了这样:

集群环境下转账事务示意图

这时候你会发现,同样是一个转账的业务,在集群环境下,耗时居然成倍的增长,这显然是不能够接受的。那如何来规避这个问题?

大事务 = 小事务 + 异步

将大事务拆分成多个小事务异步执行。这样基本上能够将跨机事务的执行效率优化到与单机一致。转账的事务就可以分解成如下两个小事务:

小事务+异步消息

图中执行本地事务(Bob账户扣款)和发送异步消息应该保证同时成功或者同时失败,也就是扣款成功了,发送消息一定要成功,如果扣款失败了,就不能再发送消息。那问题是:我们是先扣款还是先发送消息呢?

首先看下先发送消息的情况,大致的示意图如下:

事务消息:先发送消息

存在的问题是:如果消息发送成功,但是扣款失败,消费端就会消费此消息,进而向Smith账户加钱。

先发消息不行,那就先扣款吧,大致的示意图如下:

事务消息-先扣款

存在的问题跟上面类似:如果扣款成功,发送消息失败,就会出现Bob扣钱了,但是Smith账户未加钱。

可能大家会有很多的方法来解决这个问题,比如:直接将发消息放到Bob扣款的事务中去,如果发送失败,抛出异常,事务回滚。这样的处理方式也符合“恰好”不需要解决的原则。

这里需要说明一下:如果使用Spring来管理事物的话,大可以将发送消息的逻辑放到本地事物中去,发送消息失败抛出异常,Spring捕捉到异常后就会回滚此事物,以此来保证本地事物与发送消息的原子性。

RocketMQ支持事务消息,下面来看看RocketMQ是怎样来实现的。

RocketMQ实现发送事务消息

RocketMQ第一阶段发送Prepared消息时,会拿到消息的地址,第二阶段执行本地事物,第三阶段通过第一阶段拿到的地址去访问消息,并修改消息的状态。

细心的你可能又发现问题了,如果确认消息发送失败了怎么办?RocketMQ会定期扫描消息集群中的事物消息,如果发现了Prepared消息,它会向消息发送端(生产者)确认,Bob的钱到底是减了还是没减呢?如果减了是回滚还是继续发送确认消息呢?RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。

那我们来看下RocketMQ源码,是如何处理事务消息的。客户端发送事务消息的部分(完整代码请查看:rocketmq-example工程下的com.alibaba.rocketmq.example.transaction.TransactionProducer):

// =============================发送事务消息的一系列准备工作========================================
// 未决事务,MQ服务器回查客户端
// 也就是上文所说的,当RocketMQ发现`Prepared消息`时,会根据这个Listener实现的策略来决断事务
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
// 构造事务消息的生产者
TransactionMQProducer producer = new TransactionMQProducer("groupName");
// 设置事务决断处理类
producer.setTransactionCheckListener(transactionCheckListener);
// 本地事务的处理逻辑,相当于示例中检查Bob账户并扣钱的逻辑
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
producer.start()
// 构造MSG,省略构造参数
Message msg = new Message(......);
// 发送消息
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
producer.shutdown();

接着查看sendMessageInTransaction方法的源码,总共分为3个阶段:发送Prepared消息、执行本地事务、发送确认消息。

//  ================================事务消息的发送过程=============================================
public TransactionSendResult sendMessageInTransaction(.....)  {
    // 逻辑代码,非实际代码
    // 1.发送消息
    sendResult = this.send(msg);
    // sendResult.getSendStatus() == SEND_OK
    // 2.如果消息发送成功,处理与消息关联的本地事务单元
    LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
    // 3.结束事务
    this.endTransaction(sendResult, localTransactionState, localException);
}

endTransaction方法会将请求发往broker(mq server)去更新事务消息的最终状态:

  1. 根据sendResult找到Prepared消息sendResult包含事务消息的ID
  2. 根据localTransaction更新消息的最终状态

如果endTransaction方法执行失败,数据没有发送到broker,导致事务消息的 状态更新失败,broker会有回查线程定时(默认1分钟)扫描每个存储事务状态的表格文件,如果是已经提交或者回滚的消息直接跳过,如果是prepared状态则会向Producer发起CheckTransaction请求,Producer会调用DefaultMQProducerImpl.checkTransactionState()方法来处理broker的定时回调请求,而checkTransactionState会调用我们的事务设置的决断方法来决定是回滚事务还是继续执行,最后调用endTransactionOnewaybroker来更新消息的最终状态。

再回到转账的例子,如果Bob的账户的余额已经减少,且消息已经发送成功,Smith端开始消费这条消息,这个时候就会出现消费失败和消费超时两个问题,解决超时问题的思路就是一直重试,直到消费端消费消息成功,整个过程中有可能会出现消息重复的问题,按照前面的思路解决即可。

消费事务消息

这样基本上可以解决消费端超时问题,但是如果消费失败怎么办?阿里提供给我们的解决方法是:人工解决。大家可以考虑一下,按照事务的流程,因为某种原因Smith加款失败,那么需要回滚整个流程。如果消息系统要实现这个回滚流程的话,系统复杂度将大大提升,且很容易出现Bug,估计出现Bug的概率会比消费失败的概率大很多。这也是RocketMQ目前暂时没有解决这个问题的原因,在设计实现消息系统时,我们需要衡量是否值得花这么大的代价来解决这样一个出现概率非常小的问题,这也是大家在解决疑难问题时需要多多思考的地方。

20160321补充:在3.2.6版本中移除了事务消息的实现,所以此版本不支持事务消息,具体情况请参考rocketmq的issues:

https://github.com/alibaba/RocketMQ/issues/65

https://github.com/alibaba/RocketMQ/issues/138

https://github.com/alibaba/RocketMQ/issues/156

四、Producer如何发送消息

Producer轮询某topic下的所有队列的方式来实现发送方的负载均衡,如下图所示:

producer发送消息负载均衡

首先分析一下RocketMQ的客户端发送消息的源码:

// 构造Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 初始化Producer,整个应用生命周期内,只需要初始化1次
producer.start();
// 构造Message
Message msg = new Message("TopicTest1",// topic
                        "TagA",// tag:给消息打标签,用于区分一类消息,可为null
                        "OrderID188",// key:自定义Key,可以用于去重,可为null
                        ("Hello MetaQ").getBytes());// body:消息内容
// 发送消息并返回结果
SendResult sendResult = producer.send(msg);
// 清理资源,关闭网络连接,注销自己
producer.shutdown();

在整个应用生命周期内,生产者需要调用一次start方法来初始化,初始化主要完成的任务有:

  1. 如果没有指定namesrv地址,将会自动寻址
  2. 启动定时任务:更新namesrv地址、从namsrv更新topic路由信息、清理已经挂掉的broker、向所有broker发送心跳...
  3. 启动负载均衡的服务

初始化完成后,开始发送消息,发送消息的主要代码如下:

private SendResult sendDefaultImpl(Message msg,......) {
    // 检查Producer的状态是否是RUNNING
    this.makeSureStateOK();
    // 检查msg是否合法:是否为null、topic,body是否为空、body是否超长
    Validators.checkMessage(msg, this.defaultMQProducer);
    // 获取topic路由信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    // 从路由信息中选择一个消息队列
    MessageQueue mq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);
    // 将消息发送到该队列上去
    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
}

代码中需要关注的两个方法tryToFindTopicPublishInfoselectOneMessageQueue。前面说过在producer初始化时,会启动定时任务获取路由信息并更新到本地缓存,所以tryToFindTopicPublishInfo会首先从缓存中获取topic路由信息,如果没有获取到,则会自己去namesrv获取路由信息。selectOneMessageQueue方法通过轮询的方式,返回一个队列,以达到负载均衡的目的。

如果Producer发送消息失败,会自动重试,重试的策略:

  1. 重试次数 < retryTimesWhenSendFailed(可配置)
  2. 总的耗时(包含重试n次的耗时) < sendMsgTimeout(发送消息时传入的参数)
  3. 同时满足上面两个条件后,Producer会选择另外一个队列发送消息
五、消息存储

RocketMQ的消息存储是由consume queuecommit log配合完成的。

1、Consume Queue

consume queue是消息的逻辑队列,相当于字典的目录,用来指定消息在物理文件commit log上的位置。

我们可以在配置中指定consumequeuecommitlog存储的目录
每个topic下的每个queue都有一个对应的consumequeue文件,比如:

${rocketmq.home}/store/consumequeue/${topicName}/${queueId}/${fileName}

Consume Queue文件组织,如图所示:

Consume Queue文件组织示意图
  1. 根据topicqueueId来组织文件,图中TopicA有两个队列0,1,那么TopicA和QueueId=0组成一个ConsumeQueue,TopicA和QueueId=1组成另一个ConsumeQueue。
  2. 按照消费端的GroupName来分组重试队列,如果消费端消费失败,消息将被发往重试队列中,比如图中的%RETRY%ConsumerGroupA
  3. 按照消费端的GroupName来分组死信队列,如果消费端消费失败,并重试指定次数后,仍然失败,则发往死信队列,比如图中的%DLQ%ConsumerGroupA

死信队列(Dead Letter Queue)一般用于存放由于某种原因无法传递的消息,比如处理失败或者已经过期的消息。

Consume Queue中存储单元是一个20字节定长的二进制数据,顺序写顺序读,如下图所示:

consumequeue文件存储单元格式
  1. CommitLog Offset是指这条消息在Commit Log文件中的实际偏移量
  2. Size存储中消息的大小
  3. Message Tag HashCode存储消息的Tag的哈希值:主要用于订阅时消息过滤(订阅时如果指定了Tag,会根据HashCode来快速查找到订阅的消息)
2、Commit Log

CommitLog:消息存放的物理文件,每台broker上的commitlog被本机所有的queue共享,不做任何区分。
文件的默认位置如下,仍然可通过配置文件修改:

${user.home} \store\${commitlog}\${fileName}

CommitLog的消息存储单元长度不固定,文件顺序写,随机读。消息的存储结构如下表所示,按照编号顺序以及编号对应的内容依次存储。

Commit Log存储单元结构图
3、消息存储实现

消息存储实现,比较复杂,也值得大家深入了解,后面会单独成文来分析(目前正在收集素材),这小节只以代码说明一下具体的流程。

// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
synchronized (this) {
    long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
    // Here settings are stored timestamp, in order to ensure an orderly global
    msg.setStoreTimestamp(beginLockTimestamp);
    // MapedFile:操作物理文件在内存中的映射以及将内存数据持久化到物理文件中
    MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile();
    // 将Message追加到文件commitlog
    result = mapedFile.appendMessage(msg, this.appendMessageCallback);
    switch (result.getStatus()) {
    case PUT_OK:break;
    case END_OF_FILE:
         // Create a new file, re-write the message
         mapedFile = this.mapedFileQueue.getLastMapedFile();
         result = mapedFile.appendMessage(msg, this.appendMessageCallback);
     break;
     DispatchRequest dispatchRequest = new DispatchRequest(
                topic,// 1
                queueId,// 2
                result.getWroteOffset(),// 3
                result.getWroteBytes(),// 4
                tagsCode,// 5
                msg.getStoreTimestamp(),// 6
                result.getLogicsOffset(),// 7
                msg.getKeys(),// 8
                /**
                 * Transaction
                 */
                msg.getSysFlag(),// 9
                msg.getPreparedTransactionOffset());// 10
    // 1.分发消息位置到ConsumeQueue
    // 2.分发到IndexService建立索引
    this.defaultMessageStore.putDispatchRequest(dispatchRequest);
}
4、消息的索引文件

如果一个消息包含key值的话,会使用IndexFile存储消息索引,文件的内容结构如图:

消息索引

索引文件主要用于根据key来查询消息的,流程主要是:

  1. 根据查询的 key 的 hashcode%slotNum 得到具体的槽的位置(slotNum 是一个索引文件里面包含的最大槽的数目,例如图中所示 slotNum=5000000)
  2. 根据 slotValue(slot 位置对应的值)查找到索引项列表的最后一项(倒序排列,slotValue 总是指向最新的一个索引项)
  3. 遍历索引项列表返回查询时间范围内的结果集(默认一次最大返回的 32 条记录)
六、消息订阅

RocketMQ消息订阅有两种模式,一种是Push模式,即MQServer主动向消费端推送;另外一种是Pull模式,即消费端在需要时,主动到MQServer拉取。但在具体实现时,Push和Pull模式都是采用消费端主动拉取的方式。

首先看下消费端的负载均衡:

消费端负载均衡

消费端会通过RebalanceService线程,10秒钟做一次基于topic下的所有队列负载:

  1. 遍历Consumer下的所有topic,然后根据topic订阅所有的消息
  2. 获取同一topic和Consumer Group下的所有Consumer
  3. 然后根据具体的分配策略来分配消费队列,分配的策略包含:平均分配、消费端配置等

如同上图所示:如果有 5 个队列,2 个 consumer,那么第一个 Consumer 消费 3 个队列,第二 consumer 消费 2 个队列。这里采用的就是平均分配策略,它类似于分页的过程,TOPIC下面的所有queue就是记录,Consumer的个数就相当于总的页数,那么每页有多少条记录,就类似于某个Consumer会消费哪些队列。

通过这样的策略来达到大体上的平均消费,这样的设计也可以很方面的水平扩展Consumer来提高消费能力。

消费端的Push模式是通过长轮询的模式来实现的,就如同下图:

Push模式示意图

Consumer端每隔一段时间主动向broker发送拉消息请求,broker在收到Pull请求后,如果有消息就立即返回数据,Consumer端收到返回的消息后,再回调消费者设置的Listener方法。如果broker在收到Pull请求时,消息队列里没有数据,broker端会阻塞请求直到有数据传递或超时才返回。

当然,Consumer端是通过一个线程将阻塞队列LinkedBlockingQueue<PullRequest>中的PullRequest发送到broker拉取消息,以防止Consumer一致被阻塞。而Broker端,在接收到Consumer的PullRequest时,如果发现没有消息,就会把PullRequest扔到ConcurrentHashMap中缓存起来。broker在启动时,会启动一个线程不停的从ConcurrentHashMap取出PullRequest检查,直到有数据返回。

七、RocketMQ的其他特性

前面的6个特性都是基本上都是点到为止,想要深入了解,还需要大家多多查看源码,多多在实际中运用。当然除了已经提到的特性外,RocketMQ还支持:

  1. 定时消息
  2. 消息的刷盘策略
  3. 主动同步策略:同步双写、异步复制
  4. 海量消息堆积能力
  5. 高效通信
  6. .......

其中涉及到的很多设计思路和解决方法都值得我们深入研究:

  1. 消息的存储设计:既要满足海量消息的堆积能力,又要满足极快的查询效率,还要保证写入的效率。
  2. 高效的通信组件设计:高吞吐量,毫秒级的消息投递能力都离不开高效的通信。
  3. .......

RocketMQ最佳实践

一、Producer最佳实践

1、一个应用尽可能用一个 Topic,消息子类型用 tags 来标识,tags 可以由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才可以利用 tags 在 broker 做消息过滤。
2、每个消息在业务层面的唯一标识码,要设置到 keys 字段,方便将来定位消息丢失问题。由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。
3、消息发送成功或者失败,要打印消息日志,务必要打印 sendresult 和 key 字段。
4、对于消息不可丢失应用,务必要有消息重发机制。例如:消息发送失败,存储到数据库,能有定时程序尝试重发或者人工触发重发。
5、某些应用如果不关注消息是否发送成功,请直接使用sendOneWay方法发送消息。

二、Consumer最佳实践

1、消费过程要做到幂等(即消费端去重)
2、尽量使用批量方式消费方式,可以很大程度上提高消费吞吐量。
3、优化每条消息消费过程

三、其他配置

线上应该关闭autoCreateTopicEnable,即在配置文件中将其设置为false

RocketMQ在发送消息时,会首先获取路由信息。如果是新的消息,由于MQServer上面还没有创建对应的Topic,这个时候,如果上面的配置打开的话,会返回默认TOPIC的(RocketMQ会在每台broker上面创建名为TBW102的TOPIC)路由信息,然后Producer会选择一台Broker发送消息,选中的broker在存储消息时,发现消息的topic还没有创建,就会自动创建topic。后果就是:以后所有该TOPIC的消息,都将发送到这台broker上,达不到负载均衡的目的。

所以基于目前RocketMQ的设计,建议关闭自动创建TOPIC的功能,然后根据消息量的大小,手动创建TOPIC。

RocketMQ设计相关

RocketMQ的设计假定:

每台PC机器都可能宕机不可服务
任意集群都有可能处理能力不足
最坏的情况一定会发生
内网环境需要低延迟来提供最佳用户体验

RocketMQ的关键设计:

分布式集群化
强数据安全
海量数据堆积
毫秒级投递延迟(推拉模式)

这是RocketMQ在设计时的假定前提以及需要到达的效果。我想这些假定适用于所有的系统设计。随着我们系统的服务的增多,每位开发者都要注意自己的程序是否存在单点故障,如果挂了应该怎么恢复、能不能很好的水平扩展、对外的接口是否足够高效、自己管理的数据是否足够安全...... 多多规范自己的设计,才能开发出高效健壮的程序。

来源: http://www.jianshu.com/p/453c6e7ff81c

 

Java常见内存溢出、内存泄露问题分析

  1. Java垃圾回收机制(GC)1.1GC机製作用1.2堆内存3代分布(年轻代、老年代、持久代)1.3GC分类1.4GC过程
  2. Java应用内存问题分析2.1Java内存划分2.2Java常见内存问题2.3ML(内存泄露) OOM(内存溢出)问题现象及分析2.4IBM DUMP分析工具使用介绍
  3. Java应用CPU、线程问题分析

Java垃圾回收机制(GC)

1.GC机製作用

1.1 JVM自动检测和释放不再使用的对象内存

1.2 Java 运行时JVM会执行 GC,不再需要显式释放对象

例:Object.finallize、 Windows.dispose、 System.gc

0ACTWd00

2.Java堆3代分布
0ACTWd01

关于Java堆3代分布情况,可通过命令:jmap –heap pid 查看

0ACTWd02

3.GC分类

3.1 Young GC(Minor GC):收集生命周期短的区域(Young)

(1) 清空Eden+from survivor中所有no ref的对象占用的内存

(2) 将Eden+from survivor中所有存活的对象copy到to survivor中


(3) 一些对象将晋升到old中: to survivor放不下的或存活次数超过turning threshold中的

3.2 Full GC(Major GC):收集生命周期短的区域(Young)和生命周期比较长的区域(Old),对整个堆进行垃圾收集,有时也会回收持久区(Perm)

(1) 清空heap中no ref的对象

(2) 清空permgen中已经被卸载的class信息4.GC过程

(1) 新生成的对象在Eden区完成内存分配

(2) 当Eden区满,再创建对象,会因为申请不到空间触发YGC,进行young(eden+1survivor)区的垃圾回收(为什麽是eden+1survivor:两个survivor中始终有一个survivor是空的,空的那个被标记成To Survivor)

(3) YGC时,Eden不能被回收的对象被放入到空的survivor(也就是放到To Survivor,此时Eden被清空),另一个survivor(From Survivor)里不能被GC回收的对象也会被放入To Survivor,始终保证一个survivor是空的(YGC完成之后,To Survivor 和 From Survivor的标记互换)


(4) YGC结束后,若存放对象的survivor满,则这些对象被copy到old区,或者survivor区没有满,但是有些对象已经足够Old(超过XX:MaxTenuringThreshold),也被放入Old区

(5) 当Old区被放满的之后,进行完整的垃圾回收,即 FGC

(6) FGC后,若Survivor及old区仍然无法存放从Eden複製过来的部分对象,导致JVM无法在Eden区为新对象创建内存区域,则出现OOM错误

0ACTWd03

Java应用内存问题分析方法

1.Java内存划分

可粗略划分三类:

1.1 堆内存

存放由 new 创建的对象和数组,在堆中分配的内存,由 Java 虚拟机的自动垃圾回收器来管理

0ACTWd88

1.2 栈内存

在函数中定义的一些基本类型的变量和对象的引用变量都是在函数的栈内存中分配(更准确地说是保存了引用的堆内存空间的地址,java中的「指针」)

1.3 永久保存区、方法区(Permanent Generation)

用于存储已被虚拟机加载的类信息、常量、静态变量等

0ACTWd05

2.Java常见的内存问题表现形式:

2.1 OutOfMemory:内存溢出

2.2 Memory Leak:内存泄露

二者共同点:

(1) 通常最终的状态就会导致OOM错误

(2) 在Java堆或本地内存中都可能发生

二者不同点:

(1) ML是已经分配好的内存或对象,当不再需要,没有得到释放 而OOM则是没有足够的空间来供jvm分配新的内存块

(2) ML的内存曲线总体上是一条斜向上的曲线而OOM不是,反之未必

3.内存溢出类型:

虚拟机栈溢出、本地方法栈溢出、方法区溢出、堆溢出、运行时常量池溢出

异常类型:

(1) java.lang.OutOfMemoryError: Java heap space

堆内存溢出

优化:通过-Xmn(最小值)–Xms(初始值) -Xmx(最大值)参数手动设置 Heap(堆)的大小。

(2) java.lang.OutOfMemoryError: PermGen space

PermGen Space溢出(方法区溢出、运行时常量池溢出)

优化:通过MaxPermSize参数设置PermGen space大小。

(3) java.lang.StackOverflowError

栈溢出(虚拟机栈溢出、本地方法栈溢出)

优化:通过Xss参数调整

Demo代码 :

// Java 堆溢出publicstaticvoid main(String args) {List<OOMObject>list=newArrayList<JavaHeapSpace.OOMObject>;while(true) {list.add(newOOMObject); } }staticclassOOMObject{}// 虚拟机栈溢出publicstaticvoidmain(String args) {// TODO Auto-generated method stubSystem.out.println(add); }publicstaticintadd{returnadd; }// 方法区溢出publicstaticvoidmain(String args) {while(true) { Enhancer enhancer =newEnhancer; enhancer.setSuperclass(OOMObject.class); enhancer.setUseCache(false); enhancer.setCallback(newMethodInterceptor {@OverridepublicObjectintercept(Object obj, Method method, Object args, MethodProxy proxy)throwsThrowable {returnproxy.invoke(obj, args); } }); enhancer.create; } }staticclass OOMObject { }// 运行时常量池溢出publicstaticvoidmain(String args){// TODO Auto-generated method stubList<String> list =newArrayList<String>;inti =0;while(true){ list.add(String. valueOf(i++).intern); } }// 内存泄露模拟publicstaticvoidmain(String args) {// TODO Auto-generated method stubList<int> list =newArrayList<int>; Runtime run = Runtime.getRuntime;inti=1;while(true){int arr =newint[1024]; list.add(arr);if(i++ %1000==0){ System.out.print("最大堆内存="+ run.maxMemory /1024/1024+"M, "); System.out.print("已分配内存="+ run.totalMemory /1024/1024+"M, "); System.out.print("剩馀空间内存="+ run.freeMemory /1024/1024+"M, "); System.out.println("最大可用内存="+ ( run.maxMemory - run.totalMemory + run.freeMemory ) /1024/1024+"M"); sleep(1000); } } }publicstaticvoidsleep(longtime) {try{ Thread.sleep(time); }catch(InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace; } }4.内存泄露现象

0ACTWd06

heapspace:OutOfMemoryError

0ACTWd07

开发人员的分析、解决思路

内存对象申请未释放(未及时释放)

线程问题

分别从堆dump和线程dump进行分析:

jmap -dump:format=b,file=heap.dump pid

jstack pid >> thread.dump

5.JAVA DUMP分析工具

IBM HeapAnalyzer:ha456.jar

IBM Thread and Monitor Dump Analyzer:jca457.jar

堆dump分析

占用内存较多代码块

分析代码快上下文

分析占用内存的对象内容

0ACTWd08

线程dump分析

活跃线程

阻塞线程

等待资源线程

0ACTWd09

Java应用CPU问题分析方法

1.程序响应慢,CPU高

(1) ThreadDump

jstack pid >> thread.dump

(2) 找到导致cpu高的线程 top -H -p pid

(3) pid 十进位转十六进位

(4) 找到对应的线程UE打开 threaddump文件查找:按十六进位关键字找到对应的线程,把相关的方法找出来,可以精确到代码的行号

2.程序响应慢,CPU不高

一般表现为thread struck在了i/o、db等

实例:

IO阻塞(程序表现为响应慢)

线程状态为「in Object.wait」,说明正在等待线程池可用资源,由于线程池满导致新的IO请求处于排队等待状态,且发生在:at com.iflytek.diange.data.provider.sendsong.impl.SendSongImpl.getSendSongInfosByUserId(SendSongImpl.java:92)行

0ACTWd0A

3.程序无响应

死锁(程序表现为无响应)

线程状态为「waiting to lock」: 两个线程各持有一个锁,又在等待另一个锁,故造成死锁,且发生在DeadLockTest.java:39行

0ACTWd0B