Action

reduce(func)

  • reduce is an action operation in Spark, hence it triggers execution of the DAG and returns the final result to the driver
  • It combines data from multiple partitions down to a single value: each partition is reduced locally, and the partial results are then merged
  • It accepts a commutative and associative function as an argument
    • The function must take two arguments of the same data type
    • The return type of the function must also be the same as the argument type
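The commutativity and associativity requirement exists because Spark reduces each partition independently and then combines the partial results in no guaranteed order. A minimal plain-Scala sketch of why this matters (no Spark required; `ReduceOrderDemo` and `simulatedReduce` are illustrative names, not Spark API):

```scala
object ReduceOrderDemo {
  // Simulate an RDD with two partitions
  val partitions = Seq(Seq(1, 2, 3), Seq(4, 5))

  // Reduce each "partition" locally, then combine the partial
  // results in both possible orders and collect the outcomes
  def simulatedReduce(f: (Int, Int) => Int): Set[Int] = {
    val partials = partitions.map(_.reduce(f))
    Set(f(partials(0), partials(1)), f(partials(1), partials(0)))
  }
}

// Addition is commutative and associative: one stable answer
// ReduceOrderDemo.simulatedReduce(_ + _)  returns Set(15)
// Subtraction is neither: the answer depends on combine order
// ReduceOrderDemo.simulatedReduce(_ - _)  returns Set(-3, 3)
```

With `_ + _` every combine order yields the same value, so reduce is deterministic; with `_ - _` the result depends on which partition's partial result is merged first.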

Apache Spark reduce example

// reduce numbers 1 to 10 by adding them up
scala> val x = sc.parallelize(1 to 10, 2)
scala> val y = x.reduce((accum,n) => (accum + n)) 
y: Int = 55

// shorter syntax
scala> val y = x.reduce(_ + _) 
y: Int = 55

// same thing for multiplication
scala> val y = x.reduce(_ * _) 
y: Int = 3628800

reduce() takes a function that accepts two elements of the RDD's element type as arguments and returns a single element of that same type.

scala> val rdd1 = sc.parallelize(List(1, 2, 5)) 
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:21

scala> val sum = rdd1.reduce{ (x, y) => x + y}
sum: Int = 8

fold() is similar to reduce() except that it takes a 'zero value' (think of it as a kind of initial value), which is used to seed the initial call on each partition

scala> val rdd1 = sc.parallelize(List(
     | ("maths", 80),
     | ("science", 90)
     | ))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[8] at parallelize at <console>:21

scala> rdd1.partitions.length
res8: Int = 8

scala> val additionalMarks = ("extra", 4)
additionalMarks: (String, Int) = (extra,4)

scala> val sum = rdd1.fold(additionalMarks){ (acc, marks) =>
     |    val sum = acc._2 + marks._2
     |    ("total", sum)
     | }
sum: (String, Int) = (total,206)

Note here the partition count is 8. In each partition, the fold starts from the zero value of 4, so across the 8 partitions the zero value contributes 8 × 4 = 32. When the per-partition results are combined, the zero value is added once more, giving (8 × 4) + 4 = 36.

Therefore result = 80 + 90 + 36 = 206
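The arithmetic above can be checked with a plain-Scala simulation of how fold() applies the zero value: once as the seed of each partition's fold, and once more when the partition results are merged. A sketch (`FoldZeroDemo` is an illustrative name; the 8-partition layout mirrors the example above, assuming the two pairs land in separate partitions):

```scala
object FoldZeroDemo {
  val zero = ("extra", 4)

  // 8 partitions; only two hold an element, the rest are empty
  val partitions: Seq[Seq[(String, Int)]] =
    Seq(Seq(("maths", 80)), Seq(("science", 90))) ++
      Seq.fill(6)(Seq.empty[(String, Int)])

  val op = (acc: (String, Int), marks: (String, Int)) =>
    ("total", acc._2 + marks._2)

  // Fold each partition from the zero value, then fold the
  // per-partition results, again starting from the zero value
  def simulatedFold: (String, Int) = {
    val perPartition = partitions.map(_.foldLeft(zero)(op))
    perPartition.foldLeft(zero)(op)
  }
}
```

Each empty partition folds straight to the zero value, so the six empty partitions each contribute 4, matching the 206 computed above.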

Disadvantage:

The disadvantage of both reduce() and fold() is that the return type must be the same as the RDD element type.

aggregate() function can be used to avoid this limitation.
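As a sketch of how aggregate() lifts that restriction, here is a plain-Scala simulation (no Spark; `AggregateDemo` is an illustrative name, while `seqOp`/`combOp` follow aggregate()'s two-function shape) that aggregates Ints into a (sum, count) pair, a result type different from the element type:

```scala
object AggregateDemo {
  val partitions = Seq(Seq(1, 2), Seq(5))

  // seqOp folds elements into an accumulator of a *different* type
  val seqOp  = (acc: (Int, Int), n: Int) => (acc._1 + n, acc._2 + 1)
  // combOp merges the per-partition accumulators
  val combOp = (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)

  def simulatedAggregate: (Int, Int) = {
    val perPartition = partitions.map(_.foldLeft((0, 0))(seqOp))
    perPartition.reduce(combOp)
  }
  // simulatedAggregate returns (8, 3): sum 8 over 3 elements
}
```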

Note: foldByKey() is very similar to fold() except that it operates on a pair RDD
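A rough plain-Scala sketch of foldByKey()'s semantics (`FoldByKeyDemo` is an illustrative name; in real Spark the zero value is applied per partition for each key, so this sketch matches the single-partition case):

```scala
object FoldByKeyDemo {
  val pairs = Seq(("maths", 80), ("maths", 5), ("science", 90))

  // Group values by key, then fold each key's values from the zero value
  def simulatedFoldByKey(zero: Int)(op: (Int, Int) => Int): Map[String, Int] =
    pairs.groupBy(_._1).map { case (k, vs) =>
      k -> vs.map(_._2).foldLeft(zero)(op)
    }
  // simulatedFoldByKey(0)(_ + _) returns Map(maths -> 85, science -> 90)
}
```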

foreach() is an action. foreach() can be used in situations where we do not want to return any result but want to initiate a computation for its side effects. Note that the function runs on the executors, so in cluster mode any println output goes to the executors' stdout, and the order of the output is not deterministic

scala> val testData=Array(1,2,3)
testData: Array[Int] = Array(1, 2, 3)

scala> val inputrdd = sc.parallelize(testData)
inputrdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:23

scala>

scala> inputrdd.foreach{ x => {
     |                         println(x)
     |                       }}
3
1
2
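Since foreach() returns Unit, any result has to escape through a side effect. A plain-Scala sketch (`sideEffectSum` is an illustrative helper, not Spark API; in real Spark code an Accumulator is the safe way to collect a value this way, since closures mutating local variables do not propagate back from executors):

```scala
object ForeachDemo {
  // Collect a result purely via foreach's side effect on a local var
  def sideEffectSum(xs: Array[Int]): Int = {
    var sum = 0
    xs.foreach { x => sum += x }
    sum
  }
}
```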
