,fen#1. hadoop介绍

1.1 hadoop介绍

  • 核心设计:MapReduce计算框架和HDFS分布式文件系统
  • 定位:基于磁盘的批量处理,例如日志分析、数据挖掘、机器学习

1.2 hadoop特性

  • 横向扩展好,可以组建大集群,计算能力很强
  • 分布式存储
  • 多副本存放数据,增加可用性
  • 顺序读性能较好,随机读效率不行
  • 基于本地计算,节点对自己计算机上的部分数据进行计算。(移动计算比移动数据更经济,因为节点的部分数据可以不占用IO带宽)
  • 网络带宽是瓶颈

1.3 为什么要用hadoop?

  1. 通过MapReduce计算框架,程序员通过实现Map和Reduce类可以很轻松的编写分布式并行程序
  2. 分布式存储、工作调度、负载均衡、容错处理、网络通信等都由MR和HDFS处理,程序员不用操心。

2. MapReduce原理

2.1 Map阶段

  1. Read: 读取数据源,将数据进行filter成一个个K/V
  2. Map: 在map函数中,处理解析的K/V,并产生新的K/V
  3. Collect: 输出结果,存到环形内缓冲区
  4. Spill: 内存区满,数据写到本地磁盘,并产生临时文件
  5. Combine: 合并临时文件,确保产生一个数据文件

2.2 Reduce阶段

  1. Shuffle: Copy阶段,Reduce Task到各个Map Task远程复制一份数据,若其大小超过一定阈值,则写磁盘;否则放到内容
  2. Merge: 合并内存和磁盘上的文件,防止内存占用过多或磁盘文件过多
  3. Sort: Map Task阶段进行局部排序,Reduce Task阶段进行一次归并排序
  4. Reduce: 将数据给reduce函数
  5. Writer: reduce函数将其计算结果写到HDFS

2.3 MapReduce的作业流程

注意点:

  1. 数据本地化:map任务不会随便分配给TaskTracker,只会分配给有对应数据块的TaskTracker
  2. 心跳信息: 任务完成的进度等元信息
  3. TaskTracker根据主机核的数量和内存的大小有固定数量的map槽和reduce槽
  4. 运行作业所需要的资源文件需要复制到HDFS上,包括MapReduce程序打包的JAR文件、配置文件和客户端计算所得的输入划分信息
  5. 输入划分信息告诉了JobTracker应该为这个作业启动多少个map任务等信息

2.4 Map、Reduce任务中Shuffle和排序的过程

Map端

  1. 每个输入分片会让一个map任务来处理,默认情况下,以HDFS的一个块的大小(默认为64M)为一个分片,当然我们也可以设置块的大小。map输出的结果会暂且放在一个环形内存缓冲区中(该缓冲区的大小默认为100M,由io.sort.mb属性控制),当该缓冲区快要溢出时(默认为缓冲区大小的80%,由io.sort.spill.percent属性控制),会在本地文件系统中创建一个溢出文件,将该缓冲区中的数据写入这个文件。

  2. 在写入磁盘之前,线程首先根据reduce任务的数目将数据划分为相同数目的分区,也就是一个reduce任务对应一个分区的数据。这样做是为了避免有些reduce任务分配到大量数据,而有些reduce任务却分到很少数据,甚至没有分到数据的尴尬局面。其实分区就是对数据进行hash的过程。然后对每个分区中的数据进行排序,如果此时设置了Combiner,将排序后的结果进行Combia操作,这样做的目的是让尽可能少的数据写入到磁盘。

  3. 当map任务输出最后一个记录时,可能会有很多的溢出文件,这时需要将这些文件合并。合并的过程中会不断地进行排序和combia操作,目的有两个:1.尽量减少每次写入磁盘的数据量;2.尽量减少下一复制阶段网络传输的数据量。最后合并成了一个已分区且已排序的文件。为了减少网络传输的数据量,这里可以将数据压缩,只要将mapred.compress.map.out设置为true就可以了。

  4. 将分区中的数据拷贝给相对应的reduce任务。有人可能会问:分区中的数据怎么知道它对应的reduce是哪个呢?其实map任务一直和其父TaskTracker保持联系,而TaskTracker又一直和JobTracker保持心跳。所以JobTracker中保存了整个集群中的宏观信息。只要reduce任务向JobTracker获取对应的map输出位置就ok了哦。

到这里,map端就分析完了。那到底什么是Shuffle呢?Shuffle的中文意思是“洗牌”,如果我们这样看:一个map产生的数据,结果通过hash过程分区却分配给了不同的reduce任务,是不是一个对数据洗牌的过程呢?呵呵。

Reduce端

  1. Reduce会接收到不同map任务传来的数据,并且每个map传来的数据都是有序的。如果reduce端接受的数据量相当小,则直接存储在内存中(缓冲区大小由mapred.job.shuffle.input.buffer.percent属性控制,表示用作此用途的堆空间的百分比),如果数据量超过了该缓冲区大小的一定比例(由mapred.job.shuffle.merge.percent决定),则对数据合并后溢写到磁盘中。

  2. 随着溢写文件的增多,后台线程会将它们合并成一个更大的有序的文件,这样做是为了给后面的合并节省时间。其实不管在map端还是reduce端,MapReduce都是反复地执行排序,合并操作,现在终于明白了有些人为什么会说:排序是hadoop的灵魂。

  3. 合并的过程中会产生许多的中间文件(写入磁盘了),但MapReduce会让写入磁盘的数据尽可能地少,并且最后一次合并的结果并没有写入磁盘,而是直接输入到reduce函数。

3. HDFS原理浅析

3.1 基础架构

  1. Client: 对用户提供系列操作工具API

  2. NameNode:包含资源分配算法和map的数据结构

  3. DataNode:管理好自己的磁盘, 上报数据给NameNode

3.2 读取文件

  1. Client向NameNode读取数据分布式信息
  2. Client找到第一个数据块离自己最近的DataNode
  3. 跟这个DataNode交互并获取数据
  4. 读完之后开始跟下一个数据块离自己最近的DataNode交互
  5. 读完之后close连接
  6. 如果读取过程中读取失败, 将会依次读取该数据块下一个副本, 失败的节点将被记录, 不再连接

远近的判断标准(NetworkTopology.sortByDistance):

  • 如果客户端和某个Datanode在同一台机器上, 优先
  • 如果客户端和某个Datanode在同一个rack上, 次优先

否则随机

3.3 写入文件

  1. 客户端通知NameNode创建目录
  2. 客户端开始写数据, 先写到本地, 然后定期分块
  3. 要写新块的时候再跟NameNode打交道, 获取到新块的目标地址
  4. 同一个数据块的不同副本是链式同步, 客户端只跟第一个副本打交道
  5. 只有所有副本都写入成功, 才开始下一个块的写操作
  6. 如果有一个写失败, 则:
  7. 失败的DataNode会加一个标记, 根据这个标记, 这份不完全的数据回头会被删除
  8. 不再往失败的DataNode上面写, 其他两个DataNode继续写
  9. 告诉NameNode这份数据副本数不足, NameNode回头会异步的补上
  10. 如果副本数少于某个配置(比如1个), 整个写入就算失败

3.4 关于namenode的单点故障

在hadoop 2.x 中使用HA技术已经解决了该问题

参考资料:

  1. http://www.cnblogs.com/smartloli/p/4875024.html?utm_source=tuicool&utm_medium=referral
  2. http://www.360doc.com/content/14/0107/20/15109633_343416433.shtml
  3. http://weixiaolu.iteye.com/blog/1474172
  4. http://www.cnblogs.com/xuanku/p/hdfs.html