Uploaded image for project: 'CDAP'
  1. CDAP
  2. CDAP-12941

Nested anonymous classes cannot be used in Dynamic Spark

    Details

    • Release Notes:
      Fixed dynamic spark plugin to use appropriate context classloader for loading dynamic spark code
    • Rank:
      1|i009un:

      Description

      It looks like nested anonymous functions are not properly shipped when evaluating dynamic spark code. To reproduce:

      1. Install 'Dynamic Spark Plugin' v2.0.3 from Cask Market

      2. Create a pipeline that contains just a single ScalaSparkProgram stage

      3. Configure the stage with the following code:

      import co.cask.cdap.api.spark._
      import org.apache.spark._
      import org.apache.spark.rdd.RDD
      import org.slf4j._
      
      class SparkProgram extends SparkMain {
        import SparkProgram._
      
        override def run(implicit sec: SparkExecutionContext): Unit = {
          LOG.info("Spark Program Started")
      
          val sc = new SparkContext
      
          val points: RDD[(String, Array[Int])] = sc.parallelize(Seq(("a", Array(1, 2)), ("a", Array(3, 4))))
          LOG.info("points = {}", points.collect)
          
          
          val squared: RDD[(String, Array[Int])] = points.mapValues(t => Array(t.apply(0) * t.apply(0), t.apply(1) * t.apply(1)))
          LOG.info("squared = {}", squared.collect)
          
          val squaredNested: RDD[(String, Array[Int])] = points.mapValues(t => t.map(x => x * x))
          LOG.info("squaredNexted = {}", squaredNested.collect)
      
          LOG.info("Spark Program Completed")
        }
      }
      
      object SparkProgram {
        val LOG = LoggerFactory.getLogger(getClass())
      }
      

      4. Deploy and run the pipeline. There will be an exception like:

      2017-11-30 13:24:07,285 - ERROR [SparkRunnerphase-1:c.c.c.i.a.r.ProgramControllerServiceAdapter@98] - Spark Program 'phase-1' failed.
      java.util.concurrent.ExecutionException: java.lang.ClassNotFoundException: Class not found in all delegated ClassLoaders: SparkProgram$$anonfun$2$$anonfun$apply$1
      	at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:294) ~[com.google.guava.guava-13.0.1.jar:na]
      	at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:281) ~[com.google.guava.guava-13.0.1.jar:na]
      	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) ~[com.google.guava.guava-13.0.1.jar:na]
      	at co.cask.cdap.app.runtime.spark.SparkRuntimeService.run(SparkRuntimeService.java:351) ~[co.cask.cdap.cdap-spark-core-4.3.1.jar:na]
      	at com.google.common.util.concurrent.AbstractExecutionThreadService$1$1.run(AbstractExecutionThreadService.java:52) ~[com.google.guava.guava-13.0.1.jar:na]
      	at co.cask.cdap.app.runtime.spark.SparkRuntimeService$5$1.run(SparkRuntimeService.java:409) [co.cask.cdap.cdap-spark-core-4.3.1.jar:na]
      	at java.lang.Thread.run(Thread.java:745) [na:1.8.0_77]
      Caused by: java.lang.ClassNotFoundException: Class not found in all delegated ClassLoaders: SparkProgram$$anonfun$2$$anonfun$apply$1
      	at co.cask.cdap.common.lang.CombineClassLoader.findClass(CombineClassLoader.java:96) ~[na:na]
      	at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_77]
      	at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_77]
      	at co.cask.cdap.common.lang.WeakReferenceDelegatorClassLoader.findClass(WeakReferenceDelegatorClassLoader.java:58) ~[na:na]
      	at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_77]
      	at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_77]
      	at java.lang.Class.forName0(Native Method) ~[na:1.8.0_77]
      	at java.lang.Class.forName(Class.java:348) ~[na:1.8.0_77]
      	at org.apache.spark.util.InnerClosureFinder$$anon$4.visitMethodInsn(ClosureCleaner.scala:435) ~[na:na]
      	at org.apache.xbean.asm5.ClassReader.a(Unknown Source) ~[na:na]
      	at org.apache.xbean.asm5.ClassReader.b(Unknown Source) ~[na:na]
      	at org.apache.xbean.asm5.ClassReader.accept(Unknown Source) ~[na:na]
      	at org.apache.xbean.asm5.ClassReader.accept(Unknown Source) ~[na:na]
      	at org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:84) ~[na:na]
      	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:187) ~[na:na]
      	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) ~[na:na]
      	at org.apache.spark.SparkContext.clean(SparkContext.scala:2055) ~[na:na]
      	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1.apply(PairRDDFunctions.scala:753) ~[na:na]
      	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1.apply(PairRDDFunctions.scala:752) ~[na:na]
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) ~[na:na]
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) ~[na:na]
      	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) ~[na:na]
      	at org.apache.spark.rdd.PairRDDFunctions.mapValues(PairRDDFunctions.scala:752) ~[na:na]
      	at SparkProgram.run(<source-string-1>:21) ~[na:na]
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_77]
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_77]
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_77]
      	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_77]
      	at co.cask.hydrator.plugin.spark.dynamic.ScalaSparkProgram$1.call(ScalaSparkProgram.java:154) ~[na:na]
      	at co.cask.hydrator.plugin.spark.dynamic.ScalaSparkProgram$1.call(ScalaSparkProgram.java:147) ~[na:na]
      	at co.cask.hydrator.plugin.spark.dynamic.ScalaSparkProgram.run(ScalaSparkProgram.java:95) ~[na:na]
      	at co.cask.cdap.etl.spark.plugin.WrappedJavaSparkMain$1.call(WrappedJavaSparkMain.java:43) ~[na:na]
      	at co.cask.cdap.etl.spark.plugin.WrappedJavaSparkMain$1.call(WrappedJavaSparkMain.java:40) ~[na:na]
      	at co.cask.cdap.etl.common.plugin.Caller$1.call(Caller.java:30) ~[na:na]
      	at co.cask.cdap.etl.common.plugin.StageLoggingCaller.call(StageLoggingCaller.java:40) ~[na:na]
      	at co.cask.cdap.etl.spark.plugin.WrappedJavaSparkMain.run(WrappedJavaSparkMain.java:40) ~[na:na]
      	at co.cask.cdap.datapipeline.JavaSparkMainWrapper.run(JavaSparkMainWrapper.java:60) ~[na:na]
      	at co.cask.cdap.app.runtime.spark.SparkMainWrapper$.main(SparkMainWrapper.scala:82) ~[na:na]
      	at co.cask.cdap.app.runtime.spark.SparkMainWrapper.main(SparkMainWrapper.scala) ~[na:na]
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_77]
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_77]
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_77]
      	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_77]
      	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) ~[na:na]
      	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) ~[na:na]
      	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) ~[na:na]
      	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) ~[na:na]
      	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) ~[na:na]
      	at co.cask.cdap.app.runtime.spark.submit.AbstractSparkSubmitter.submit(AbstractSparkSubmitter.java:171) ~[na:na]
      	at co.cask.cdap.app.runtime.spark.submit.AbstractSparkSubmitter.access$000(AbstractSparkSubmitter.java:53) ~[na:na]
      	at co.cask.cdap.app.runtime.spark.submit.AbstractSparkSubmitter$5.run(AbstractSparkSubmitter.java:110) ~[na:na]
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_77]
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_77]
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_77]
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_77]
      	... 1 common frames omitted
      

      The program calculates the square of every element in an array in 2 ways. The first does not use a closure within a closure and will succeed, which can be verified by looking for a message in the program log.

          val squared: RDD[(String, Array[Int])] = points.mapValues(t => Array(t.apply(0) * t.apply(0), t.apply(1) * t.apply(1)))
          LOG.info("squared = {}", squared.collect)
      

      The exception will occur when calculating 'squaredNested', which uses a closure (map) within another closure (mapValues):

          val squaredNested: RDD[(String, Array[Int])] = points.mapValues(t => t.map(x => x * x))
          LOG.info("squaredNexted = {}", squaredNested.collect)
      

        Attachments

          Activity

            People

            • Assignee:
              terence Terence Yim
              Reporter:
              ashau Albert Shau
            • Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: