瞿璐祎的博客

try something new


  • Home

  • Tags

  • Categories

  • Archives

  • Schedule

Spark之join操作

Posted on 2018-11-11 | In 分布式模型与编程

Spark之join操作.md

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class join {

public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("join").setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> persons = jsc.textFile("spark/input3/person.txt");
JavaRDD<String> addresses = jsc.textFile("spark/input3/address.txt");


//得到 key: 邮编号,value:学号和名字
JavaPairRDD<String, String> personkv = persons.mapToPair(new PairFunction<String, String,String>() {
private static final long serialVersionUID = 1L;
public Tuple2<String,String> call(String str) throws Exception {

String[] personsplit = str.split(" |\t"); // notice maybe \t
//System.out.println("length"+personsplit.length);
//处理缺省数据
if (personsplit.length == 3)
{
//System.out.println("i am not null");
String code=personsplit[2];
String value = personsplit[0] +" "+ personsplit[1]; //number + name
return new Tuple2<String, String>(code, value);
}
else {

//System.out.println("null null null ++++++++="+personsplit.length);
return new Tuple2<String,String>(null,null);
}
}
}); // to split the person and get the form <210000,1 Aaron>

//得到key: 邮编号 value:城市
JavaPairRDD<String, String> addresskv = addresses.mapToPair(new PairFunction<String, String,String>() {
private static final long serialVersionUID = 1L;
public Tuple2<String,String> call(String str) throws Exception {
String[] addresssplit = str.split(" |\t");
String code=addresssplit[0];
String value = addresssplit[1]; //city
return new Tuple2<String, String>(code, value);
}
}); //to split the address and get the form <210000,Nanjing>

//进行join操作
JavaPairRDD<String, Tuple2<String, String>> joinres=personkv.join(addresskv);

//遍历输出
joinres.foreach(new VoidFunction<Tuple2<String, Tuple2<String, String>>>() {
private static final long serialVersionUID = 1L;
public void call(Tuple2<String, Tuple2<String, String>> t) throws Exception {
System.out.println(t._2()._1+" "+t._1+ " "+t._2()._2);
}

});

joinres.saveAsTextFile("./spark/output3/");
}

}

输入:

person.txt:
    1 Aaron 210000
    .....
address.txt:
    210000 Nanjing
    .........

关系模型.md

Posted on 2018-11-11 | In 数据管理系统

关系模型

  • 关系模型的组成: 关系数据结构、关系操作集合、关系完整性约束
  • 关系模型具有单一的数据结构—— 关系。关系代数的逻辑结构是一张二维表,是建立在集合代数的基础上的

关系数据结构

  1. 域(demain):一组具有相同数据类型的值的集合(如整数)
  2. 笛卡尔积:穷尽所有的组合,即所有域的所有取值的一个组合(不能重复)
    a. 基数:一个域允许的不同取值个数
    b. 笛卡尔积可表示为一张二维表,表中的每行对应一个元组,表中的每列对应一个域

  3. 关系:笛卡尔积的子集
    a. 关系: R(D1,D2,…,Dn):R:关系名,n:关系的目或度(Degree)
    b. 元组:关系中的每个元素是关系中的元组,通常用t表示。
    c. 属性:表的每一列是一种属性
    d. 码

    i. 候选码
        1) 若关系中的某一属性组的值能唯一地表示一个元组,则称该属性组为候选码
        注:简单的情况:候选码只包含一个属性
    ii. 主属性:候选码的诸属性
    iii. 非主属性:不包含在任何候选码中的属性
    iv. 全码:关系模式的所有属性组是这个关系模式的候选码
    v. 主码:若一个关系有多个候选码,则选定其中一个为主码:可以一个组合作为一个主码,或者是定义一个id属性为主码
    

    e. 关系的三种类型:基本关系(实际存在的表)、查询表(查询结果对应的表)、视图表(基本表或其他视图表导出的表,是虚表,不对应实际存储的数据)
    f. 基本关系的性质

    ① 列是同质的(Homogeneous)
    ② 不同的列可出自同一个域
        其中的每一列称为一个属性
        不同的属性要给予不同的属性名
    ③ 列的顺序无所谓,,列的次序可以任意交换
    ④ 任意两个元组的候选码不能相同
    ⑤ 行的顺序无所谓,行的次序可以任意交换
        i. 最重要的:关系的每一个分量必须是一个不可分的数据项
    

关系代数

