原创文章,转载请指明出处并保留原文url地址
引言,hadoop代表一种全新的编程思想,基于hadoop有很多衍生项目,充分利用他们
为我们服务是非常必要的,同时hadoop又是一个复杂系统,若能理解他的工作原理对
我们将有非常大的帮助,我们在这里以hadoop的0.1.0版本为基础逐步分析他的基本工作
原理、结构、思路等等,希望通过这个能帮助我们理解生产中的hadoop系统。 时间有限,
经验不足,疏漏难免,在这里仅仅分享一些心得,希望对大家能起到一个抛砖引玉的作用吧,
有问题请大家给我留言或者评论等,这样也能对我的工作有莫大的帮助。
感谢您阅读这篇文章!
hadoop 相关视频下载地址: http://pan.baidu.com/share/link?shareid=223046310&uk=3073578852
今天我们来个柿子专找软的捏(好吃),我们来共同研究一下hadoop中datanode的相关情况。 datanode在整个hadoop的启动过程中最先启动(见前面文章),同时也相对简单。但是其架构, 网络访问方式,多线程处理等等,都同hadoop的其他组件相同,但却功能较少,简单易于分析,因此我们今天拿他开刀。
本篇文章我们重点研究Datanode启动,稍后我们下一篇研究他的工作过程,之后研究他整体结构,最后我们在研究他的网络通信等过程。
Datanode是java编写的程序, 任何java编写的程序运行起始函数都是main函数,因此我们仅仅需要找到相关的起始函数,就可以了解datanode的启动过程,之后再研究工作过程,研究了工作过程自然了解他的功能了。
我们本次分析也是主要从大的方面入手,先在总体了解相关功能,然后逐步细化,甚至不细化, 彻底分析出0.1.0版本的hadoop不是我们的根本目的,我们根本目的是,由0.1.0入手研究0.20.0的hadoop源代码情况, 然后由hadoop研究到hbase研究, 最终通过hbase来解决海量数据相关问题等等。
1. hadoop的datanode节点Main函数
public static void main(String args[]) {
try {
LogFormatter.setShowThreadIDs(true); // 日志相关,我们在此忽略,以后也将忽略相关日志的情况
runAndWait(new Configuration());// 先创建一个默认的 配置文件对象, 配置文件在classpath中寻找配置信息, 然后调用runAndWait函数,从函数名称大家都能明白,这个函数负责run些东西 ,然后等待结束,那么是否真的结束呢,显然若是这个函数返回,main函数就完成了,这个进程就退出了,这个datanode节点就关闭了,因此这里的wait是不会返回的,直到发生一些异常或者被命令退出等一些列特定事件,这个函数会返回,然后datanode节点结束。
} catch (IOException e) { // 后面是一些异常信息等等,我们忽略,不在讨论
......
2.hadoop的datanode节点runAndWait函数
/**
* Start datanode daemons. Start a datanode daemon for each comma separated
* data directory specified in property dfs.data.dir and wait for them to
* finish. If this thread is specifically interrupted, it will stop waiting.
*/
// 上面是hadoop本身的相关注释,大家英文好的,可以看看,hadoop的注释做的是非常好,至少比zookeeper好很多!以后我们将省略注释代码
private static void runAndWait(Configuration conf) throws IOException {
run(conf); // 负责启动一下工作环境等等,包括一些线程,对象,稍后详细分析这个函数。
// 下面这段代码对一个list进行遍历,核心功能是对每个线程执行join操作,这个是java的线程的一个函数,目的是当前线程同被合并线程合并,并等待那个线程的结束,在那个线程未结束前,被线程一直等待,直到等待结束。下面是一个有关解释join的文章http://www.blogjava.net/jnbzwm/articles/330549.html
for (Iterator iterator = subThreadList.iterator(); iterator.hasNext();) {
Thread threadDataNode = (Thread) iterator.next();
try {
threadDataNode.join(); // 等待线程结束,这里面有个非常有意思的情况,这里若有多个线程,代码执行到这里,是先等待第一个线程结束后,执行下一循环迭代,还是全部循环迭代都完成后在等待线程???(需要了解函数的阻塞特性啦,以后有时间我们聊这个话题)
....... 为了节省篇幅,省略代码,代码详情请到前面文章下载相关项目
上面代码中有个变量:subThreadList,这个变量的定义在下面
private static Vector 1subThreadList = null;
这个变量一共有两个函数访问它,一个是runAndWait函数,一个run函数,我们下面将介绍run函数,因此这个变量将不在单独介绍,大家看完run函数后及明白了这个变量的作用啦。
3. hadoop的datanode节点Run函数
public static void run(Configuration conf) throws IOException {
String[] dataDirs = conf.getStrings("dfs.data.dir"); // 取得 数据的存放目录
subThreadList = new Vector(dataDirs.length); // 为每个目录创建一个线程,因此需要一个连接表(这边版本用的向量方式)存储线程信息等
for (int i = 0; i < dataDirs.length; i++) {
DataNode dn = makeInstanceForDir(dataDirs[i], conf); // 构造一个DataNode类的实例来提供相关服务,注意,我们前面1,2,3以及4讨论的函数都是静态的,不属这个类的任何一个实例的,可以算是一些必要的准备工作了。
if (dn != null) {
// 若是创建dn实例成功(返回对象不是null),则创建一次线程,run函数是 datanode类的
Thread t = new Thread(dn, "DataNode: " + dataDirs[i]);
t.setDaemon(true); // needed for JUnit testing // 设置这个线程为守护线程, 关于守护线程的概念,参考这个文章http://blog.csdn.net/m13666368773/article/details/7245570,已经很明白了。
t.start(); // 启动这个线程
subThreadList.add(t); // 添加线程对象到列表中, 然后后续函数遍历每个线程,知道每个线程都结束,datanode才结束
4.hadoop的datanode节点makeInstanceForDir函数
static DataNode makeInstanceForDir(String dataDir, Configuration conf) throws IOException {
DataNode dn = null; // 大家都明白
File data = new File(dataDir); // 创建一个文件对象
data.mkdirs(); // 递归根据文件名称创建各级目录(若是目录不存在情况下)
if (!data.isDirectory()) { // 判断若是数据文件dir 不是目录,则返回null值,程序会抛异常等
LOG.warning("Can't start DataNode in non-directory: "+dataDir);
return null;
} else {
dn = new DataNode(conf, dataDir); // 调用构造函数,创建一个对象,将配置文件及数据目录传递给相应的对象
}
return dn; // 返回对象
}
5. 构造函数
public DataNode(Configuration conf, String datadir) throws IOException {
this(InetAddress.getLocalHost().getHostName(),
new File(datadir),
createSocketAddr(conf.get("fs.default.name", "local")), conf);
// 根据用户的设置信息, 调用网络相关函数获取信息, 最后通过配置对象获取配置信息, 根据这些信息,调用另外一个构造函数,相关构造函数在下面。
}
另一个被调用的构造函数
public DataNode(String machineName, File datadir, InetSocketAddress nameNodeAddr, Configuration conf) throws IOException {
this.namenode = (DatanodeProtocol) RPC.getProxy(DatanodeProtocol.class, nameNodeAddr, conf); // 获取namenode对象的一个代理对象, 然后通过这个代理对象进行相关的信息通信等,我们后面将专门分析这个代理创建过程,以及代理是如何工作,但相关内容不在本篇范围内。
this.data = new FSDataset(datadir, conf); //构造一个底层数据处理的对象,这个对象负责具体的数据处理等相关事宜(关于本地的)
ServerSocket ss = null; // 声明一个socket server对象, 记住呕,datanode也是server节点,他也要接受别人的请求,因此一个server类型的 socket对象是必须的。
int tmpPort = conf.getInt("dfs.datanode.port", 50010); // 你知道的,获取监听端口
while (ss == null) { // 这个里面的while很有意思啦,大家有时间可以研究一下(while同下面的 端口++配合完成了一些功能)
try {
ss = new ServerSocket(tmpPort); // 创建server对象
LOG.info("Opened server at " + tmpPort);
} catch (IOException ie) {
LOG.info("Could not open server at " + tmpPort + ", trying new port");
tmpPort++;
}
}
this.localName = machineName + ":" + tmpPort; // 机器名称及端口构成了 本地名称
// 创建一个数据任务接受对象等,来完成相关datanode的相关任务,任务接受对象工作在独立的线程中,独立的,相关代码在下一篇文章中我们将详细介绍,本篇我们仅仅知道他是负责实际接受用户请求的一个线程就可以了。
this.dataXceiveServer = new Daemon(new DataXceiveServer(ss));
this.dataXceiveServer.start();
下面是获取一下工作过程中的常量
long blockReportIntervalBasis =
conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);
this.blockReportInterval =
blockReportIntervalBasis - new Random().nextInt((int)(blockReportIntervalBasis/10));
this.datanodeStartupPeriod =
conf.getLong("dfs.datanode.startupMsec", DATANODE_STARTUP_PERIOD);
}
本篇文章仅仅是对启动工作过程中涉及一些主要代码的解释,读者若是想真正理解相关过程,必须自己去体会,自己去看代码,去调试代码等等,相关开发环境及项目在前面文章中已经给出,可以查找前面的文章,再次感谢您阅读本文章,谢谢!
参考文章
Hadoop源代码分析 之Datanode工作原理(5)—–拷贝文件过程总结
Hadoop源代码分析 之Datanode工作原理(4)—–拷贝本地文件到hadoop的过程
Hadoop源代码分析 之Datanode工作原理(3)—–datanode工作过程总结
Hadoop源代码分析 之Datanode工作原理(2)—–datanode基本工作过程
Hadoop源代码分析 之Datanode工作原理(1)—–datanode启动过程代码分析
Hadoop源代码分析 之hadoop配置及启动(4)—–启动过程汇总
Hadoop源代码分析 之hadoop配置及启动(3)—–classpath与hadoop主要组件启动过程
Hadoop源代码分析 之hadoop配置及启动(2)—–classpath与启动shell脚本
Hadoop源代码分析 之hadoop配置及启动(1)—–classpath与配置文件
Hadoop源代码分析 之hadoop源代码项目(1)—–创建eclipse下java项目
Hadoop源代码分析 之环境配置(1)—–hadoop虚拟机配置
Hadoop源代码分析 之概念介绍(2)—–初学者眼中的hadoop