CDAP-16171

Spark streaming pipeline with Pub/Sub source fails with rateLimitExceeded error when checkpointing is enabled in a cloud environment

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: 6.3.0
    • Component/s: None

      Description

A Spark streaming pipeline with a Pub/Sub source fails with a rateLimitExceeded error when checkpointing is enabled in a cloud environment.

      Steps to reproduce:

1. Deploy the attached pipeline.

2. Ingest ~800 events of ~100 KB each into Pub/Sub and read them through the Pub/Sub source.
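
For step 2, a minimal reproduction sketch assuming the standard google-cloud-pubsub Java client; the project and topic names are placeholders, not values from the attached pipeline:

// Hypothetical reproduction helper: publishes ~800 messages of ~100 KB each.
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

public class IngestEvents {
  public static void main(String[] args) throws Exception {
    Publisher publisher =
        Publisher.newBuilder(TopicName.of("my-project", "my-topic")).build();
    byte[] payload = new byte[100 * 1024];      // ~100 KB per event
    Arrays.fill(payload, (byte) 'x');
    for (int i = 0; i < 800; i++) {             // ~800 events
      PubsubMessage msg = PubsubMessage.newBuilder()
          .setData(ByteString.copyFrom(payload))
          .build();
      publisher.publish(msg);                   // async publish; futures ignored here
    }
    publisher.shutdown();
    publisher.awaitTermination(1, TimeUnit.MINUTES);
  }
}

With checkpointing enabled, the checkpoint writes to GCS then start failing: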

      2019-12-10 04:52:22,002 - WARN  [pool-34-thread-1:o.a.s.s.CheckpointWriter@87] - Error in attempt 1 of writing checkpoint to 'gs://df-10983341680461967420-n4sproqqvmi6vcp26t26qdihwa/checkpoints/pubsub_single_source_two_executors/190deb8c-7401-43c9-bfb5-0a067ae7e2c8/checkpoint-1575953535000'
      java.io.IOException: Upload failed
      	at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.waitForCompletionAndThrowIfUploadFailed(AbstractGoogleAsyncWriteChannel.java:309) ~[util-2.0.0.jar:na]
      	at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.close(AbstractGoogleAsyncWriteChannel.java:215) ~[util-2.0.0.jar:na]
      	at java.nio.channels.Channels$1.close(Channels.java:178) ~[na:1.8.0_232]
      	at java.io.FilterOutputStream.close(FilterOutputStream.java:159) ~[na:1.8.0_232]
      	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.close(GoogleHadoopOutputStream.java:128) ~[gcs-connector-hadoop2-2.0.0.jar:na]
      	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72) ~[hadoop-common-2.9.2.jar:na]
      	at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106) ~[hadoop-common-2.9.2.jar:na]
      	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler$$anonfun$run$2.apply$mcV$sp(Checkpoint.scala:244) ~[spark-streaming_2.11-2.3.4.jar:2.3.4]
      	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1390) ~[spark-core_2.11-2.3.4.jar:na]
      	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:243) ~[spark-streaming_2.11-2.3.4.jar:2.3.4]
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_232]
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_232]
      	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_232]
      Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 410 Gone
      {
        "code" : 429,
        "errors" : [ {
          "domain" : "usageLimits",
          "message" : "The total number of changes to the object df-10983341680461967420-n4sproqqvmi6vcp26t26qdihwa/checkpoints/pubsub_single_source_two_executors/190deb8c-7401-43c9-bfb5-0a067ae7e2c8/temp exceeds the rate limit. Please reduce the rate of create, update, and delete requests.",
          "reason" : "rateLimitExceeded"
        } ],
        "message" : "The total number of changes to the object df-10983341680461967420-n4sproqqvmi6vcp26t26qdihwa/checkpoints/pubsub_single_source_two_executors/190deb8c-7401-43c9-bfb5-0a067ae7e2c8/temp exceeds the rate limit. Please reduce the rate of create, update, and delete requests."
      }
      	at com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:150) ~[na:na]
      	at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113) ~[na:na]
      	at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40) ~[na:na]
      	at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:562) ~[na:na]
      	at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:482) ~[na:na]
      	at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:599) ~[na:na]
      	at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:272) ~[util-2.0.0.jar:na]
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_232]
      	... 3 common frames omitted
      2019-12-10 04:52:22,821 - INFO  [gcs-async-channel-pool-0:c.g.c.h.u.RetryHttpInitializer@76] - Encountered status code 410 when accessing URL 'https://www.googleapis.com/upload/storage/v1/b/df-10983341680461967420-n4sproqqvmi6vcp26t26qdihwa/o?ifGenerationMatch=0&name=checkpoints/pubsub_single_source_two_executors/190deb8c-7401-43c9-bfb5-0a067ae7e2c8/temp&uploadType=resumable&upload_id=AEnB2UqOSnd7_a7fKEez99BXxxXiPXVhpdrGQhfI37wel_snuFdN-U3aEUp8HmWKZp1s2AceZyYlZGFzR2RMlJu6XqU0rRXXNROgF5ccGRmr_32OfC_SM6M'. Delegating to response handler for possible retry.
      2019-12-10 04:52:22,832 - WARN  [pool-34-thread-1:o.a.s.s.CheckpointWriter@87] - Error in attempt 2 of writing checkpoint to 'gs://df-10983341680461967420-n4sproqqvmi6vcp26t26qdihwa/checkpoints/pubsub_single_source_two_executors/190deb8c-7401-43c9-bfb5-0a067ae7e2c8/checkpoint-1575953535000'
java.io.IOException: Upload failed
	(stack trace and 429 rateLimitExceeded cause identical to attempt 1 above)
      2019-12-10 04:52:25,002 - WARN  [JobGenerator:o.a.s.s.s.ReceivedBlockTracker@87] - Exception thrown while writing record: BatchAllocationEvent(1575953545000 ms,AllocatedBlocks(Map(0 -> ArrayBuffer(ReceivedBlockInfo(0,Some(99),None,BlockManagerBasedStoreResult(input-0-1575953323921,Some(99))))))) to the WriteAheadLog.
      java.lang.IllegalStateException: close() was called on BatchedWriteAheadLog before write request with time 1575953545000 could be fulfilled.
      	at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:87) ~[na:2.3.4]
      	at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:242) [spark-streaming_2.11-2.3.4.jar:2.3.4]
      	at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.allocateBlocksToBatch(ReceivedBlockTracker.scala:122) [spark-streaming_2.11-2.3.4.jar:2.3.4]
      	at org.apache.spark.streaming.scheduler.ReceiverTracker.allocateBlocksToBatch(ReceiverTracker.scala:209) [spark-streaming_2.11-2.3.4.jar:2.3.4]
      	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248) [spark-streaming_2.11-2.3.4.jar:2.3.4]
      	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247) [spark-streaming_2.11-2.3.4.jar:2.3.4]
      	at scala.util.Try$.apply(Try.scala:192) [scala-library-2.11.8.jar:na]
      	at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247) [spark-streaming_2.11-2.3.4.jar:2.3.4]
      	at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183) [spark-streaming_2.11-2.3.4.jar:2.3.4]
      	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89) [spark-streaming_2.11-2.3.4.jar:2.3.4]
      	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88) [spark-streaming_2.11-2.3.4.jar:2.3.4]
      	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) [spark-core_2.11-2.3.4.jar:na] 
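
The 429 body wrapped in the 410 response is consistent with Cloud Storage's per-object write limit: a single object can only be mutated roughly once per second, and Spark's CheckpointWriter funnels every checkpoint through the same .../temp object before renaming it into place. One possible mitigation, sketched below under the assumption that the StreamingContext is built by hand (the bucket path and interval are placeholders, and this is illustrative rather than a committed fix): widening the batch interval reduces how often the temp object is rewritten.

// Sketch only: Spark's CheckpointWriter runs once per batch, and each run
// recreates the same gs://.../checkpoints/.../temp object, which GCS
// rate-limits to roughly one mutation per second.
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class CheckpointTuning {
  public static JavaStreamingContext create(SparkConf conf) {
    // 30s is a placeholder; any interval comfortably above the ~1 write/sec
    // per-object limit (plus retry bursts) should avoid rateLimitExceeded.
    JavaStreamingContext jssc =
        new JavaStreamingContext(conf, Durations.seconds(30));
    jssc.checkpoint("gs://my-bucket/checkpoints");  // placeholder bucket
    return jssc;
  }
}

In a CDAP streaming pipeline the equivalent knob is the pipeline's batch interval setting. The trailing BatchedWriteAheadLog exception looks like fallout from the failed checkpoint shutting the write-ahead log down, not an independent bug.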


            People

• Assignee: Mikkin Patel (mikkin)
• Reporter: Vinisha Shah (vinisha)
• Votes: 0
• Watchers: 1