关系代数(Relational Algebra)

  • 是一种抽象的查询语言,它用对关系的运算来表示查询
  • 运算对象是关系、运算结果亦为关系
  • 两类运算符:
    i. 并、差、交、笛卡尔积(传统的集合运算符)
    ii. 选择、投影、连接、除(专门的关系运算符)
    
  • 传统的集合运算是从关系的“水平”方向即行的角度进行,专门的关系运算不仅涉及行而且涉及列

选择selection(限制)

  • 类型一个MongoDB中的find操作
  • 将一些不需要的行排除掉
    i.  σF(R) = {t |t属于R ∧ F (t )= '真'}
    ii. F的基本形式为:X1θY1,θ表示比较运算符,它可以是>,≥,<,≤,=或<>
    

投影projection

  • 从R 中选择出若干属性列组成新的关系 πA(R) = { t [A] | t 属于 R } , A:R 中的属性列
  • 将一些不需要的列排除,还需要将重复行的合并起来(不然就不符合关系的定义)

连接

  • (两张表连接成一张表)join (连接也称为θ连接)
  • MongoDB中没有连接
  • 从两个关系的笛卡尔积中选取属性间满足一定条件的元组
1
A和B:分别为R和S上度数相等且可比的属性组
  • 两种常用的连接:
    i. 等值连接 : 有重复的属性列(从关系R与S 的广义笛卡尔积中选取A、B属性值相等的那些元组) θ为“=”
    ii. 自然连接:在等值连接的基础上,并且在结果中把重复的属性列去除掉

  • 注:所有的数据都是关系,在上面可以做运算

  • 悬浮元组(Dangling tuple):
    • 两个关系R和S在做自然连接时,关系R中某些元组有可能在S中不存在公共属性上值相等的元组,从而造成R中这些元组在操作时被舍弃了,这些被舍弃的元组称为悬浮元组。
  • 外连接(Outer Join):
    • 如果把悬浮元组也保存在结果关系中,而在其他属性上填空值(Null),就叫做外连接
  • 左外连接(LEFT OUTER JOIN或LEFT JOIN):
    • 只保留左边关系R中的悬浮元组
  • 右外连接(RIGHT OUTER JOIN或RIGHT JOIN):
    • 只保留右边关系S中的悬浮元组

网状模型 、 文档模型的比较

  • 关系模型的特点:
    • 更强的表达能力(连接MongoDB做不了)
    • 比网状模型简洁
    • 在查询的时候还是有过程性的内容(关系代数)

关系演算(Relational Calculus)

  • 声明式查询语言
  • 类似FOL的表达方式
  • 与关系代数基本等价
  • 关系演算可以翻译成若干个关系代数,(有些内容的先后顺序不确定)
  • 关系代数+逻辑表达 ===》 声明式的
  • 是SQL的基础
  • 关系模型相当于网状模型,将更多细节藏在了里面,更倾向于声明式语言

表达能力范围比较(不涉及精简度)

  • Mongodb CRUD < 关系代数/演算 < FOL(一阶代词逻辑) < Turing Machine
  • 注: 关系代数不能有递归,一阶逻辑可以由递归

总结:

  • 注:App和DB独立,将更多的操作推给数据库,只告诉数据库需要什么,接口就会变得很简洁(表达能力强)

  • 初衷:以防两个app同时使用一个DB,引起冲突,因此把操作都放在DB中。

  • 现在:推崇模块化,将DB都放在每个模块中,为了更新一个模块时不会动另一个模块。

​
​
​
​
​
​
​
​
​

文档型数据库

Posted on 2018-11-11 | In 数据管理系统

简介

  • MongoDB:文档型数据库,按照文档的形式存储

    • Web App(用的较多)
  • 数据库管理系统(DBMS)

    • 需要把具体如何实现的如 增删改查 隐藏在系统软件中,对用户软件透明

基础概念:

  • 文档(数据模型) 【关系数据库的数据模型是表格】
    • 用<key/attribute,value>存放,json的格式
    • 一个文档里还可以包括子文档
    • 每个文档都有个_id,如果在一个文档中没有说_id是什么,数据库会自动分配一个,可以通过_id找到数据库的任何一个文档
    • 一个文档可以是做一个对象(json)
  • 文档集(collection)

    • 文档的集合(一个文档集可以视为一类对象)
  • 数据库(database)

    • 若干个文档集构成一个数据库
    • 部署在一个服务器上,【一般】一个数据库对应一个应用

