`
yunsamzhang
  • 浏览: 69121 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

说说hadoop的DFSOutputStream

阅读更多
当我们用命令:

hadoop fs -copyFromLocal localfile hdfs://...

将本地文件复制到HDFS时,其背后的复制过程是怎样的?本地文件通过什么方式传输到datanode上的呢?

这里面很显然的是:
1、文件在多个电脑之间进行了传输(至少有2台电脑:本地电脑和一个datanode节点)。
2、如果文件超过一个block的大小(默认是64M),那么将一个文件分割成多个block是在哪里发生的?

带着这些疑问,我们来解读一下源代码。

一、找到“幕后英雄”

通过简单的跟踪,就会发现这一功能是由FileSystem类的copyFromLocalFile方法完成的。

继续跟踪,会发现拷贝其实是在2个文件系统中进行的,下面是FileUtil类的一段代码:

  /** Copy files between FileSystems. */
  public static boolean copy(FileSystem srcFS, Path src, 
                             FileSystem dstFS, Path dst, 
                             boolean deleteSource,
                             boolean overwrite,
                             Configuration conf) throws IOException {
      ... // 为了突出重要代码,这里省略了部分代码
      InputStream in=null;
      OutputStream out = null;
      try {
        in = srcFS.open(src);
        out = dstFS.create(dst, overwrite);
        IOUtils.copyBytes(in, out, conf, true);
      } catch (IOException e) {
        IOUtils.closeStream(out);
        IOUtils.closeStream(in);
        throw e;
      }
      ... 
  }

copyFromLocalFile的实质是将文件从LocalFileSystem复制到DistributedFileSystem

从上面的代码可以看出,复制的关键是:
1、获得本地文件系统的输入流(用来读取本地文件)
2、获得HDFS的输出流(用来向HDFS写入数据)
3、从第一流读取数据,写入第二个流。

从LocalFileSystem获取输入流很简单。问题是,如何获取DistributedFileSystem的输出流呢?

继续读代码,会发现:
DistributedFileSystem借助了DFSClient类,来实现客户端与HDFS之间文件的传输任务。

在DFSClient类中,创建流的是这一句:
OutputStream result = new DFSOutputStream(src, masked,
        overwrite, replication, blockSize, progress, buffersize,
        conf.getInt("io.bytes.per.checksum", 512));

所以,所有的奥秘就应该在类DFSOutputStream中了。


二、解读DFSOutputStream

DFSOutputStream负责将数据传输到HDFS中,既然数据是在本地读取的,又要保存在另外一台机器(datanode)上,所以这里面一定会涉及到Socket。

通过阅读DFSOutputStream源码,果然发现了DFSOutputStream对底层的socket通信进行的包装的细节,先说说DFSOutputStream中的几个变量:
	private Socket s;	// 与datanode之间建立的socket连接
	private DataOutputStream blockStream;	// socket的输出流(client->datanode),用于将数据传输给datanode
	private DataInputStream blockReplyStream; // socket的输入流(datanode->client),用户收到datanode的确认包

除了socket和流以外,DFSOutputStream还有2个队列和2个线程:
    private LinkedList<Packet> dataQueue = new LinkedList<Packet>();
    // dataQueue是数据队列,用于保存等待发送给datanode的数据包
    private LinkedList<Packet> ackQueue = new LinkedList<Packet>();
    // ackQueue是确认队列,保存还没有被datanode确认接收的数据包
    ...
    private DataStreamer streamer = new DataStreamer();;
    // streamer线程,不停的从dataQueue中取出数据包,发送给datanode
    private ResponseProcessor response = null;
    // response线程,用于接收从datanode返回的反馈信息

所以,在向DFSOutputStream中,写入数据(通常是byte数组)的时候,实际的传输过程是:
1、byte[]被封装成64KB的Packet,然后扔进dataQueue中
2、DataStreamer线程不断的从dataQueue中取出Packet,通过socket发送给datanode(向blockStream写数据)
    发送前,将当前的Packet从dataQueue中移除,并addLast进ackQueue
3、ResponseProcessor线程从blockReplyStream中读出从datanode的反馈信息
       反馈信息很简单,就是一个seqno,再加上每个datanode返回的标志(成功标志为DataTransferProtocol.OP_STATUS_SUCCESS)
       通过判断seqno(序列号,每个Packet有一个序列号),判断datanode是否接收到正确的包。
       只有收到反馈包中的seqno与ackQueue.getFirst()的包seqno相同时,说明正确。否则可能出现了丢包的情况。

如果一切OK,则从ackQueue中移出:ackQueue.removeFirst(); 说明这个Packet被datanode成功接收了。


三、datanode端是怎么接收数据的?

上面分析的代码都位于客户端,那么datanode端的代码又如何呢?

答案是:

在DataNode端,有一个Daemon线程:dataXceiverServer,它有一个用于数据传输的ServerSocket一直开在那里。

每当有client连接到datanode时,datanode会new一个DataXceiver

DataXceiver负责数据的传输工作。

如果是写操作(client->datanode),则调用writeBlock方法:
	case DataTransferProtocol.OP_WRITE_BLOCK:
        	writeBlock( in );

writeBlock方法负责:将数据写入本地磁盘,并负责将数据传输给其他datanode,保证数据的拷贝数目(可以通过dfs.replication设置)。

具体负责数据接收的是这一行:

	blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
                                 mirrorAddr, null, targets.length);

	几个参数的含义:
		      DataOutputStream mirrOut, // output to next datanode
						// 下一个datanode的输出流
		      DataInputStream mirrIn,   // input from next datanode
						// 下一个datanode的输入流
		      DataOutputStream replyOut,  // output to previous datanode
						// 数据来源节点(可能是最初的client)的输出流
						// 用来发送反馈通知包
		      String mirrAddr, BlockTransferThrottler throttlerArg,
		      int numTargets) throws IOException {

在receiveBlock方法中,循环接收数据:
   
  /* 
       * Receive until packet length is zero.
       */
      while (receivePacket() > 0) {}

在receivePacket方法中:
	不断地从输入流中读取Packet数据:
		int payloadLen = readNextPacket();

	并将数据传输至下一个datanode节点:
		mirrorOut.write(buf.array(), buf.position(), buf.remaining());
		mirrorOut.flush();


	写入磁盘:
		out.write(pktBuf, dataOff, len);



四、如果一个文件超过1个block大小,怎么重定向到新的datanode的?在哪里分割的(file分割成blocks)?

答案是:在DFSOutputStream类的writeChunk方法里。

line 3043:
	if (bytesCurBlock == blockSize) {  // 问题是:它们能正好相等吗?万一bytesCurBlock > blockSize了怎么办?
            currentPacket.lastPacketInBlock = true;
            bytesCurBlock = 0;
            lastFlushOffset = -1;
	}
再往下几行:
int psize = Math.min((int)(blockSize-bytesCurBlock), writePacketSize);
        computePacketChunkSize(psize, bytesPerChecksum);

就是说,在new每个新的Packet之前,都会重新计算一下新的Packet的大小,
以保证新的Packet大小不会超过Block的剩余大小
如果block还有不到一个Packet的大小(比如还剩3kb的空间),则最后一个Packet的大小就是:
blockSize-bytesCurBlock,也就是3kb


line 2285:
              // get new block from namenode.
              if (blockStream == null) {
                LOG.debug("Allocating new block");
                nodes = nextBlockOutputStream(src); 
                this.setName("DataStreamer for file " + src +
                             " block " + block);
                response = new ResponseProcessor(nodes);
                response.start();
              }


在DataStreamer中,如果遇到one.lastPacketInBlock==true,则将blockStream设为null,之后会重新写入新的block。

**THE END**
2
0
分享到:
评论
2 楼 luxinzhu 2015-06-24  
       
1 楼 lenovoceo 2013-01-25  
写的好啊,非常受用啊

相关推荐

    hadoop2.7.3 hadoop.dll

    在windows环境下开发hadoop时,需要配置HADOOP_HOME环境变量,变量值D:\hadoop-common-2.7.3-bin-master,并在Path追加%HADOOP_HOME%\bin,有可能出现如下错误: org.apache.hadoop.io.nativeio.NativeIO$Windows....

    《Hadoop大数据开发实战》教学教案—01初识Hadoop.pdf

    《Hadoop大数据开发实战》教学教案—01初识Hadoop.pdf《Hadoop大数据开发实战》教学教案—01初识Hadoop.pdf《Hadoop大数据开发实战》教学教案—01初识Hadoop.pdf《Hadoop大数据开发实战》教学教案—01初识Hadoop.pdf...

    Hadoop下载 hadoop-2.9.2.tar.gz

    Hadoop 是一个处理、存储和分析海量的分布式、非结构化数据的开源框架。最初由 Yahoo 的工程师 Doug Cutting 和 Mike Cafarella Hadoop 是一个处理、存储和分析海量的分布式、非结构化数据的开源框架。最初由 Yahoo...

    Hadoop下载 hadoop-3.3.3.tar.gz

    Hadoop是一个由Apache基金会所开发的分布式系统基础架构。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进 Hadoop是一个由Apache基金会所开发的分布式系统基础架构。用户可以在不...

    Hadoop权威指南 中文版

    本书从hadoop的缘起开始,由浅入深,结合理论和实践,全方位地介绍hado叩这一高性能处理海量数据集的理想工具。全书共14章,3个附录,涉及的主题包括:haddoop简介:mapreduce简介:hadoop分布式文件系统;hadoop的i...

    hadoop最新版本3.1.1全量jar包

    hadoop-annotations-3.1.1.jar hadoop-common-3.1.1.jar hadoop-mapreduce-client-core-3.1.1.jar hadoop-yarn-api-3.1.1.jar hadoop-auth-3.1.1.jar hadoop-hdfs-3.1.1.jar hadoop-mapreduce-client-hs-3.1.1.jar ...

    hadoop配置资源 ,hadoop-3.0.0,hadoop.dll,winutils

    调用保存文件的算子,需要配置Hadoop依赖 将文件夹中的 hadoop-3.0.0 解压到电脑任意位置 在Python代码中使用os模块配置:os.environ[‘HADOOP_HOME’] = ‘HADOOP解压文件夹路径’ winutils.exe,并放入Hadoop解压...

    hadoop-3.3.4 版本(最新版)

    Apache Hadoop (hadoop-3.3.4.tar.gz)项目为可靠、可扩展的分布式计算开发开源软件。官网下载速度非常缓慢,因此将hadoop-3.3.4 版本放在这里,欢迎大家来下载使用! Hadoop 架构是一个开源的、基于 Java 的编程...

    hadoop2.7.3 Winutils.exe hadoop.dll

    hadoop2.7.3 Winutils.exe hadoop.dll

    Hadoop集群pdf文档

    Hadoop 集群配置详解 Hadoop_Hadoop集群(第1期)_CentOS安装配置 Hadoop_Hadoop集群(第2期)_机器信息分布表 Hadoop_Hadoop集群(第4期)_SecureCRT使用 Hadoop_Hadoop集群(第5期)_Hadoop安装配置 Hadoop_Hadoop...

    hadoop的dll文件 hadoop.zip

    hadoop的dll文件 hadoop.zip

    hadoop_tutorial hadoop入门经典

    hadoop_tutorial hadoop入门经典 Hadoop 是一个能够对大量数据进行分布式处理的软件框架。Hadoop 是可靠的,因为它假设计算元素和存储会失败,因此它维护多个工作数据副本,确保能够针对失败的节点重新分布处理。...

    Hadoop多版本 hadoop.dll和winutils.exe 下载

    支持如下版本的Hadoop hadoop-2.6.0 hadoop-2.6.3 hadoop-2.6.4 hadoop-2.7.1 hadoop-2.8.1 hadoop-2.8.3 hadoop-3.0.0

    hadoop2.6.0 hadoop.dll包括winutils.exe

    hadoop2.6.0 hadoop.dll包括winutils.exe

    hadoop-3.1.3安装包

    Hadoop实现了一个分布式文件系统(Hadoop Distributed File System),简称HDFS。HDFS有高容错性的特点,并且设计用来部署在低廉的(low-cost)硬件上;而且它提供高吞吐量(high throughput)来访问应用程序的数据,适合...

    Hadoop大数据资料集锦

    Hadoop大数据资料集锦Hadoop大数据资料集锦Hadoop大数据资料集锦Hadoop大数据资料集锦

    hadoop的hadoop.dll和winutils.exe

    hadoop hadoop的hadoop.dll和winutils.exe 解决方法, 把winutils.exe加入你的hadoop-x.x.x/bin下 Could not locate executable null\bin\winutils.exe in the Hadoop binaries

    Hadoop开发环境的插件hadoop-eclipse-plugin-2.10.1

    Hadoop Eclipse是Hadoop开发环境的插件,用户在创建Hadoop程序时,Eclipse插件会自动导入Hadoop编程接口的jar文件,这样用户就可以在Eclipse插件的图形界面中进行编码、调试和运行Hadop程序,也能通过Eclipse插件...

    windows64位hadoop2.7.7版本hadoop.dll

    windows下做hadoop入门,会出现hdfs报错,2.7.7版本兼容 windows下做hadoop入门,会出现hdfs报错,2.7.7版本兼容 windows下做hadoop入门,会出现hdfs报错,2.7.7版本兼容

    Hadoop集群程序设计与开发

    《Hadoop集群程序设计与开发(数据科学与大数据技术专业系列规划教材)》系统地介绍了基于Hadoop的大数据处理和系统开发相关技术,包括初识Hadoop、Hadoop基础知识、Hadoop开发环境配置与搭建、Hadoop分布式文件系统、...

Global site tag (gtag.js) - Google Analytics