Action
reduce(func)
- reduce is an action in Spark, hence it triggers execution of the DAG and gets executed on the final RDD
- It combines elements from all partitions, reducing them down to a single value that is returned to the driver
- It accepts a commutative and associative function as an argument
- The function must take two arguments of the same data type
- The return type of the function must also be the same as the argument type
// reduce numbers 1 to 10 by adding them up
scala> val x = sc.parallelize(1 to 10, 2)
scala> val y = x.reduce((accum,n) => (accum + n))
y: Int = 55
// shorter syntax
scala> val y = x.reduce(_ + _)
y: Int = 55
// same thing for multiplication
scala> val y = x.reduce(_ * _)
y: Int = 3628800
reduce() takes a function which accepts two elements of the RDD's element type as arguments and returns an element of the same type
scala> val rdd1 = sc.parallelize(List(1, 2, 5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at :21
scala> val sum = rdd1.reduce{ (x, y) => x + y}
sum: Int = 8
fold() is similar to reduce() except that it takes a 'zero value' (think of it as a kind of initial value) which is used in the initial call on each partition, and once more when the per-partition results are combined
scala> val rdd1 = sc.parallelize(List(
| ("maths", 80),
| ("science", 90)
| ))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[8] at parallelize at :21
scala> rdd1.partitions.length
res8: Int = 8
scala> val additionalMarks = ("extra", 4)
additionalMarks: (String, Int) = (extra,4)
scala> val sum = rdd1.fold(additionalMarks){ (acc, marks) =>
| val sum = acc._2 + marks._2
| ("total", sum)
| }
sum: (String, Int) = (total,206)
Note here that the partition count is 8. In each partition, after the RDD elements are folded, the zero value of 4 has been added once (i.e. 8 × 4 = 32 across all partitions). When the per-partition results are combined, a value of 4 is added one more time, giving (8 × 4) + 4 = 36 in total.
Therefore result = 80 + 90 + 36 = 206
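The arithmetic above can be checked with a plain-Scala sketch of fold()'s semantics (which elements land in which of the 8 partitions is an assumption; only the counts matter):

```scala
// Sketch of fold() semantics: the zero value seeds the fold inside every
// partition (even the empty ones), and is applied once more when the
// per-partition results are merged.
val zero = ("extra", 4)
val partitions: Seq[Seq[(String, Int)]] =
  Seq(Seq(("maths", 80)), Seq(("science", 90))) ++ Seq.fill(6)(Seq.empty)

def combine(acc: (String, Int), marks: (String, Int)): (String, Int) =
  ("total", acc._2 + marks._2)

val perPartition = partitions.map(_.foldLeft(zero)(combine))
val result = perPartition.foldLeft(zero)(combine)   // ("total", 206)
```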
Disadvantage:
The disadvantage of both reduce() and fold() is that the return type must be the same as the RDD element type.
aggregate() function can be used to avoid this limitation.
Note: foldByKey() is very similar to fold() except that it operates on a pair RDD, folding the values of each key separately
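The idea behind foldByKey() can be sketched with plain Scala collections (the data and the zero value 0 are illustrative; on a real pair RDD this would be written as rdd.foldByKey(0)(_ + _), and Spark applies the zero value per partition rather than once per key as this approximation does):

```scala
// Approximate foldByKey(): group by key, then fold each key's values
// starting from the zero value (0 here).
val marks = Seq(("maths", 80), ("maths", 5), ("science", 90))
val folded: Map[String, Int] =
  marks.groupBy(_._1).map { case (k, vs) =>
    k -> vs.map(_._2).foldLeft(0)(_ + _)
  }
// folded: Map("maths" -> 85, "science" -> 90)
```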
foreach() is an action. It can be used in situations where we do not want to return any result, but want to trigger a computation for its side effects (e.g. printing, or writing to an external system). Note that the function runs on the executors, so in cluster mode the println output goes to the executor logs, and the output order across partitions is not deterministic.
scala> val testData=Array(1,2,3)
testData: Array[Int] = Array(1, 2, 3)
scala> val inputrdd = sc.parallelize(testData)
inputrdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at :23
scala>
scala> inputrdd.foreach{ x => {
| println(x)
| }}
3
1
2
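Since foreach() returns Unit, it is useful only for its side effects. A plain-Scala sketch of the contrast with map() (the AtomicLong counter is purely illustrative; in real Spark code, closures that mutate driver-local variables do not work across executors, so a Spark accumulator such as sc.longAccumulator should be used instead):

```scala
import java.util.concurrent.atomic.AtomicLong

val data = Seq(1, 2, 3)

// map() returns a new collection; foreach() returns Unit.
val doubled: Seq[Int] = data.map(_ * 2)   // Seq(2, 4, 6)
data.foreach(println)                     // side effect only, no result

// Side-effecting aggregation: safe locally with an AtomicLong; in Spark,
// use an accumulator instead of a captured mutable variable.
val counter = new AtomicLong(0)
data.foreach(x => counter.addAndGet(x))
// counter.get == 6
```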