文档的访问接口

  • 插入文档
    • db.foo.insert({“bar”:”baz”})
      • db:datebase foo:collection
    • db.foo.batchinsert() 批量插入
  • 查找文档
    • db.blog.find() [查blog文档集的所有文档]
    • Joe=db.people.findOne({“name”:”joe”,”age”:20}); [查people文档集特定属性的文档]
    • db.users.find({},{“username”:1,”email”:1}) [只返回users文档的username,email这两个属性、前面的空括号表明不限定如何查询条件,如果这里去掉后面两个属性就变成查询条件了]
    • db.users.find({“age”:{“$gte”:18,”$lte”:30}}) 【查年龄在18~30的用户文档】
    • db.raffle.find({“ticket_no”}:{“$in”} :{ [725,542,390]}})
  • 删除文档
    • db.foo.remove()
    • db.mailing.list.remove({“opt-out”:true})
  • 更新文档
    • db.users.update({“_id” : ObjectId(“28h32j223e92e”)},…{“$set” : {“favorite book” : “War and Peace”}}) 【$set指置换】
    • db.games.update({“game” : “pinball”, “user” : “joe”},…{“$inc” : {“score” : 50}})

文档模型的设计思想

  • 优点:每个元素都被视为对象(储存为json格式)

存储器件

  • 数据处理性能的宗旨

    • 提高数据访问的局部性:
    • 对磁盘/闪存而言
      • 减少I/O的次数;
      • 变随机访问为顺序访问。
    • 对内存而言
      • 增加Cache的命中率
  • 数据库的基本存储架构

    • 以页为单位存放数据。每一页为512bytes的整数倍,4KB到4MB。

索引:

  • 对于非聚集索引,有些查询甚至可以不访问数据页。
  • 聚集索引可以避免数据插入操作集中于表的最后一个数据页。

  • 当然,众所周知,虽然索引可以提高查询速度,但是它们也会导致数据库系统更新数据的性能下降,因为大部分数据更新需要同时更新索引。

  • 参考 https://www.cnblogs.com/ccsccs/articles/4243644.html

B-Tree

B-Tree的平衡性

  • 每个节点的大小固定(e.g. 4KB或8KB),因此最多只能容纳n个键和n+1个指针。

什么决定B-Tree的效率?

  • 树的高度决定查询需要I/O次数。
  • 对于同样规模的数据,n越大树的高度越低。
  • 为了提高B-Tree的效率,我们需要增加n,即增加每个节点容纳键和指针的数量。
    • 用简短的数据类型定义键的属性,e.g. smallint.
    • 对B-Tree进行压缩。

Mongodb数据库设计

Posted on 2018-11-11 | In 数据管理系统

概念

  • Mongodb是以面向对象存储的 , 做Obj<--->Doc之间的转换,一般用Java去实践
    • ORM工具:Obj Relation Mapping

数据库设计

  • 数据库设计
    ○ 什么样的数据存在里面
    ○ 数据怎么样存在里面(组织形式) & 应用如何去访问数据库
  • 数据库设计的基本步骤
    ○ 需求设计(软件的功能是什么): 了解用户需求,确定软件的基本功能
    ○ 概念结构设计(存什么): 确定数据库需要记录哪些对象
    ○ 逻辑结构设计设计(抽象怎么存):确定对象之间的关系以及文档结构。
    ○ 物理结构设计 (代码的实现):确定数据的存储方式、索引的使用、系统配置等等
    ○ 数据库实施:安装数据库管理系统、创建数据、调试、运行。
    ○ 数据库运行和维护:根据需求的扩展与变化,对以上结果进行调整和变更

MVC

  • MVC (model – view – controller)

对象

对象的确定

○ 软件功能涉及到的概念

  • 博客:用户、文章、评论、粉丝
  • 网上书店:图书、用户、购物车、订单、评论

○ 软件开发过程中出现的持久化需求

  • Java开发中遇到需要序列化的对象
  • MVC中Model的构建

对象之间的关系(Cardiality)

○ 1:1 —— 企业与CEO 、 配偶关系
○ 1:M —— 博客与评论、国家与城市
○ M:N —— 书与作者、用户与粉丝

