Hadoop源代码分析 之Datanode工作原理(4)—–拷贝本地文件到hadoop的过程

原创文档,转载请将原文url地址标明

hadoop 相关视频下载地址: http://pan.baidu.com/share/link?shareid=223046310&uk=3073578852

本篇文档我们重点研究一下通过hadoop的客户端的上传一个文件到hadoop的过程,通过这个过程我们了解hadoop的datanode及namenode的相关工作过程。本篇主要集中精力研究源代码,以及如何阅读这部分源代码,下篇文档我们将重点总结一下相关工作过程以及一个相关的五声音视频演示如何动态分析这个给过程的。

引言,hadoop代表一种全新的编程思想,基于hadoop有很多衍生项目,充分利用他们

为我们服务是非常必要的,同时hadoop又是一个复杂系统,若能理解他的工作原理对

我们将有非常大的帮助,我们在这里以hadoop的0.1.0版本为基础逐步分析他的基本工作

原理、结构、思路等等,希望通过这个能帮助我们理解生产中的hadoop系统。 时间有限,

经验不足,疏漏难免,在这里仅仅分享一些心得,希望对大家能起到一个抛砖引玉的作用吧,

有问题请大家给我留言或者评论等,这样也能对我的工作有莫大的帮助。

感谢您阅读这篇文章!

 

一.初始化过程

初始化过程重点分析从main函数启动到整个相关准备工作完成,准备发送数据前的一段过程的总体情况。

再次强调,我们的分析仅仅是一部分比较重要的源代码,没有分析全部源代码,具体情况读者要去阅读源代码,仔细体会才会有很大收获。

1. Main函数

源代码如下:

