Pair RDD
If we have a regular RDD that we want to turn into a pair RDD, we can do this by running a map() function that returns key/value pairs.
Java doesn’t have a built-in tuple type, so Spark’s Java API has users create tuples using the scala.Tuple2 class. This class is very simple: Java users can construct a new tuple by writing new Tuple2(elem1, elem2) and can then access its elements with the ._1() and ._2() methods. Java users also need to call special versions of Spark’s functions when creating pair RDDs. For instance, the mapToPair() function should be used in place of the basic map() function.
// Creating a pair RDD using the first word as the key in Java
PairFunction<String, String, String> keyData =
  new PairFunction<String, String, String>() {
    public Tuple2<String, String> call(String x) {
      return new Tuple2<>(x.split(" ")[0], x);
    }
  };
JavaPairRDD<String, String> pairs = lines.mapToPair(keyData);
When creating a pair RDD from an in-memory collection in Scala and Python, we only need to call SparkContext.parallelize() on a collection of pairs. To create a pair RDD in Java from an in-memory collection, we instead use SparkContext.parallelizePairs().
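As a minimal sketch (assuming the same JavaSparkContext sc as in the other examples, with illustrative data), a pair RDD can be built from a list of Tuple2 objects:
// Creating a pair RDD from an in-memory collection in Java
JavaPairRDD<String, Integer> pairCollection = sc.parallelizePairs(Arrays.asList(
    new Tuple2<>("coffee", 1),
    new Tuple2<>("coffee", 2),
    new Tuple2<>("pandas", 3)));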
Since pair RDDs contain tuples, we need to pass functions that operate on tuples rather than on individual elements.
// Word count in Java
JavaRDD<String> input = sc.textFile("s3://...");
JavaRDD<String> words = input.flatMap(new FlatMapFunction<String, String>() {
  public Iterable<String> call(String x) { return Arrays.asList(x.split(" ")); }
});
JavaPairRDD<String, Integer> result = words.mapToPair(
  new PairFunction<String, String, Integer>() {
    public Tuple2<String, Integer> call(String x) { return new Tuple2<>(x, 1); }
  }).reduceByKey(
    new Function2<Integer, Integer, Integer>() {
      public Integer call(Integer a, Integer b) { return a + b; }
    });
We can actually implement word count even faster by using the countByValue() function on the first RDD:
input.flatMap(x => x.split(" ")).countByValue()
countByValue() is an action that returns a Map of each unique value to its count.
scala> val inputrdd = sc.parallelize{ Seq(10, 4, 3, 3) }
inputrdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[28] at parallelize at <console>:47
scala> inputrdd.countByValue()
res34: scala.collection.Map[Int,Long] = Map(10 -> 1, 3 -> 2, 4 -> 1)
countByValue() vs. reduceByKey(): countByValue() is an action, so it collects the counts into a local Map on the driver and is only practical when the number of distinct values is small; reduceByKey() is a transformation, so the counts stay in a distributed pair RDD that can be transformed further or saved.
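A minimal sketch of the difference, using the words RDD from the word count example above:
// countByValue() is an action: the counts come back as a local java.util.Map on the driver,
// so it only suits a modest number of distinct values
java.util.Map<String, Long> localCounts = words.countByValue();
// reduceByKey() (as in the word count above) is a transformation: the counts remain in a
// distributed JavaPairRDD and can be joined, repartitioned, or saved like any other RDD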
cogroup() is used as a building block for the joins, and it can work on three or more RDDs at once.
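As a hedged sketch in the Java API (the visits and pageNames data below is made up for illustration), cogrouping two pair RDDs yields, for every key, the values from each RDD as Iterables:
// cogroup() of two pair RDDs in Java (illustrative data)
JavaPairRDD<String, Integer> visits = sc.parallelizePairs(Arrays.asList(
    new Tuple2<>("index.html", 1), new Tuple2<>("about.html", 2)));
JavaPairRDD<String, String> pageNames = sc.parallelizePairs(Arrays.asList(
    new Tuple2<>("index.html", "Home"), new Tuple2<>("help.html", "Help")));
// Each key from either RDD maps to (all visit counts, all page names) for that key
JavaPairRDD<String, Tuple2<Iterable<Integer>, Iterable<String>>> grouped =
    visits.cogroup(pageNames);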
Optional is part of Google’s Guava library and represents a possibly missing value. We can call isPresent() to check whether a value is set, and get() returns the contained instance when one is present.
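For example, as a sketch reusing the visits and pageNames RDDs above (and assuming the Spark 1.x Java API, where this Optional is Guava’s), leftOuterJoin() wraps the right-side value in an Optional because a key may have no match:
// leftOuterJoin() keeps every key from visits; missing pageNames become absent Optionals
JavaPairRDD<String, Tuple2<Integer, Optional<String>>> joined =
    visits.leftOuterJoin(pageNames);
for (Tuple2<String, Tuple2<Integer, Optional<String>>> entry : joined.collect()) {
  Optional<String> name = entry._2()._2();
  System.out.println(entry._1() + " -> " + (name.isPresent() ? name.get() : "unknown"));
}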
// Custom sort order in Java, sorting integers as if strings
class IntegerComparator implements Comparator<Integer>, java.io.Serializable {
  // Spark ships the comparator to executors, so it should be serializable
  public int compare(Integer a, Integer b) {
    return String.valueOf(a).compareTo(String.valueOf(b));
  }
}
rdd.sortByKey(new IntegerComparator());
Many other Spark operations automatically result in an RDD with known partitioning information, and many operations other than join() will take advantage of this information. For example, sortByKey() and groupByKey() will result in range-partitioned and hash-partitioned RDDs, respectively. On the other hand, operations like map() cause the new RDD to forget the parent’s partitioning information, because such operations could theoretically modify the key of each record.
Here are all the operations that result in a partitioner being set on the output RDD: cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), partitionBy(), sort(), mapValues() (if the parent RDD has a partitioner), flatMapValues() (if the parent has a partitioner), and filter() (if the parent has a partitioner). All other operations will produce a result with no partitioner.
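As a hedged sketch (reusing the pairs RDD from the first example), mapValues() keeps the partitioner set by partitionBy(), while a plain map() would drop it:
// Hash-partition the pair RDD so later joins against it can avoid a full shuffle
JavaPairRDD<String, String> partitioned = pairs.partitionBy(new HashPartitioner(100));
// mapValues() cannot change keys, so the output keeps the hash partitioner
JavaPairRDD<String, Integer> lengths = partitioned.mapValues(
    new Function<String, Integer>() {
      public Integer call(String v) { return v.length(); }
    });
// map() could rewrite keys, so its output would have no partitioner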