拇指法则

  • 赞成嵌入,除非有一个令人信服的理由不这样做。
  • 需要访问一个对象本身是一个令人信服的理由不嵌入它。
    数组不应该无限制地增长。如果在“多”方面有超过几百个文档,不要嵌入它们;如果“多”方面有几千个文档,不要使用ObjectID引用数组。高基数数组是不嵌入的一个令人信服的理由。
  • 不要害怕应用程序级连接:如果正确索引并使用了投影说明符,那么应用程序级连接的开销几乎不比关系数据库中的服务器端连接高。
  • 在反规范化时考虑写/读比。一个通常会被读取但很少被更新的字段是反规范化的一个很好的候选:如果你对一个经常更新的字段进行反规范化,那么查找和更新所有实例的额外工作很可能会超过你从反规范化中获得的节省。
  • 与MongoDB一样,如何建模数据完全取决于特定应用程序的数据访问模式。您希望构造数据,以匹配应用程序查询和更新数据的方式。

Spark 实现kmeans算法

Posted on 2018-11-11 | In 分布式模型与编程

spark 实现K-means算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package kmeans;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;.
import java.util.Arrays;
import java.util.Iterator;


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;


import scala.Tuple2;



public class kmeans{
static double[][] center = new double[4][2]; //这里有4个中心点,为2维
static int[] number = new int[4]; //记录属于当前中心点的数据的个数,方便做除法
static double[][] new_center = new double[4][2]; //计算出来的新中心点
public static void main(String[] args) {

// 从文件中读出中心点,并且放入center数组中
ArrayList<String> arrayList = new ArrayList<String>();
try {
File file = new File("/usr/local/hadoop-2.7.3/centers.txt");
InputStreamReader input = new InputStreamReader(new FileInputStream(file));
BufferedReader bf = new BufferedReader(input);
// 按行读取字符串
String str;
while ((str = bf.readLine()) != null) {
arrayList.add(str);
}
bf.close();
input.close();
} catch (IOException e) {
e.printStackTrace();
}
// 对ArrayList中存储的字符串进行处理
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 2; j++) {
String s = arrayList.get(i).split(",")[j];
center[i][j] = Double.parseDouble(s);
}
}


//System.out.println("center+++" + center[3][1]);
SparkConf conf = new SparkConf().setAppName("kmeans").setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> datas = jsc.textFile("spark/input4/k-means.dat"); //从hdfs上读取data

while(true) {
for (int i = 0; i< 4;i++) //注意每次循环都需要将number[i]变为0
{
number[i]=0;
}
//将data分开,得到key: 属于某个中心点的序号(0/1/2/3),value: 与该中心点的距离
JavaPairRDD<Integer, Tuple2<Double, Double>> data = datas.mapToPair(new PairFunction<String, Integer,Tuple2<Double, Double>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Integer,Tuple2<Double, Double>> call(String str) throws Exception {
final double[][] loc = center;
String[] datasplit = str.split(",");
double x = Double.parseDouble(datasplit[0]);
double y = Double.parseDouble(datasplit[1]);
double minDistance = 99999999;
int centerIndex = 0;
for(int i = 0;i < 4;i++){
double itsDistance = (x-loc[i][0])*(x-loc[i][0])+(y-loc[i][1])*(y-loc[i][1]);
if(itsDistance < minDistance){
minDistance = itsDistance;
centerIndex = i;
}
}
number[centerIndex]++; //得到属于4个中心点的个数

return new Tuple2<Integer,Tuple2<Double, Double>>(centerIndex, new Tuple2<Double,Double>(x,y));
// the center's number & data
}
});

//得到key: 属于某个中心点的序号, value:新中心点的坐标
JavaPairRDD<Integer, Iterable<Tuple2<Double, Double>>> sum_center = data.groupByKey();
//System.out.println(sum_center.collect());

JavaPairRDD<Integer,Tuple2<Double, Double>> Ncenter = sum_center.mapToPair(new PairFunction<Tuple2<Integer, Iterable<Tuple2<Double, Double>>>,Integer,Tuple2<Double, Double>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Integer, Tuple2<Double, Double>> call(Tuple2<Integer, Iterable<Tuple2<Double, Double>>> a)throws Exception {
//System.out.println("i am here**********new center******");
int sum_x = 0;
int sum_y = 0;
Iterable<Tuple2<Double, Double>> it = a._2;

for(Tuple2<Double, Double> i : it) {
sum_x += i._1;
sum_y +=i._2;
}

double average_x = sum_x / number[a._1];
double average_y = sum_y/number[a._1];
//System.out.println("**********new center******"+a._1+" "+average_x+","+average_y);
return new Tuple2<Integer,Tuple2<Double,Double>>(a._1,new Tuple2<Double,Double>(average_x,average_y));
}
});


