简介
- MPI (Message Passing Interface)编程简介:消息传递接口,包括协议和语义说明MPI (Message Passing Interface)编程简介:消息传递接口,包括协议和语义说明
- 使用MPI这样非常成熟的并行计算框架,为什么还需要MapReduce?
![Sample Pic][mapreduce1] Hadoop简介
- hadoop为用户提供了系统底层细节透明的分布式基础架构
- Hadoop的核心是分布式文件系统HDFS 和 MapReduce
![Sample Pic][mapreduce2]
MapReduce体系结构
- 集群网络拓扑
- 机架间通信
- 机架内通信
![Sample Pic][mapreduce3]
- 集群网络拓扑
MapReduce的体系结构
![Sample Pic][mapreduce4]Client:提交作业
- 提交作业:用户编写的MapReduce程序通过Client提交到JobTracker端
- 作业监控:用户可通过Client提供一些接口查看作业的运行状态
- 提交作业:用户编写的MapReduce程序通过Client提交到JobTracker端
- JobTracker:作业管理
- 资源管理:监控TaskTracker与Job的状态,一旦发现失败,就将Tasj转移到其它节点
- 作业调度: 将Job拆分成Task, JobTracker会跟踪任务的执行进度、资源使用量等消息,并将这些消息告知(TaskScheduler),而任务调度器会在资源出现空闲时,选择合适的任务去执行。
- 资源管理:监控TaskTracker与Job的状态,一旦发现失败,就将Tasj转移到其它节点
- TaskTracker:任务管理
- 执行操作:接收JobTracker发送过来的命令并执行(如启动新Task、杀死Task等)
- 划分资源:使用”slot”等量划分本节点上的资源量(CPU、内存等),一个Task获取到一个slot后才有机会运行
- Map slot -> Map Task
- Reduce slot -> Reduce Task
- 汇报信息:通过“心跳”将本节点上资源使用情况和任务运行进度汇报给JobTracker。
- Task:任务执行
- TaskScheduler将空闲slot分配给Task,由TaskTracker启动Task进程
- 执行任务:Jar包发送到TaskTracker,利用反射和代理机制动态加载代码
- 是个进程
- TaskScheduler将空闲slot分配给Task,由TaskTracker启动Task进程
MapReduce工作流程
- 注意点:
- 不同的Map任务之间不会进行通信
- 不同的Reduce任务之间也不会发生任何信息交换
- 用户不能显式地从一台机器向另一台机器发送消息
- 所有的数据交换都是通过MapReduce框架自身去实现的(shuffle)
- Map-Shuffle-Reduce
- 一个很多的文件进行mapreduce的时候,可能将该文件分成多个split放入map task中进行执行,之后很多个map任务通过shuffle进行数据交换等,然后再reduce
- 注:单个map还没结束,它的数据不能被reduce访问。所有的map还没结束,reduce就开始执行了,但是合并一些已收到的数据。 shuffle还没结束Reduce可以先开始,但是shuffle结束后reduce才可以结束
- 从节点角度看MapReduce物理流程
- 不同的Map任务之间不会进行通信
![Sample Pic][mapreduce5]
- InputFormat
- Split:逻辑概念,包含一些元数据信息,如数据起始位置、数据长度、、数据所在节点等。划分方法由用户决定。
- RR:因为Split是逻辑切分而非物理切分,所以还需要通过RecordReader(RR)根据Split中的信息来处理Split中的具体记录,加载数据并转换为合适Map任务读取的键值对,输入给Map任务。
- InputFormat:定义了怎样与物理存储(HDFS block) 之间的映射,理想的分区大小是一个HDFS块,会做Map前的预处理,比如验证输入的格式是否符合输入定义。
Map
Map任务的数量:
Hadoop 为每个split创建一个Map任务。由split的多少决定的map任务的数目。
- Shuffle
- Map端的Shuffle过程
- 输入数据和执行map任务
- 写入缓存
- 每个map任务分配一个缓存(map结束之后,所有的数据都溢写到磁盘(分区、排序、合并))
- Map的输出结果首先被写入缓存,当缓存满时(有个溢写比例),就启动溢写操作,把缓存中的数据写入磁盘文件,并清空缓存。先写入缓存,可以大大减少对磁盘I/O的影响。在写入缓存前,key与value值都会被序列化成字节数组。
- 每个map任务分配一个缓存(map结束之后,所有的数据都溢写到磁盘(分区、排序、合并))
- 溢写:设置溢写比例
- 溢写由另外一个单独的后台线程来完成,通常会设置一个溢写比例,如0.8
- 分区:哈希函数
- Hash(key) mod R(其中R表示Reduce任务数量)
- 不同的分区放到不同的reduce中,为shuffle做准备(因为不同数据放到不同的reduce)
- 排序:局部有序:按照key值排序 (先分好区之后,在分区中做排序)
- 合并:根据用户Combine函数计算,从而减少需要溢写到磁盘的数据量。
- 因为Combiner的输出是Reduce任务的输入,Combiner绝不能改变Reduce任务最终的计算结果,一般而言,累加、最大值等场景可以使用合并操作。
- 经过分区、排序、合并之后,这些缓存中的键值对就可以被写入磁盘,并清空缓存
- 每次溢写操作都会在磁盘中生成一个新的溢写文件。在Map任务全部结束之前,系统会对所有溢写文件中的数据进行归并(merge),生成一个大的溢写文件。
- 文件归并
- 归并:将key相同的记录拼接在一起,归并得到的文件,放在本地磁盘
- JobTracker会一直Map任务的执行,并通知Reduce任务来领取数据
注: 合并(Combine)和归并(Merge)的区别:
两个键值对<“a”,1>和<“a”,1>,如果合并,会得到<“a”,2>,如果归并,会得到<“a”,<1,1>>
- Map端的Shuffle过程
注:JobTracker会一直检测Map任务的执行,当监测到一个Map任务完成后,就会立即通知相关的Reduce任务来“领取”数据,然后开始Reduce端的shuffle过程。
Reduce端的shuffle过程
![Sample Pic][mapreduce6]领取数据
- 每个Reduce任务会不断地通过RPC向JobTracker询问Map任务是否已经完成。Map任务完成之后,JobTracker通知Reduce领取数据
- 归并数据
- reduce领取数据先放入缓存,如果缓存被沾满,就会被溢写到磁盘中来自不同map机器,归并写入磁盘
- 多个溢写文件归并成一个或多个大文件,文件中的键值对是排序的
- 数据输入给Reduce任务
- 执行Reduce函数
- 多个溢写文件归并成一个或多个大文件,文件中的键值对是排序的
- Reduce/OutputFormat
- reduce任务的数量
- 程序指定
- 最优的Reduce任务个数取决于集群中可用的reduce任务槽(slot)的数目
- 通常设置比reduce任务槽数目稍微小一些的Reduce任务个数(这样可以预留一些系统资源处理可能发生的错误)
- 最优的Reduce任务个数取决于集群中可用的reduce任务槽(slot)的数目
- 程序指定
- reduce任务的数量
- 每个Reduce任务会不断地通过RPC向JobTracker询问Map任务是否已经完成。Map任务完成之后,JobTracker通知Reduce领取数据
- Hadoop序列化:
- 指把结构化对象转化为字节流以便在网络上传输或在磁盘上永久存储的过程
- 序列化格式特点:紧凑、快速、可扩展、互操作(支持多语言的交互)
- Hadoop的序列化格式:Writable
- 数据类型
![Sample Pic][mapreduce7]
- MapReduce与HDFS关系
- 一般MapReduce和HDFS在同一台机器上,即将计算节点和存储节点放在一起运行,从而减少节点间的数据移动开销
![Sample Pic][mapreduce8] - 主要是实现Map 和 Reduce函数
- Hadoop框架是用Java实现,但是,MapReduce应用程序则不一定要用Java来写
- 理念:计算向数据靠拢
- 一般MapReduce和HDFS在同一台机器上,即将计算节点和存储节点放在一起运行,从而减少节点间的数据移动开销
容错机制
MapReduce 运行失败
机器故障:
- Task失败:如JVM(java的虚拟机)内存不够退出
- 内存设置大一点
- MapReduce容错和HDFS容错是两回事
- Map Task失败
- 重新执行Map任务
- 去HDFS重新读入数据
- Reduce Task失败
- 重新执行Reduce任务
- 去刚刚map结束后保存的磁盘中读取
- TaskTracker失败
- JobTracker不会接收到心跳,于是会安排其他TaskTracker重新运行失败TaskTracker的热门无
JobTracker失败
- 最严重的失败,Hadoop没有处理JobTracker失败的机制,是个单点故障
- 所有任务需要重新运行
![Sample Pic][mapreduce9]
Combine()是局部函数,在局部的map结束之后对它进行Combine,是在shuffle前做的!
- Task失败:如JVM(java的虚拟机)内存不够退出
Hadoop的应用现状
![Sample Pic][mapreduce10]
书上知识点
- 在一个集群中,只要有可能,MapReduce框架就会将Map程序就近地在HDFS数据所在的节点运行,即将计算节点和存储节点放在一起运行,从而减少了节点间的数据移动开销。
2.Reduce函数的任务就是将输入的一系列具有相同键的键值对以某种方式组合起来,输出处理后的键值对,输出结果会合并成一个文件 - Map任务的输入文件、Reduce任务的处理结果都是保存在HDFS中的,而Map任务处理得到的中间结果则保存在本地存储中(如磁盘)。
- 只有当Map处理全部结束后,Reduce过程才能开始;只有Map需要考虑数据局部性,实现“计算向数据靠拢”,而Reduce则无需考虑数据局部性
- 在一个集群中,只要有可能,MapReduce框架就会将Map程序就近地在HDFS数据所在的节点运行,即将计算节点和存储节点放在一起运行,从而减少了节点间的数据移动开销。