public static void main(String argv[]) {

try {

.....

Configuration conf = new Configuration();   // 创建一个默认的配置文件

int i = 0;

FileSystem fs = FileSystem.parseArgs(argv, i, conf); // 非常重要,这个决定是本地的文件操作还是分布式的文件操作,从而决定是否需要访问namenode,datanode等,请慢慢体会,有兴趣读者可以去分析相关源代码。我们这里忽略不计。

String cmd = argv[i++];

try {

DFSShell tc = new DFSShell(fs);

if ("-put".equals(cmd) || "-copyFromLocal".equals(cmd)) {

tc.copyFromLocal(new File(argv[i++]), argv[i++]);   // 本篇文档中的研究的对象

.....

} else if ("-moveFromLocal".equals(cmd)) {

tc.moveFromLocal(new File(argv[i++]), argv[i++]);

} else if ("-get".equals(cmd) || "-copyToLocal".equals(cmd)) {

tc.copyToLocal(argv[i++], new File(argv[i++]));

2. copyFromLocal函数

void copyFromLocal(File src, String dstf) throws IOException {

fs.copyFromLocalFile(src, new File(dstf));// 本篇文档中的研究的对象,读者需要将鼠标光标移动这个“copyFromLocalFile”上面,然后按下ctrl+T,然后会弹出相关对话框,选择分布式的一项,如下面两幅图像

}

wps_clip_image-1887_thumb[1]

注意如上图点击“copyFromLocalFile”文本后,显示如上图后,按下键盘ctrl + T将列出相关的代码实现供我们选择。

这种情况在面向对象等技术中经常出现,例如一个接口有多个集成,一个类有多个子类,因此当我们什么一个基类或者接口变量时,具体是采用那个子类为这个变量赋值是在程序运行时决定的。

那么这种情况下,我们如何在程序没有运行过程中了解程序将来将运行到那个函数的实现中?

我们通过上面的ctrl + T就是让系统帮助我们找到系统中都有哪些代码是实现这个函数的,然后我们根据具体运行目的来推测应该运行那个版本或者类的方法。

Ctrl + T的结果如下

wps_clip_image-5383_thumb[1]

从这里可以看出,有两个类实现了FileSystem类的相关接口

分别是:“public class DistributedFileSystem extends FileSystem {”, “public class LocalFileSystem extends FileSystem {”

具体相关实现,请大家自己到代码中去寻找了。只有亲自体验体会才会深刻,收获才大。辛苦您啦!

我们这里选择“DistributedFileSystem ”这个相关项目

3. copyFromLocalFile函数

public void copyFromLocalFile(File src, File dst) throws IOException {

doFromLocalFile(src, dst, false);  // 中间调用函数

}

4. doFromLocalFile函数

这个是具体进行相关工作的一个主要逻辑的函数了。大家要仔细研究这个过程。

但是这个函数中有很多都是经过封装的过程,我们仅仅能了解工作过程,无法了解底层通信的具体过程。

因此本文不研究具体的拷贝过程,我们仅仅研究底层细节上的通信过程,看看这些过程是在哪里被调用的,如何调用的,在何处被调用。

private void doFromLocalFile(File src, File dst, boolean deleteSource)

throws IOException {

// 检查分布式文件是否已经存在

if (exists(dst)) { // 里面已经调用了namenode节点进行相关判断,本文不分析

// 若是文件存在,不是目录, 则文件存在,抛出异常

if (!isDirectory(dst)) {

throw new IOException("Target " + dst + " already exists");

} else {

// 若是目录,则文件不存在, 根据目标目录及源文件名称构造目标

dst = new File(dst, src.getName());

if (exists(dst)) { // 检查目录是否存在,存在报错

throw new IOException("Target " + dst + " already exists");

} } }

// 构造本地文件

FileSystem localFs = getNamed("local", getConf());

// 判断本地文件是目录

if (localFs.isDirectory(src)) {

mkdirs(dst); // 分布式文件系统中创建文件夹

File contents[] = localFs.listFiles(src); // 获取目录中文件列表, 递归调用拷贝资源

for (int i = 0; i < contents.length; i++) {

doFromLocalFile(contents[i], new File(dst, contents[i]

.getName()), deleteSource);

}

} else {

// 声明一个缓存区 , 从本地文件系统 读取数据

byte buf[] = new byte[getConf().getInt("io.file.buffer.size", 4096)];

InputStream in = localFs.open(src); // 打开本地文件流

try {

OutputStream out = create(dst); // 重点,极其重要,本文的重点切入点

try {

int bytesRead = in.read(buf); // 读取数据

while (bytesRead >= 0) {

out.write(buf, 0, bytesRead); // 写入数据,这个是我们本篇文档讨论的另外一个重点。

bytesRead = in.read(buf);

。。。。。 // 剩余代码省略,请查看项目源代码

5. create函数

创建一个数据输出流

public FSDataOutputStream create(File f) throws IOException {

return create(f, true, getConf().getInt("io.file.buffer.size", 4096)); // 重点观察从配置文件中获取的参数及相关后续工作过程。

}

wps_clip_image-8136_thumb[1]

public FSDataOutputStream create(File f, boolean overwrite, int bufferSize)

throws IOException {

return new FSDataOutputStream(this, f, overwrite, getConf(), bufferSize);

}

6. FSDataOutputStream函数

这个函数过程非常重要,尽管仅仅几条语句,但是做了几层封装,最外层buffer,里面PositionCache,最后 Summer,这些都是非常重要的类,特别是在于数据写入上面的过程。并且都是内部类,大家要仔细阅读代码,不在多说

public FSDataOutputStream(FileSystem fs, File file, boolean overwrite,

Configuration conf, int bufferSize) throws IOException {

super(new Buffer(

new PositionCache(

new Summer(fs, file, overwrite, conf)

), bufferSize)

); }

wps_clip_image-18830_thumb[1]

重点看上图中不同颜色区域的代码

7. Summer函数

public class FSDataOutputStream extends DataOutputStream {

/** Store checksums for data. */

private static class Summer extends FilterOutputStream {

.....

public Summer(FileSystem fs, File file, boolean overwrite,

Configuration conf) throws IOException {

super(fs.createRaw(file, overwrite)); // 需要注意这里面的 fs参数, 系统经常根据 这个参数 来 确定使用本地的或者分布式文件系统

这个是创建一些文件输出流,这个流最终是一个网络相关对象,详情参见下面介绍

this.bytesPerSum = conf.getInt("io.bytes.per.checksum", 512);

this.sums = new FSDataOutputStream(fs.createRaw(fs

.getChecksumFile(file), true), conf); // 同上面类似,这里也是创建一个网络相关的文件流,但是这个文件是做crc校验的,

sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);

sums.writeInt(this.bytesPerSum);

}

这里面要注意的是创建的对象的类及父类。面向对象中尤为重要

8. createRaw函数(super(fs.createRaw(file, overwrite))调用分支)

网络调用就要出来啦,重点文章就在这里了,请重点阅读!!!

public FSOutputStream createRaw(File f, boolean overwrite)

throws IOException {

return dfs.create(getPath(f), overwrite);

}

9. DFSClient的create函数

这个是网络的客户端函数,这里就是我们本文的核心了,一切核心内容都将在这个类中慢慢展开。

public FSOutputStream create(UTF8 src, boolean overwrite)

throws IOException {

return new DFSOutputStream(src, overwrite);

}

这个函数的返回值对系统工作状况影响巨大

wps_clip_image-32398_thumb[1]

如上图,返回的是一个DFSOutputStream对象

这个对象有个方法,这个方法及其重要,负责同namenode及datanode进行通信,交换数据及内容。

/**

* Open a DataOutputStream to a DataNode so that it can be written to.

* This happens when a file is created and each time a new block is

* allocated. Must get block ID and the IDs of the destinations from the

* namenode.

*/

private synchronized void nextBlockOutputStream() throws IOException {

boolean retry = false;

long start = System.currentTimeMillis();

。。。。。

是本片文档重点讨论的内容,这里面肩负着同namenode通信获取块存放位置,同时负责链接具体的datanode节点传输数据

10. DFSOutputStream函数

下面过程也非常重要,主要是关于我们的客户端如何写入数据到本地,然后如何在写入到远程服务器的,这个对hadoop的工作效率是有很大影响的

public DFSOutputStream(UTF8 src, boolean overwrite) throws IOException {

this.src = src;

this.overwrite = overwrite;

// 备份文件   /tmp/d1/tmp/client-4723689804732826540

this.backupFile = newBackupFile();

this.backupStream = new FileOutputStream(backupFile);

}

11. newBackupFile函数

创建临时文件在本地

private File newBackupFile() throws IOException {

File result = conf.getFile("dfs.data.dir", "tmp" + File.separator

+ "client-" + Math.abs(r.nextLong()));

result.deleteOnExit();

return result;

}

12. 校验文件(fs.createRaw(fs.getChecksumFile(file), true), conf);)

这个是创建crc校验文件的过程

/** Return the name of the checksum file associated with a file. */

public static File getChecksumFile(File file) {

return new File(file.getParentFile(), "." + file.getName() + ".crc");

}

13. PositionCache函数

对java的流文件提供一个position的功能

private static class PositionCache extends FilterOutputStream {

long position;

public PositionCache(OutputStream out) throws IOException {

super(out);

}

// This is the only write() method called by BufferedOutputStream, so we

// trap calls to it in order to cache the position.

public void write(byte b[], int off, int len) throws IOException {

out.write(b, off, len);

position += len; // update position

}

public long getPos() throws IOException {

return position; // return cached position

}

}

14. Buffer函数

提供一个缓冲的功能

private static class Buffer extends BufferedOutputStream {

public Buffer(OutputStream out, int bufferSize) throws IOException {

super(out, bufferSize);

}

public long getPos() throws IOException {

return ((PositionCache) out).getPos() + this.count;

}

// optimized version of write(int)

public void write(int b) throws IOException {

if (count >= buf.length) {

super.write(b);

} else {

buf[count++] = (byte) b;

}

}

}

二.初始化过程动态验证

下图是通过设置断点在程序中,然后获取到程序堆栈调用信息。

wps_clip_image-22339_thumb[1]

如上图,我们通过在特定的程序中设置断点,然后启动调试模式,然后让程序运行到断点处,最后程序堆栈信息就帮助我们了解程序的准确调用顺序了。

1. 程序调用main函数,如1处

2. 程序调用 dfsshell.copyFromLocal

3. ......

最后程序在10部分调用了DFSClient.create(。。。。),代码读者自己去慢慢阅读,同时我们前面也有了一些介绍,供参阅。

三.数据输出out.write(buf, 0, bytesRead);

本部分重点研究如何输出数据到远程服务器的,但在这个部分代码通过静态分析比较麻烦,不得要领,作者也没有好的办法,因此先通过动态调用设置断点的办法判断都调用了那些函数,然后一个个函数的分析相关功能。

wps_clip_image-31222_thumb[1]

从上图中我们需要注意 红色的两个区域,一个小区域,表明类,另一个红色的大区域表明代码停止在那里。

另外我们需要注意到两个绿色区域,及他们之间的代码行

一个是:xxxx$PositionCache.write(xxxxxx)

一个是:xxxx$Summer.write(xxxxxx)

最后是:DFSClient$DFSOutputStream.write(xxxxx)

我们下面主要分析上面列出的几个函数,其他函数请读者自行慢慢研究了

1. PositionCache.write函数

如下,这个类是个内部类,代码如下:

public class FSDataOutputStream extends DataOutputStream {

.....

private static class PositionCache extends FilterOutputStream {

long position;

.....

public void write(byte b[], int off, int len) throws IOException {

out.write(b, off, len);   // 调用内部对象进行输出

position += len; // update position   // 更新位置信息

}

public long getPos() throws IOException {

return position; // return cached position

}

}

2. Summer.write函数

public class FSDataOutputStream extends DataOutputStream {

.....

/** Store checksums for data. */

private static class Summer extends FilterOutputStream {

....

public void write(byte b[], int off, int len) throws IOException {

int summed = 0;

while (summed < len) {  // 出来校验和

int goal = this.bytesPerSum - inSum;

int inBuf = len - summed;

int toSum = inBuf <= goal ? inBuf : goal;

sum.update(b, off + summed, toSum);

summed += toSum;

inSum += toSum;

if (inSum == this.bytesPerSum) {

writeSum();

}

}

out.write(b, off, len); // 输出真正的数据

}

3. DFSClient$DFSOutputStream.write函数

这个类及相关函数是本文的重点中的重点,务必仔细阅读。

class DFSClient implements FSConstants {

....

class DFSOutputStream extends FSOutputStream {

....

public synchronized void write(byte b[], int off, int len)

throws IOException {

if (closed) {

throw new IOException("Stream closed");

}

while (len > 0) {

int remaining = BUFFER_SIZE - pos;

int toWrite = Math.min(remaining, len);

System.arraycopy(b, off, outBuf, pos, toWrite);

pos += toWrite;

off += toWrite;

len -= toWrite;

filePos += toWrite;

if ((bytesWrittenToBlock + pos >= BLOCK_SIZE)    // 这个条件判断非常重要,当写入数据大于块尺寸时,进行这个分支,或者。。。。。

|| (pos == BUFFER_SIZE)) {

flush();  // 关键函数,下面给出这个函数的定义

}

}

}

4. DFSClient$DFSOutputStream.flush函数

/**

* Flush the buffer, getting a stream to a new block if necessary.

*/

public synchronized void flush() throws IOException {

if (closed) {

throw new IOException("Stream closed");

}

if (bytesWrittenToBlock + pos >= BLOCK_SIZE) {

flushData(BLOCK_SIZE - bytesWrittenToBlock);

}

// 在刷新数据时,若是写入数据的数量到达块的尺寸了,则调用结束块的操作,传递数据到服务器

if (bytesWrittenToBlock == BLOCK_SIZE) {

endBlock();  // 重要函数

}

flushData(pos);

}

5. DFSClient$DFSOutputStream.endBlock函数

private synchronized void endBlock() throws IOException {

。。。。。

boolean mustRecover = true;

while (mustRecover) {

nextBlockOutputStream(); // 重要的函数调用

InputStream in = new FileInputStream(backupFile);

try {

byte buf[] = new byte[BUFFER_SIZE];

int bytesRead = in.read(buf);

while (bytesRead > 0) {

blockStream.writeLong((long) bytesRead);

blockStream.write(buf, 0, bytesRead);

bytesRead = in.read(buf);

}

internalClose();

mustRecover = false;

} catch (IOException ie) {

handleSocketException(ie);

} finally {

in.close();

}

}

//

// Delete local backup, start new one

//

backupFile.delete();

backupFile = newBackupFile();

backupStream = new FileOutputStream(backupFile);

bytesWrittenToBlock = 0;

}

6. DFSClient$DFSOutputStream.nextBlockOutputStream函数

本函数是本片文章最后要重点说明的地方,代码不在缩减,请大家慢慢看啦,注意特殊颜色字体等

/**

* Open a DataOutputStream to a DataNode so that it can be written to.

* This happens when a file is created and each time a new block is

* allocated. Must get block ID and the IDs of the destinations from the

* namenode.

*/

private synchronized void nextBlockOutputStream() throws IOException {

boolean retry = false;

long start = System.currentTimeMillis();

do {

retry = false;

long localstart = System.currentTimeMillis();

boolean blockComplete = false;

LocatedBlock lb = null;

while (!blockComplete) {

if (firstTime) {

lb = namenode.create(src.toString(), clientName

.toString(), localName, overwrite); // 首次创建一个文件,同时获取文件 应该发送给那个节点的 数据信息(信息在返回值中)

} else {

lb = namenode.addBlock(src.toString(), localName);// 调用 添加块的 功能,函数返回 应该发送给那个节点的返回值

}

if (lb == null) {

try {

Thread.sleep(400);

if (System.currentTimeMillis() - localstart > 5000) {

LOG

.info("Waiting to find new output block node for "

+ (System.currentTimeMillis() - start)

+ "ms");

}

} catch (InterruptedException ie) {

}

} else {

blockComplete = true;

}

}

block = lb.getBlock();

DatanodeInfo nodes[] = lb.getLocations(); // 这里获取具体的datanode的信息,然后准备传输数据了!!!!下面是标准的java网络程序不在说明

//

// Connect to first DataNode in the list. Abort if this fails.

//

InetSocketAddress target = DataNode.createSocketAddr(nodes[0]

.getName().toString());

try {

s = new Socket();

s.connect(target, READ_TIMEOUT);

s.setSoTimeout(READ_TIMEOUT);

} catch (IOException ie) {

// Connection failed. Let's wait a little bit and retry

try {

if (System.currentTimeMillis() - start > 5000) {

LOG.info("Waiting to find target node: " + target);

}

Thread.sleep(6000);

} catch (InterruptedException iex) {

}

if (firstTime) {

namenode.abandonFileInProgress(src.toString());

} else {

namenode.abandonBlock(block, src.toString());

}

retry = true;

continue;

}

//

// Xmit header info to datanode

//

DataOutputStream out = new DataOutputStream(

new BufferedOutputStream(s.getOutputStream()));

out.write(OP_WRITE_BLOCK);

out.writeBoolean(false);

block.write(out);

out.writeInt(nodes.length);

for (int i = 0; i < nodes.length; i++) {

nodes[i].write(out);

}

out.write(CHUNKED_ENCODING);

bytesWrittenToBlock = 0;

blockStream = out;

blockReplyStream = new DataInputStream(new BufferedInputStream(

s.getInputStream()));

} while (retry);

firstTime = false;

}

 

总结,

通过上面14页A4纸的相关信息,我们就是要了解hadoop上传文件过程中到底发生了什么?现在初步有了结果。

Hadoop想检查文件信息,包括本地,hdfs中(通过namendoe进行)两个都没问后,

初始化数据输出流,这个过程中并没有传输数据呀!!数据先在本地缓存中,但到达一定量后,或流结束了调用下面的过程

Hadoop调用namenode获取块存储目标,根据存储目标,由客户端通过socket链接datanode节点传输数据。

最后。。。。。。自己看吧,实在太长了。。。。

 

参考文章
Hadoop源代码分析 之Datanode工作原理(5)—–拷贝文件过程总结

Hadoop源代码分析 之Datanode工作原理(4)—–拷贝本地文件到hadoop的过程

Hadoop源代码分析 之Datanode工作原理(3)—–datanode工作过程总结

hadoop源代码介绍

Hadoop源代码分析 之Datanode工作原理(2)—–datanode基本工作过程

Hadoop源代码分析 之Datanode工作原理(1)—–datanode启动过程代码分析

Hadoop源代码分析 之hadoop配置及启动(4)—–启动过程汇总

Hadoop源代码分析 之hadoop配置及启动(3)—–classpath与hadoop主要组件启动过程

Hadoop源代码分析 之hadoop配置及启动(2)—–classpath与启动shell脚本

Hadoop源代码分析 之hadoop配置及启动(1)—–classpath与配置文件

Hadoop源代码分析 之hadoop源代码项目(1)—–创建eclipse下java项目

Hadoop源代码分析 之环境配置(2)—–虚拟机ip配置

Hadoop源代码分析 之环境配置(1)—–hadoop虚拟机配置

Hadoop源代码分析 之概念介绍(2)—–初学者眼中的hadoop

Hadoop源代码分析 之概念介绍(1)—–服装加工,火车货物检查与hadoop

调试eclipse下hadoop的map reduce程序

发表评论