//将中心点输出
Ncenter.foreach(new VoidFunction<Tuple2<Integer,Tuple2<Double,Double>>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<Integer,Tuple2<Double,Double>> t) throws Exception {
new_center[t._1][0] = t._2()._1;
new_center[t._1][1] = t._2()._2;
System.out.println("the new center: "+ t._1+" "+t._2()._1+" , "+t._2()._2);
}

});

//判断新的中心点和原来的中心点是否一样,一样的话退出循环得到结果,不一样的话继续循环(这里可以设置一个迭代次数)
double distance = 0;
for(int i=0;i<4;i++) {
distance += (center[i][0]-new_center[i][0])*(center[i][0]-new_center[i][0]) + (center[i][1]-new_center[i][1])*(center[i][1]-new_center[i][1]);
}

if(distance == 0.0) {
//finished
for(int j = 0;j<4;j++) {
System.out.println("the final center: "+" "+center[j][0]+" , "+center[j][1]);
}
break;
}
else {
for(int i = 0;i<4;i++) {
center[i][0] = new_center[i][0];
center[i][1] = new_center[i][1];
new_center[i][0] = 0;
new_center[i][1] = 0;
System.out.println("the new center: "+" "+center[i][0]+" , "+center[i][1]);
}
}
}
}
}

输入:

1. centers.txt :
    96,826
    606,776    
    474,866
    400,768
  1. data.dat:
    存放所有点的坐标存放所有点的坐标。

Spark 编程

Posted on 2018-11-07 | In 分布式模型与编程

Spark-处理系统

Posted on 2018-11-06 | In 分布式模型与编程

Spark vs. MapReduce

Hadoop MapReduce局限性:
- 表达能力有限
    ○ 仅map和reduce函数,无法直接用join等操作
- 磁盘I/O开销大(单个job)
    ○ 输入、输出及shuffle中间结果都需要读写磁盘
- 延迟高(多个job)
    ○ 有依赖关系:job之间的衔接涉及IO开销(迭代计算)
    ○ 无依赖关系:在前一个job执行完成之前,其他job依然无法开始


Spark的改进
- 表达能力有限
    ○ 增加join等更多复杂的函数,可以串联为DAG
- 磁盘IO开销大(单个job)
    ○ 非shuffle阶段避免中间结果写磁盘
- 延迟高(多个job作为一整个job)
    ○ 将原来的多个job作为一个job的多个阶段
        § 有依赖关系:各个阶段之间的衔接尽量写内存
        § 无依赖关系:多个阶段可以同时执行

相比MapReduce,Spark具有如下优点:
- Spark的计算模式也属于MapReduce,但不局限于Map和Reduce操作,还提供了多种数据集操作类型,编程模型比MapReduce更灵活
- Spark提供了内存计算,可将迭代过程中的结果放到内存中,对于迭代运算效率更高
- Spark基于DAG的任务调度执行机制,要优于Hadoop MapReduce的迭代执行机制

RDD 抽象

- Spark的核心是建立在统一的抽象RDD上的,使得Spark的各个组件可以无缝地进行集成,在同一个应用程序中完成大数据计算任务。
- Resilient Distributed Dataset(弹性分布式数据集)
    - Resilient:具有可恢复的容错特性
    - Distributed:每个RDD可分成多个分区,一个RDD的不同分区可以存到集群中不同的节点上
    - Dataset:每个分区就是一个数据集片段

- RDD特性
    - RDD只读,不能直接修改
        § 只能基于稳定的物理存储中的数据集创建RDD通过在其他RDD上执行确定的转换操作(如map、join和group by)而得到新的RDD

    - RDD支持运算操作
        § 转换Transformation:描述RDD的转换逻辑
        § 动作Action:标志转换结束,触发DAG生成
            □ 惰性求值:只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作

Sample Pic

    - RDD Lineage,即DAG拓扑结构
        § RDD读入外部数据源进行创建
        § RDD经过一系列的转换(Transformation)操作,每一次都会产生不同的RDD,供给下一个转换操作使用
        § 最后一个RDD经过“动作”操作进行转换,并输出到外部数据源

