Spark之join操作

Spark之join操作.md

代码:

import java.util.Arrays;
import java.util.Iterator;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class join {

public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("join").setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> persons = jsc.textFile("spark/input3/person.txt");
JavaRDD<String> addresses = jsc.textFile("spark/input3/address.txt");


//得到 key: 邮编号,value:学号和名字
JavaPairRDD<String, String> personkv = persons.mapToPair(new PairFunction<String, String,String>() {
private static final long serialVersionUID = 1L;
public Tuple2<String,String> call(String str) throws Exception {

String[] personsplit = str.split(" |\t"); // notice maybe \t
//System.out.println("length"+personsplit.length);
//处理缺省数据
if (personsplit.length == 3)
{
//System.out.println("i am not null");
String code=personsplit[2];
String value = personsplit[0] +" "+ personsplit[1]; //number + name
return new Tuple2<String, String>(code, value);
}
else {

//System.out.println("null null null ++++++++="+personsplit.length);
return new Tuple2<String,String>(null,null);
}
}
}); // to split the person and get the form <210000,1 Aaron>

//得到key: 邮编号 value:城市
JavaPairRDD<String, String> addresskv = addresses.mapToPair(new PairFunction<String, String,String>() {
private static final long serialVersionUID = 1L;
public Tuple2<String,String> call(String str) throws Exception {
String[] addresssplit = str.split(" |\t");
String code=addresssplit[0];
String value = addresssplit[1]; //city
return new Tuple2<String, String>(code, value);
}
}); //to split the address and get the form <210000,Nanjing>

//进行join操作
JavaPairRDD<String, Tuple2<String, String>> joinres=personkv.join(addresskv);

//遍历输出
joinres.foreach(new VoidFunction<Tuple2<String, Tuple2<String, String>>>() {
private static final long serialVersionUID = 1L;
public void call(Tuple2<String, Tuple2<String, String>> t) throws Exception {
System.out.println(t._2()._1+" "+t._1+ " "+t._2()._2);
}

});

joinres.saveAsTextFile("./spark/output3/");
}

}

输入:

person.txt(每行格式:学号 姓名 邮编):
    1 Aaron 210000
    .....
address.txt(每行格式:邮编 城市):
    210000 Nanjing
    .........