
Getting strange behavior when calling a function outside of a closure:

  • when the function is in an object, everything works
  • when the function is in a class, I get:

Task not serializable: java.io.NotSerializableException: testing

The problem is that I need my code in a class and not an object. Any idea why this is happening? Is a Scala object serializable by default?

This is a working code example:

object working extends App {
    val list = List(1,2,3)

    val rddList = Spark.ctx.parallelize(list)
    //calling function outside closure 
    val after = rddList.map(someFunc(_))

    def someFunc(a:Int)  = a+1

    after.collect().map(println(_))
}

This is the non-working example:

object NOTworking extends App {
  new testing().doIT
}

//adding extends Serializable won't help
class testing {  
  val list = List(1,2,3)  
  val rddList = Spark.ctx.parallelize(list)

  def doIT =  {
    //again calling the function someFunc
    val after = rddList.map(someFunc(_))
    //this will crash (Spark is lazy, so the failure only surfaces here)
    after.collect().map(println(_))
  }

  def someFunc(a:Int) = a+1
}

Answers

RDDs extend the Serializable interface, so this is not what's causing your task to fail. This doesn't mean, however, that you can serialize an RDD with Spark and avoid NotSerializableException.

Spark is a distributed computing engine and its main abstraction is a resilient distributed dataset (RDD), which can be viewed as a distributed collection. Basically, an RDD's elements are partitioned across the nodes of the cluster, but Spark abstracts this away from the user, letting the user interact with the RDD (collection) as if it were a local one.

Not to get into too many details, but when you run different transformations on an RDD (map, flatMap, filter and others), your transformation code (the closure) is:

  1. serialized on the driver node,
  2. shipped to the appropriate nodes in the cluster,
  3. deserialized,
  4. and finally executed on the nodes

You can of course run this locally (as in your examples), but all of those phases (apart from the shipping over the network) still occur. This lets you catch bugs even before deploying to production.
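
To see that this is just plain Java serialization at work, here is a minimal sketch with no Spark involved (the Testing and ClosureSerializationDemo names are made up for illustration). Serializing a closure that calls a method of a non-serializable class fails the same way the question's second example does:

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    class Testing {                          // not Serializable, just like `testing` in the question
      def someFunc(a: Int) = a + 1
      def closure: Int => Int = someFunc(_)  // really (x: Int) => this.someFunc(x), so `this` is captured
    }

    object ClosureSerializationDemo extends App {
      val f = new Testing().closure
      val oos = new ObjectOutputStream(new ByteArrayOutputStream())
      oos.writeObject(f)                     // throws java.io.NotSerializableException: Testing
    }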

What happens in your second case is that you are calling a method defined in class testing from inside the map function (someFunc(_) is really (x: Int) => this.someFunc(x), so the closure holds a reference to the enclosing testing instance). Spark sees that, and since methods cannot be serialized on their own, it tries to serialize the whole testing class, so that the code will still work when executed in another JVM. You have two possibilities:

Either you make class testing serializable, so the whole class can be serialized by Spark:

    import org.apache.spark.{SparkContext,SparkConf}

    object Spark {
      val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
    }

    object NOTworking extends App {
      new Test().doIT
    }

    class Test extends java.io.Serializable {
      val rddList = Spark.ctx.parallelize(List(1,2,3))

      def doIT() = {
        val after = rddList.map(someFunc)
        after.collect().foreach(println)
      }

      def someFunc(a: Int) = a + 1
    }

Or you make someFunc a function instead of a method (functions are objects in Scala), so that Spark will be able to serialize it:

    import org.apache.spark.{SparkContext,SparkConf}
    
    object Spark {
      val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
    }
    
    object NOTworking extends App {
      new Test().doIT
    }
    
    class Test {
      val rddList = Spark.ctx.parallelize(List(1,2,3))
    
      def doIT() =  {
        val after = rddList.map(someFunc)
        after.collect().foreach(println)
      }
    
      val someFunc = (a: Int) => a + 1
    }
    

A similar, but not identical, problem with class serialization may also be of interest to you; you can read about it in this Spark Summit 2013 presentation.

As a side note, you can rewrite rddList.map(someFunc(_)) as rddList.map(someFunc); they are exactly the same. The second form is usually preferred, as it is less verbose and cleaner to read.

EDIT (2015-03-15): SPARK-5307 introduced SerializationDebugger, and Spark 1.3.0 is the first version to use it. It adds the serialization path to a NotSerializableException: when the exception is encountered, the debugger walks the object graph to find the path to the object that cannot be serialized and constructs information to help the user find it.

In the OP's case, this is what gets printed to stdout:

    Serialization stack:
        - object not serializable (class: testing, value: testing@2dfe2f00)
        - field (class: testing$$anonfun$1, name: $outer, type: class testing)
        - object (class testing$$anonfun$1, <function1>)
    
Novalirium, answered Tuesday, June 1, 2021

Anonymous functions serialize their containing class. When you map {doc => DocumentParameter(doc, numOfTopics)}, the only way it can give that function access to numOfTopics is to serialize the PLSA class. And that class can't actually be serialized, because (as you can see from the stacktrace) it contains the SparkContext, which isn't serializable (Bad Things would happen if individual cluster nodes had access to the context and could, e.g., create new jobs from within a mapper).

In general, try to avoid storing the SparkContext in your classes (edit: or at least, make sure it's very clear what kind of classes contain the SparkContext and what kind don't); it's better to pass it as a (possibly implicit) parameter to the individual methods that need it. Alternatively, move the function {doc => DocumentParameter(doc, numOfTopics)} into a different class from PLSA, one that really can be serialized, as sketched below.
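
Here is a sketch of that suggestion. The names (PLSA, DocumentParameter, numOfTopics, toParameters) are placeholders modeled on the snippets above, not the actual code from the question; the point is that the SparkContext is passed into the method that needs it and the closure only captures a local Int, so neither the class nor the context ever has to be serialized:

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    case class DocumentParameter(doc: String, numOfTopics: Int)

    class PLSA(numOfTopics: Int) {
      // The context is a method parameter, not a field, so it cannot end up inside a closure.
      def toParameters(sc: SparkContext, documents: Seq[String]): RDD[DocumentParameter] = {
        val topics = numOfTopics   // local copy: the closure below captures an Int, not the PLSA instance
        sc.parallelize(documents).map(doc => DocumentParameter(doc, topics))
      }
    }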

(As multiple people have suggested, it's possible to keep the SparkContext in the class but mark it as @transient so that it won't be serialized. I don't recommend this approach; it means the class will "magically" change state when serialized (losing the SparkContext), so you might end up with NPEs when you try to access the SparkContext from inside a serialized job. It's better to maintain a clear distinction between classes that are only used in the "control" code (and might use the SparkContext) and classes that are serialized to run on the cluster (which must not have the SparkContext)).
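
To make that pitfall concrete, here is a minimal sketch (the Pipeline class name and its body are made up for illustration; it assumes tasks are deserialized on an executor as usual):

    import org.apache.spark.SparkContext

    // Discouraged pattern: the context lives in the class but is @transient,
    // so the class serializes fine, and silently loses sc on the way.
    class Pipeline(@transient val sc: SparkContext) extends Serializable {
      def run(): Long = {
        val rdd = sc.parallelize(1 to 10)   // fine: this runs on the driver, where sc is still set
        rdd.map { x =>
          // The closure captures `this`; after deserialization on the executor,
          // the Pipeline copy has sc == null, so this call throws a NullPointerException.
          sc.defaultParallelism + x
        }.count()
      }
    }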

mahclark, answered Tuesday, September 21, 2021

lag returns o.a.s.sql.Column, which is not serializable. The same thing applies to WindowSpec. In interactive mode these objects may be included as part of the closure for map:

    scala> import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.expressions.Window
    
    scala> val df = Seq(("foo", 1), ("bar", 2)).toDF("x", "y")
    df: org.apache.spark.sql.DataFrame = [x: string, y: int]
    
    scala> val w = Window.partitionBy("x").orderBy("y")
    w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@307a0097
    
    scala> val lag_y = lag(col("y"), 1).over(w)
    lag_y: org.apache.spark.sql.Column = 'lag(y,1,null) windowspecdefinition(x,y ASC,UnspecifiedFrame)
    
    scala> def f(x: Any) = x.toString
    f: (x: Any)String
    
    scala> df.select(lag_y).map(f _).first
    org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    ...
    Caused by: java.io.NotSerializableException: org.apache.spark.sql.expressions.WindowSpec
    Serialization stack:
        - object not serializable (class: org.apache.spark.sql.expressions.WindowSpec, value: org.apache.spark.sql.expressions.WindowSpec@307a0097)
    

A simple solution is to mark both as transient:

    scala> @transient val w = Window.partitionBy("x").orderBy("y")
    w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@7dda1470
    
    scala> @transient val lag_y = lag(col("y"), 1).over(w)
    lag_y: org.apache.spark.sql.Column = 'lag(y,1,null) windowspecdefinition(x,y ASC,UnspecifiedFrame)
    
    scala> df.select(lag_y).map(f _).first
    res1: String = [null]     
    
Domiik, answered Friday, October 8, 2021