- RDD之间的依赖关系
    - 窄依赖:
        § 表现为一个父RDD的分区对应于一个子RDD的分区,或者多个父RDD的分区对应于一个子RDD的分区
        § 典型的操作:map,filter,union

    - 宽依赖:
        § 表现为存在一个父RDD的一个分区对应一个子RDD的多个分区
        § 典型的操作:groupByKey,sortByKey
    - Join操作
        § 窄依赖: 对输入进行协同操作
            □ 指多个父RDD的某一分区的所有“键”落在子RDD的同一个分区内,不会产生同一个父RDD的某一分区落在子RDD的两个分区的情况(如 a)
        § 宽依赖:对输入做非协同划分 (如b)

Sample Pic

- 划分阶段
    - spark分析各个RDD的偏序关系生成DAG,再通过分析各个RDD中的分区之间的依赖关系来决定如何划分Stage
    - 具体划分方法:
        § 在DAG中进行反向解析,遇到宽依赖就断开
        § 遇到窄依赖就把当前的RDD加入到Stage中
        § 将窄依赖尽量划分在同一个Stage中,可以实现流水线计算 pipeline

Sample Pic

- Stage类型
    § ResultStage
        □ 输入/输出
            ® 其输入边界可以是从外部获取数据,也可以是另一个ShuffleMapStage的输出
            ® 输出直接产生结果或存储
        □ 特点:
            ® 在一个Job里必定有该类型Stage
            ® 最终的Stage

体系结构:

-  架构设计
    § Master :管理整个系统
        □ 集群资源管理器(Cluster Manager)
        □ 资源管理器可以自带或Mesos或YARN
    § Worker:运行作业的工作节点
        □ 负责任务执行的进程(Executor)
        □ 负责任务执行的线程(Task)
    § Application:用户编写的Spark应用程序
    § Job:一个Job包含多个RDD及作用于相应RDD上的各种操作
    § Stage:一个Job会分为多组Task,每组Task被称为Stage,或者也被称为TaskSet
        □ Job的基本调度单位
        □ 代表了一组关联的、相互之间没有Shuffle依赖关系的任务组成的任务集
    § Task:运行在Executor上的工作单元
- 作业与任务

Sample Pic

工作流程

Sample Pic
Sample Pic

- Spark Executor
    - 与MapReduce相比,Spark所采用的Executor有两个优点:
        § 利用多线程来执行具体的任务,减少任务的启动开销
        § Executor中有一个BlockManager存储模块,会将内存和磁盘共同作为存储设备,有效减少IO开销

Shuffle vs. Pipeline

- Stage之间:shuffle
- Stage内部:流水线pipeline

Sample Pic
Sample Pic
Sample Pic

容错机制

- Master故障
- Worker故障
    ○ Lineage机制
        § 窄依赖(narrow dependency)
            □ 执行某个partition时,检查父亲RDD对应的partition是否存在
                ® 存在,即可执行当前RDD对应的操作
                ® 不存在,则重构父亲RDD对应的partition
        § 宽依赖(wide dependency)
            □ 执行某个partition时,检查父亲RDD对应的partition是否存在
                ® 存在,即可执行当前RDD对应的操作
                ® 不存在,则重构整个父亲RDD
    ○ RDD存储机制
        § 血缘关系,重算过程在不同节点之间并行,只记录粗粒度的操作
            □ RDD提供的持久化(缓存)接口
                ® persist():对一个RDD标记为持久化
                ® 接受StorageLevel类型参数,可配置各种级别
                ® 持久化后的RDD将会被保留在计算节点的中被后面的行动操作重复使用
            □ cache()
                ® 相当于persist(MEMORY_ONLY)
            □ unpersist()
                ® 手动地把持久化的RDD从缓存中移除

Sample Pic
Sample Pic

○ 检查点机制
    § 前述机制的不足之处
        □ Lineage可能非常长
        □ RDD存储机制主要面向本地磁盘的存储
    § 检查点机制将RDD写入可靠的外部分布式文件系统,例如HDFS
        □ 在实现层面,写检查点的过程是一个独立job,作为后台作业运行

MapReduce处理系统

Posted on 2018-10-28 | In 分布式模型与编程

