Spark之join操作.md
1 | import java.util.Arrays; |
输入:
person.txt:
1 Aaron 210000
.....
address.txt:
210000 Nanjing
.........
try something new
1 | import java.util.Arrays; |
person.txt:
1 Aaron 210000
.....
address.txt:
210000 Nanjing
.........
笛卡尔积:穷尽所有的组合,即所有域的所有取值的一个组合(不能重复)
a. 基数:一个域允许的不同取值个数
b. 笛卡尔积可表示为一张二维表,表中的每行对应一个元组,表中的每列对应一个域
关系:笛卡尔积的子集
a. 关系: R(D1,D2,…,Dn):R:关系名,n:关系的目或度(Degree)
b. 元组:关系中的每个元素是关系中的元组,通常用t表示。
c. 属性:表的每一列是一种属性
d. 码
i. 候选码
1) 若关系中的某一属性组的值能唯一地表示一个元组,则称该属性组为候选码
注:简单的情况:候选码只包含一个属性
ii. 主属性:候选码的诸属性
iii. 非主属性:不包含在任何候选码中的属性
iv. 全码:关系模式的所有属性组是这个关系模式的候选码
v. 主码:若一个关系有多个候选码,则选定其中一个为主码:可以一个组合作为一个主码,或者是定义一个id属性为主码
e. 关系的三种类型:基本关系(实际存在的表)、查询表(查询结果对应的表)、视图表(基本表或其他视图表导出的表,是虚表,不对应实际存储的数据)
f. 基本关系的性质
① 列是同质的(Homogeneous)
② 不同的列可出自同一个域
其中的每一列称为一个属性
不同的属性要给予不同的属性名
③ 列的顺序无所谓,,列的次序可以任意交换
④ 任意两个元组的候选码不能相同
⑤ 行的顺序无所谓,行的次序可以任意交换
i. 最重要的:关系的每一个分量必须是一个不可分的数据项
i. 并、差、交、笛卡尔积(传统的集合运算符)
ii. 选择、投影、连接、除(专门的关系运算符)
i. σF(R) = {t |t属于R ∧ F (t )= '真'}
ii. F的基本形式为:X1θY1,θ表示比较运算符,它可以是>,≥,<,≤,=或<>
1 | A和B:分别为R和S上度数相等且可比的属性组 |
两种常用的连接:
i. 等值连接 : 有重复的属性列(从关系R与S 的广义笛卡尔积中选取A、B属性值相等的那些元组) θ为“=”
ii. 自然连接:在等值连接的基础上,并且在结果中把重复的属性列去除掉
注:所有的数据都是关系,在上面可以做运算
注:App和DB独立,将更多的操作推给数据库,只告诉数据库需要什么,接口就会变得很简洁(表达能力强)
初衷:以防两个app同时使用一个DB,引起冲突,因此把操作都放在DB中。
现在:推崇模块化,将DB都放在每个模块中,为了更新一个模块时不会动另一个模块。
MongoDB:文档型数据库,按照文档的形式存储
数据库管理系统(DBMS)
文档集(collection)
数据库(database)
数据处理性能的宗旨
数据库的基本存储架构
聚集索引可以避免数据插入操作集中于表的最后一个数据页。
当然,众所周知,虽然索引可以提高查询速度,但是它们也会导致数据库系统更新数据的性能下降,因为大部分数据更新需要同时更新索引。
○ 软件功能涉及到的概念
○ 软件开发过程中出现的持久化需求
○ 1:1 —— 企业与CEO 、 配偶关系
○ 1:M —— 博客与评论、国家与城市
○ M:N —— 书与作者、用户与粉丝
1 | package kmeans; |
1. centers.txt :
96,826
606,776
474,866
400,768
Hadoop MapReduce局限性:
- 表达能力有限
○ 仅map和reduce函数,无法直接用join等操作
- 磁盘I/O开销大(单个job)
○ 输入、输出及shuffle中间结果都需要读写磁盘
- 延迟高(多个job)
○ 有依赖关系:job之间的衔接涉及IO开销(迭代计算)
○ 无依赖关系:在前一个job执行完成之前,其他job依然无法开始
Spark的改进
- 表达能力有限
○ 增加join等更多复杂的函数,可以串联为DAG
- 磁盘IO开销大(单个job)
○ 非shuffle阶段避免中间结果写磁盘
- 延迟高(多个job作为一整个job)
○ 将原来的多个job作为一个job的多个阶段
§ 有依赖关系:各个阶段之间的衔接尽量写内存
§ 无依赖关系:多个阶段可以同时执行
相比MapReduce,Spark具有如下优点:
- Spark的计算模式也属于MapReduce,但不局限于Map和Reduce操作,还提供了多种数据集操作类型,编程模型比MapReduce更灵活
- Spark提供了内存计算,可将迭代过程中的结果放到内存中,对于迭代运算效率更高
- Spark基于DAG的任务调度执行机制,要优于Hadoop MapReduce的迭代执行机制
- Spark的核心是建立在统一的抽象RDD上的,使得Spark的各个组件可以无缝地进行集成,在同一个应用程序中完成大数据计算任务。
- Resilient Distributed Dataset(弹性分布式数据集)
- Resilient:具有可恢复的容错特性
- Distributed:每个RDD可分成多个分区,一个RDD的不同分区可以存到集群中不同的节点上
- Dataset:每个分区就是一个数据集片段
- RDD特性
- RDD只读,不能直接修改
§ 只能基于稳定的物理存储中的数据集创建RDD通过在其他RDD上执行确定的转换操作(如map、join和group by)而得到新的RDD
- RDD支持运算操作
§ 转换Transformation:描述RDD的转换逻辑
§ 动作Action:标志转换结束,触发DAG生成
□ 惰性求值:只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作
- RDD Lineage,即DAG拓扑结构
§ RDD读入外部数据源进行创建
§ RDD经过一系列的转换(Transformation)操作,每一次都会产生不同的RDD,供给下一个转换操作使用
§ 最后一个RDD经过“动作”操作进行转换,并输出到外部数据源
- RDD之间的依赖关系
- 窄依赖:
§ 表现为一个父RDD的分区对应于一个子RDD的分区,或者多个父RDD的分区对应于一个子RDD的分区
§ 典型的操作:map,filter,union
- 宽依赖:
§ 表现为存在一个父RDD的一个分区对应一个子RDD的多个分区
§ 典型的操作:groupByKey,sortByKey
- Join操作
§ 窄依赖: 对输入进行协同操作
□ 指多个父RDD的某一分区的所有“键”落在子RDD的同一个分区内,不会产生同一个父RDD的某一分区落在子RDD的两个分区的情况(如 a)
§ 宽依赖:对输入做非协同划分 (如b)
- 划分阶段
- spark分析各个RDD的偏序关系生成DAG,再通过分析各个RDD中的分区之间的依赖关系来决定如何划分Stage
- 具体划分方法:
§ 在DAG中进行反向解析,遇到宽依赖就断开
§ 遇到窄依赖就把当前的RDD加入到Stage中
§ 将窄依赖尽量划分在同一个Stage中,可以实现流水线计算 pipeline
- Stage类型
§ ResultStage
□ 输入/输出
® 其输入边界可以是从外部获取数据,也可以是另一个ShuffleMapStage的输出
® 输出直接产生结果或存储
□ 特点:
® 在一个Job里必定有该类型Stage
® 最终的Stage
- 架构设计
§ Master :管理整个系统
□ 集群资源管理器(Cluster Manager)
□ 资源管理器可以自带或Mesos或YARN
§ Worker:运行作业的工作节点
□ 负责任务执行的进程(Executor)
□ 负责任务执行的线程(Task)
§ Application:用户编写的Spark应用程序
§ Job:一个Job包含多个RDD及作用于相应RDD上的各种操作
§ Stage:一个Job会分为多组Task,每组Task被称为Stage,或者也被称为TaskSet
□ Job的基本调度单位
□ 代表了一组关联的、相互之间没有Shuffle依赖关系的任务组成的任务集
§ Task:运行在Executor上的工作单元
- 作业与任务
- Spark Executor
- 与MapReduce相比,Spark所采用的Executor有两个优点:
§ 利用多线程来执行具体的任务,减少任务的启动开销
§ Executor中有一个BlockManager存储模块,会将内存和磁盘共同作为存储设备,有效减少IO开销
- Stage之间:shuffle
- Stage内部:流水线pipeline
- Master故障
- Worker故障
○ Lineage机制
§ 窄依赖(narrow dependency)
□ 执行某个partition时,检查父亲RDD对应的partition是否存在
® 存在,即可执行当前RDD对应的操作
® 不存在,则重构父亲RDD对应的partition
§ 宽依赖(wide dependency)
□ 执行某个partition时,检查父亲RDD对应的partition是否存在
® 存在,即可执行当前RDD对应的操作
® 不存在,则重构整个父亲RDD
○ RDD存储机制
§ 血缘关系,重算过程在不同节点之间并行,只记录粗粒度的操作
□ RDD提供的持久化(缓存)接口
® persist():对一个RDD标记为持久化
® 接受StorageLevel类型参数,可配置各种级别
® 持久化后的RDD将会被保留在计算节点的中被后面的行动操作重复使用
□ cache()
® 相当于persist(MEMORY_ONLY)
□ unpersist()
® 手动地把持久化的RDD从缓存中移除
○ 检查点机制
§ 前述机制的不足之处
□ Lineage可能非常长
□ RDD存储机制主要面向本地磁盘的存储
§ 检查点机制将RDD写入可靠的外部分布式文件系统,例如HDFS
□ 在实现层面,写检查点的过程是一个独立job,作为后台作业运行
Hadoop简介
MapReduce体系结构
MapReduce的体系结构
![Sample Pic][mapreduce4]
Client:提交作业
![Sample Pic][mapreduce5]
Map
Map任务的数量:
Hadoop 为每个split创建一个Map任务。由split的多少决定的map任务的数目。
注:JobTracker会一直检测Map任务的执行,当监测到一个Map任务完成后,就会立即通知相关的Reduce任务来“领取”数据,然后开始Reduce端的shuffle过程。
Reduce端的shuffle过程
![Sample Pic][mapreduce6]
领取数据
MapReduce 运行失败
机器故障:
JobTracker失败
Combine()是局部函数,在局部的map结束之后对它进行Combine,是在shuffle前做的!
Hadoop的应用现状
![Sample Pic][mapreduce10]