简介

  • MPI (Message Passing Interface)编程简介:消息传递接口,包括协议和语义说明MPI (Message Passing Interface)编程简介:消息传递接口,包括协议和语义说明
  • 使用MPI这样非常成熟的并行计算框架,为什么还需要MapReduce?
    ![Sample Pic][mapreduce1]
  • Hadoop简介

    • hadoop为用户提供了系统底层细节透明的分布式基础架构
    • Hadoop的核心是分布式文件系统HDFS 和 MapReduce
      ![Sample Pic][mapreduce2]
  • MapReduce体系结构

    • 集群网络拓扑
      • 机架间通信
      • 机架内通信
        ![Sample Pic][mapreduce3]
  • MapReduce的体系结构
    ![Sample Pic][mapreduce4]

  • Client:提交作业

    • 提交作业:用户编写的MapReduce程序通过Client提交到JobTracker端
      • 作业监控:用户可通过Client提供一些接口查看作业的运行状态
  • JobTracker:作业管理
    • 资源管理:监控TaskTracker与Job的状态,一旦发现失败,就将Tasj转移到其它节点
      • 作业调度: 将Job拆分成Task, JobTracker会跟踪任务的执行进度、资源使用量等消息,并将这些消息告知(TaskScheduler),而任务调度器会在资源出现空闲时,选择合适的任务去执行。
  • TaskTracker:任务管理
    • 执行操作:接收JobTracker发送过来的命令并执行(如启动新Task、杀死Task等)
    • 划分资源:使用”slot”等量划分本节点上的资源量(CPU、内存等),一个Task获取到一个slot后才有机会运行
      • Map slot -> Map Task
      • Reduce slot -> Reduce Task
        • 汇报信息:通过“心跳”将本节点上资源使用情况和任务运行进度汇报给JobTracker。
  • Task:任务执行
    • TaskScheduler将空闲slot分配给Task,由TaskTracker启动Task进程
      • 执行任务:Jar包发送到TaskTracker,利用反射和代理机制动态加载代码
      • 是个进程

MapReduce工作流程

  • 注意点:
    • 不同的Map任务之间不会进行通信
      • 不同的Reduce任务之间也不会发生任何信息交换
      • 用户不能显式地从一台机器向另一台机器发送消息
      • 所有的数据交换都是通过MapReduce框架自身去实现的(shuffle)
    • Map-Shuffle-Reduce
      • 一个很多的文件进行mapreduce的时候,可能将该文件分成多个split放入map task中进行执行,之后很多个map任务通过shuffle进行数据交换等,然后再reduce
      • 注:单个map还没结束,它的数据不能被reduce访问。所有的map还没结束,reduce就开始执行了,但是合并一些已收到的数据。 shuffle还没结束Reduce可以先开始,但是shuffle结束后reduce才可以结束
      • 从节点角度看MapReduce物理流程

![Sample Pic][mapreduce5]

  • InputFormat
    • Split:逻辑概念,包含一些元数据信息,如数据起始位置、数据长度、、数据所在节点等。划分方法由用户决定。
    • RR:因为Split是逻辑切分而非物理切分,所以还需要通过RecordReader(RR)根据Split中的信息来处理Split中的具体记录,加载数据并转换为合适Map任务读取的键值对,输入给Map任务。
    • InputFormat:定义了怎样与物理存储(HDFS block) 之间的映射,理想的分区大小是一个HDFS块,会做Map前的预处理,比如验证输入的格式是否符合输入定义。
  • Map

    • Map任务的数量:

    • Hadoop 为每个split创建一个Map任务。由split的多少决定的map任务的数目。

  • Shuffle
    • Map端的Shuffle过程
      • 输入数据和执行map任务
      • 写入缓存
        • 每个map任务分配一个缓存(map结束之后,所有的数据都溢写到磁盘(分区、排序、合并))
          • Map的输出结果首先被写入缓存,当缓存满时(有个溢写比例),就启动溢写操作,把缓存中的数据写入磁盘文件,并清空缓存。先写入缓存,可以大大减少对磁盘I/O的影响。在写入缓存前,key与value值都会被序列化成字节数组。
      • 溢写:设置溢写比例
        • 溢写由另外一个单独的后台线程来完成,通常会设置一个溢写比例,如0.8
        • 分区:哈希函数
          • Hash(key) mod R(其中R表示Reduce任务数量)
          • 不同的分区放到不同的reduce中,为shuffle做准备(因为不同数据放到不同的reduce)
        • 排序:局部有序:按照key值排序 (先分好区之后,在分区中做排序)
        • 合并:根据用户Combine函数计算,从而减少需要溢写到磁盘的数据量。
          • 因为Combiner的输出是Reduce任务的输入,Combiner绝不能改变Reduce任务最终的计算结果,一般而言,累加、最大值等场景可以使用合并操作。
      • 经过分区、排序、合并之后,这些缓存中的键值对就可以被写入磁盘,并清空缓存
      • 每次溢写操作都会在磁盘中生成一个新的溢写文件。在Map任务全部结束之前,系统会对所有溢写文件中的数据进行归并(merge),生成一个大的溢写文件。
      • 文件归并
        • 归并:将key相同的记录拼接在一起,归并得到的文件,放在本地磁盘
        • JobTracker会一直Map任务的执行,并通知Reduce任务来领取数据
          注: 合并(Combine)和归并(Merge)的区别:
          两个键值对<“a”,1>和<“a”,1>,如果合并,会得到<“a”,2>,如果归并,会得到<“a”,<1,1>>
注:JobTracker会一直检测Map任务的执行,当监测到一个Map任务完成后,就会立即通知相关的Reduce任务来“领取”数据,然后开始Reduce端的shuffle过程。
  • Reduce端的shuffle过程
    ![Sample Pic][mapreduce6]

  • 领取数据

    • 每个Reduce任务会不断地通过RPC向JobTracker询问Map任务是否已经完成。Map任务完成之后,JobTracker通知Reduce领取数据
      • 归并数据
      • reduce领取数据先放入缓存,如果缓存被沾满,就会被溢写到磁盘中来自不同map机器,归并写入磁盘
        • 多个溢写文件归并成一个或多个大文件,文件中的键值对是排序的
          • 数据输入给Reduce任务
        • 执行Reduce函数
      • Reduce/OutputFormat
        • reduce任务的数量
          • 程序指定
            • 最优的Reduce任务个数取决于集群中可用的reduce任务槽(slot)的数目
              • 通常设置比reduce任务槽数目稍微小一些的Reduce任务个数(这样可以预留一些系统资源处理可能发生的错误)
  • Hadoop序列化:
    • 指把结构化对象转化为字节流以便在网络上传输或在磁盘上永久存储的过程
    • 序列化格式特点:紧凑、快速、可扩展、互操作(支持多语言的交互)
      • Hadoop的序列化格式:Writable
      • 数据类型
        ![Sample Pic][mapreduce7]
  • MapReduce与HDFS关系
    • 一般MapReduce和HDFS在同一台机器上,即将计算节点和存储节点放在一起运行,从而减少节点间的数据移动开销
      ![Sample Pic][mapreduce8]
    • 主要是实现Map 和 Reduce函数
    • Hadoop框架是用Java实现,但是,MapReduce应用程序则不一定要用Java来写
    • 理念:计算向数据靠拢

容错机制

  • MapReduce 运行失败

    • 机器故障:

      • Task失败:如JVM(java的虚拟机)内存不够退出
        • 内存设置大一点
        • MapReduce容错和HDFS容错是两回事
      • Map Task失败
        • 重新执行Map任务
        • 去HDFS重新读入数据
      • Reduce Task失败
        • 重新执行Reduce任务
        • 去刚刚map结束后保存的磁盘中读取
      • TaskTracker失败
        • JobTracker不会接收到心跳,于是会安排其他TaskTracker重新运行失败TaskTracker的热门无
      • JobTracker失败

        • 最严重的失败,Hadoop没有处理JobTracker失败的机制,是个单点故障
        • 所有任务需要重新运行
          ![Sample Pic][mapreduce9]

        Combine()是局部函数,在局部的map结束之后对它进行Combine,是在shuffle前做的!

    • Hadoop的应用现状
      ![Sample Pic][mapreduce10]

      ​

      书上知识点

      1. 在一个集群中,只要有可能,MapReduce框架就会将Map程序就近地在HDFS数据所在的节点运行,即将计算节点和存储节点放在一起运行,从而减少了节点间的数据移动开销。
        2.Reduce函数的任务就是将输入的一系列具有相同键的键值对以某种方式组合起来,输出处理后的键值对,输出结果会合并成一个文件
      2. Map任务的输入文件、Reduce任务的处理结果都是保存在HDFS中的,而Map任务处理得到的中间结果则保存在本地存储中(如磁盘)。
      3. 只有当Map处理全部结束后,Reduce过程才能开始;只有Map需要考虑数据局部性,实现“计算向数据靠拢”,而Reduce则无需考虑数据局部性

​

123
luyiqu

luyiqu

blog

28 posts
13 categories
27 tags
RSS
© 2020 luyiqu
Powered by Hexo
|
Theme — NexT.Pisces v5.1.4
访客数 总访问量 次
0%