CDAP-13787

MessagingSubscriberService transactions time out if message processing is slow

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 5.0.0
    • Fix Version/s: 5.0.0
    • Component/s: Messaging, Metadata
    • Labels:
      None

      Description

      MessagingSubscriberService starts its own transaction and iterates over a fixed-size batch of messages, processing each one inside that single transaction. (For metadata, the default fetch size is 100.)
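
      For illustration, here is a minimal sketch of that pattern (the class and method names are simplified placeholders, not the actual AbstractMessagingSubscriberService API): the subscriber fetches up to fetchSize messages and processes the whole batch inside one transaction, so the transaction must stay open until the last message is done.

      import java.util.List;

      // Minimal sketch of the batch-in-one-transaction pattern; names are
      // illustrative, not the actual AbstractMessagingSubscriberService API.
      abstract class BatchSubscriberSketch<M> {
        interface TxBody { void run() throws Exception; }

        final int fetchSize = 100;        // metadata default per this issue
        final int txTimeoutSeconds = 30;  // hypothetical transaction timeout

        // Runs the body in a single transaction that must commit before the timeout.
        abstract void executeInTx(int timeoutSeconds, TxBody body) throws Exception;
        abstract List<M> fetchMessages(int limit);
        abstract void processMessage(M message) throws Exception;

        void fetchAndProcessMessages() throws Exception {
          executeInTx(txTimeoutSeconds, () -> {
            for (M message : fetchMessages(fetchSize)) {
              // Each call may issue several HBase writes; if the whole batch
              // takes longer than txTimeoutSeconds, the commit at the end of
              // the transaction fails with TransactionNotInProgressException.
              processMessage(message);
            }
          });
        }
      }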

      However, if the underlying call that processes a message is not fast, the transaction times out, leading to a RuntimeException caused by calling commit on an invalidated transaction. The messaging service then retries the processing again and again, and after the configured number of retries the service dies. This causes dataset.op.executor to fail and restart continuously, leading to severe performance degradation.
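
      The retry-until-death loop can be sketched as follows (hypothetical names; the real AbstractRetryableScheduledService differs in detail):

      // Illustrative sketch of the retry-until-death behavior described
      // above; not the actual AbstractRetryableScheduledService code.
      abstract class RetryableServiceSketch {
        final int maxRetries = 5;          // hypothetical retry policy
        private int consecutiveFailures = 0;

        abstract void runTask() throws Exception;  // e.g. fetchAndProcessMessages()

        void runOneIteration() throws Exception {
          try {
            runTask();
            consecutiveFailures = 0;       // reset on success
          } catch (Exception e) {
            if (++consecutiveFailures > maxRetries) {
              // Retries exhausted: the service terminates, taking down the
              // dataset.op.executor container, which YARN then restarts.
              throw e;
            }
            // Otherwise log "Failed to get and process notifications. Will
            // retry in next run" and wait for the next scheduled iteration.
          }
        }
      }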

      On a 12-node cluster with a minimum YARN container memory of 2 GB and a MapReduce job emitting 1000 tags (simple tags: the numbers 1 to 1000), the dataset op executor kept restarting every 3-5 seconds. This is because MessagingSubscriberService tries to process 100 records (the current configuration) in a single transaction, and the underlying processing involves multiple HBase writes, which are slow due to I/O and network latency.

      Reducing the configured TMS fetch size for the subscriber from 100 to 25 alleviates the problem in the above scenario, but a manually reduced fetch size is not a good solution: even with a lower batch size, if the network or HBase is slow we end up with the same problem.

      Another possible solution is to set the batch size of messages processed from TMS in MetadataSubscriberService (which currently processes lineage, metadata, workflow token, usage, and profile messages) to `1`, but in our current implementation of AbstractMessagingSubscriberService this means that every iteration of the service run processes a single record, which is not very scalable; see the sketch below.
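
      Continuing the earlier BatchSubscriberSketch, a fetch size of 1 would look like this, which makes the throughput ceiling obvious:

      // Variation of the earlier sketch with fetchSize = 1: each scheduled
      // iteration handles exactly one message in its own transaction, so
      // throughput is capped at roughly one message per scheduling delay
      // plus per-transaction overhead.
      void fetchAndProcessOne() throws Exception {
        executeInTx(txTimeoutSeconds, () -> {
          for (M message : fetchMessages(1)) {
            processMessage(message);
          }
        });
      }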

      One other possibility is to decouple fetching messages from TMS from processing them, so that the two can proceed independently when one side has high throughput and the other is slow. This needs some kind of queueing mechanism to buffer messages for the processor; if it is an in-memory queue, a restart of the YARN application can cause messages to be lost on the processor end.
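
      A minimal sketch of that decoupling with an in-memory queue (hypothetical names; note the caveat above that messages still sitting in the queue are lost if the container restarts):

      import java.util.List;
      import java.util.concurrent.ArrayBlockingQueue;
      import java.util.concurrent.BlockingQueue;

      // Sketch: decouple fetching from TMS and processing via a bounded
      // in-memory queue. Hypothetical names; messages still in the queue
      // are lost if the YARN container restarts.
      abstract class DecoupledSubscriberSketch {
        private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(1000);

        abstract List<String> fetchMessages(int limit);   // from TMS
        abstract void processMessage(String message);     // e.g. HBase writes

        // Fetcher thread: polls TMS at its own pace and enqueues messages.
        // (A real implementation would back off when TMS returns nothing.)
        void fetchLoop() throws InterruptedException {
          while (!Thread.currentThread().isInterrupted()) {
            for (String message : fetchMessages(100)) {
              queue.put(message);  // blocks when the processor falls behind
            }
          }
        }

        // Processor thread: drains the queue independently, so a slow HBase
        // write no longer holds a large batch's transaction open.
        void processLoop() throws InterruptedException {
          while (!Thread.currentThread().isInterrupted()) {
            processMessage(queue.take());
          }
        }
      }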

      Relevant stack trace:

      2018-07-18 06:28:50,042 - WARN  [MetadataSubscriberService:c.c.c.m.s.AbstractMessagingSubscriberService@182] - Failed to get and process notifications. Will retry in next run
      java.lang.RuntimeException: org.apache.tephra.TransactionNotInProgressException: canCommit() is called for transaction 1531895296796000000 that is not in progress (it is known to be invalid)
      	at co.cask.cdap.api.Transactionals.propagate(Transactionals.java:175) ~[na:na]
      	at co.cask.cdap.api.Transactionals.execute(Transactionals.java:108) ~[na:na]
      	at co.cask.cdap.messaging.subscriber.AbstractMessagingSubscriberService.fetchAndProcessMessages(AbstractMessagingSubscriberService.java:217) ~[na:na]
      	at co.cask.cdap.messaging.subscriber.AbstractMessagingSubscriberService.runTask(AbstractMessagingSubscriberService.java:163) ~[na:na]
      	at co.cask.cdap.common.service.AbstractRetryableScheduledService.runOneIteration(AbstractRetryableScheduledService.java:144) ~[na:na]
      	at com.google.common.util.concurrent.AbstractScheduledService$1$1.run(AbstractScheduledService.java:170) [com.google.guava.guava-13.0.1.jar:na]
      	at com.google.common.util.concurrent.AbstractScheduledService$CustomScheduler$ReschedulableCallable.call(AbstractScheduledService.java:355) [com.google.guava.guava-13.0.1.jar:na]
      	at com.google.common.util.concurrent.AbstractScheduledService$CustomScheduler$ReschedulableCallable.call(AbstractScheduledService.java:321) [com.google.guava.guava-13.0.1.jar:na]
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_161]
      	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_161]
      	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.8.0_161]
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_161]
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_161]
      	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]
      Caused by: org.apache.tephra.TransactionNotInProgressException: canCommit() is called for transaction 1531895296796000000 that is not in progress (it is known to be invalid)
      	at org.apache.tephra.distributed.TransactionServiceThriftClient.canCommit(TransactionServiceThriftClient.java:195) ~[org.apache.tephra.tephra-core-0.13.0-incubating.jar:0.13.0-incubating]
      	at org.apache.tephra.distributed.TransactionServiceClient$4.execute(TransactionServiceClient.java:363) ~[org.apache.tephra.tephra-core-0.13.0-incubating.jar:0.13.0-incubating]
      	at org.apache.tephra.distributed.TransactionServiceClient$4.execute(TransactionServiceClient.java:359) ~[org.apache.tephra.tephra-core-0.13.0-incubating.jar:0.13.0-incubating]
      	at org.apache.tephra.distributed.TransactionServiceClient.execute(TransactionServiceClient.java:246) ~[org.apache.tephra.tephra-core-0.13.0-incubating.jar:0.13.0-incubating]
      	at org.apache.tephra.distributed.TransactionServiceClient.execute(TransactionServiceClient.java:215) ~[org.apache.tephra.tephra-core-0.13.0-incubating.jar:0.13.0-incubating]
      	at org.apache.tephra.distributed.TransactionServiceClient.canCommitOrThrow(TransactionServiceClient.java:358) ~[org.apache.tephra.tephra-core-0.13.0-incubating.jar:0.13.0-incubating]
      	at co.cask.cdap.data2.transaction.TransactionSystemClientAdapter.canCommitOrThrow(TransactionSystemClientAdapter.java:85) ~[na:na]
      	at co.cask.cdap.data2.transaction.AbstractTransactionContext.checkForConflicts(AbstractTransactionContext.java:238) ~[na:na]
      	at co.cask.cdap.data2.transaction.AbstractTransactionContext.finish(AbstractTransactionContext.java:114) ~[na:na]
      	at co.cask.cdap.data2.transaction.Transactions$CacheBasedTransactional.finishExecute(Transactions.java:230) ~[na:na]
      	at co.cask.cdap.data2.transaction.Transactions$CacheBasedTransactional.execute(Transactions.java:211) ~[na:na]
      	at co.cask.cdap.data2.transaction.Transactions$2.executeInternal(Transactions.java:261) ~[na:na]
      	at co.cask.cdap.data2.transaction.Transactions$2.execute(Transactions.java:248) ~[na:na]
      	at co.cask.cdap.api.Transactionals.execute(Transactionals.java:106) ~[na:na]
      	... 12 common frames omitted
      

      Service termination log:

      2018-07-18 06:34:35,256 - INFO  [MetadataService STOPPING:c.c.h.NettyHttpService@242] - Stopping HTTP Service metadata.service
      2018-07-18 06:34:35,272 - INFO  [MetadataService STOPPING:c.c.c.m.MetadataService@96] - Metadata HTTP service stopped
      2018-07-18 06:34:35,275 - INFO  [MetadataService STOPPING:c.c.c.c.t.AbstractMasterTwillRunnable@156] - Service co.cask.cdap.metadata.MetadataService terminated
      2018-07-18 06:34:35,276 - INFO  [DatasetOpExecutorService STOPPING:c.c.c.d.d.d.s.e.DatasetOpExecutorService@91] - Stopping DatasetOpExecutorService...
      2018-07-18 06:34:35,281 - INFO  [DatasetOpExecutorService STOPPING:c.c.h.NettyHttpService@242] - Stopping HTTP Service dataset.executor
      2018-07-18 06:34:35,292 - INFO  [DatasetOpExecutorService STOPPING:c.c.c.c.t.AbstractMasterTwillRunnable@156] - Service co.cask.cdap.data2.datafabric.dataset.service.executor.DatasetOpExecutorService terminated
      2018-07-18 06:34:35,294 - INFO  [MessagingMetricsCollectionService:c.c.c.c.t.AbstractMasterTwillRunnable@156] - Service co.cask.cdap.metrics.collect.MessagingMetricsCollectionService terminated
      2018-07-18 06:34:35,297 - INFO  [DefaultBrokerService STOPPING:c.c.c.c.t.AbstractMasterTwillRunnable@156] - Service co.cask.cdap.common.guice.KafkaClientModule$DefaultBrokerService terminated
      2018-07-18 06:34:35,302 - INFO  [ZKKafkaClientService STOPPING:o.a.t.i.k.c.ZKKafkaClientService@106] - Stopping KafkaClientService
      2018-07-18 06:34:35,304 - INFO  [ZKKafkaClientService STOPPING:o.a.t.i.k.c.ZKKafkaClientService@114] - KafkaClientService stopped
      2018-07-18 06:34:35,306 - INFO  [DefaultKafkaClientService STOPPING:c.c.c.c.t.AbstractMasterTwillRunnable@156] - Service co.cask.cdap.common.guice.KafkaClientModule$DefaultKafkaClientService terminated
      2018-07-18 06:34:35,309 - INFO  [zk-client-EventThread:c.c.c.c.t.AbstractMasterTwillRunnable@156] - Service org.apache.twill.zookeeper.ZKClientServices$1 terminated
      2018-07-18 06:34:35,309 - INFO  [TwillContainerService:c.c.c.c.t.AbstractMasterTwillRunnable@149] - Runnable stopped dataset.executor
      2018-07-18 06:34:35,311 - INFO  [TwillContainerService:o.a.t.i.AbstractTwillService@232] - Remove live xxxx:2181/cdap/twill/master.services/f89076e5-0830-4694-b901-08c8bdf91b68/runnables/dataset.executor/instances/e499d445-b274-4335-bf6e-b079c4dd9f5a-0
      2018-07-18 06:34:35,315 - INFO  [TwillContainerService:o.a.t.i.AbstractTwillService@204] - Service TwillContainerService with runId e499d445-b274-4335-bf6e-b079c4dd9f5a-0 shutdown completed
      2018-07-18 06:34:35,318 - ERROR [main:o.a.t.i.ServiceMain@106] - Exception thrown from service TwillContainerService [FAILED].
      

      NodeManager log for the dataset op executor container:

      2018-07-18 06:17:26,742 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Start request for container_1531847866267_0018_01_000002 by user cdap
      2018-07-18 06:17:26,742 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Creating a new application reference for app application_1531847866267_0018
      2018-07-18 06:17:26,742 INFO org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger: USER=cdap	IP=xxxx	OPERATION=Start Container Request	TARGET=ContainerManageImpl	RESULT=SUCCESS	APPID=application_1531847866267_0018	CONTAINERID=container_1531847866267_0018_01_000002
      2018-07-18 06:17:26,742 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl: Application application_1531847866267_0018 transitioned from NEW to INITING
      2018-07-18 06:17:26,742 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl: Adding container_1531847866267_0018_01_000002 to application application_1531847866267_0018
      2018-07-18 06:17:26,749 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AppLogAggregatorImpl: rollingMonitorInterval is set as -1. The log rolling mornitoring interval is disabled. The logs will be aggregated after this application is finished.
      2018-07-18 06:17:26,752 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl: Application application_1531847866267_0018 transitioned from INITING to RUNNING
      2018-07-18 06:17:26,753 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl: Container container_1531847866267_0018_01_000002 transitioned from NEW to LOCALIZING
      container_1531847866267_0018_01_000002: 633.8 MB of 2 GB physical memory used; 3.3 GB of 10.2 GB virtual memory used
      2018-07-18 06:34:36,620 WARN org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor: Exit code from container container_1531847866267_0018_01_000002 is : 1
      2018-07-18 06:34:36,620 WARN org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor: Exception from container-launch with container ID: container_1531847866267_0018_01_000002 and exit code: 1
      ExitCodeException exitCode=1: 
      	at org.apache.hadoop.util.Shell.runCommand(Shell.java:576)
      	at org.apache.hadoop.util.Shell.run(Shell.java:487)
      	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:753)
      	at org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:371)
      	at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
      	at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      2018-07-18 06:34:36,620 INFO org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor: Exception from container-launch.
      2018-07-18 06:34:36,620 INFO org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor: Container id: container_1531847866267_0018_01_000002
      2018-07-18 06:34:36,620 INFO org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor: Exit code: 1
      2018-07-18 06:34:36,620 INFO org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor: Stack trace: ExitCodeException exitCode=1: 
      2018-07-18 06:34:36,620 INFO org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor: 	at org.apache.hadoop.util.Shell.runCommand(Shell.java:576)
      2018-07-18 06:34:36,620 INFO org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor: 	at org.apache.hadoop.util.Shell.run(Shell.java:487)
      2018-07-18 06:34:36,620 INFO org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor: 	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:753)
      2018-07-18 06:34:36,620 INFO org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor: 	at org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:371)
      2018-07-18 06:34:36,620 INFO org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor: 	at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
      2018-07-18 06:34:36,620 INFO org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor: 	at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
      2018-07-18 06:34:36,620 INFO org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor: 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      2018-07-18 06:34:36,621 INFO org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor: 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      2018-07-18 06:34:36,621 INFO org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor: 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      2018-07-18 06:34:36,621 INFO org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor: 	at java.lang.Thread.run(Thread.java:748)
      2018-07-18 06:34:36,621 INFO org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor: 
      2018-07-18 06:34:36,621 INFO org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor: Shell output: main : command provided 1
      2018-07-18 06:34:36,621 INFO org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor: main : run as user is cdap
      2018-07-18 06:34:36,621 INFO org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor: main : requested yarn user is cdap
      2018-07-18 06:34:36,621 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch: Container exited with a non-zero exit code 1
      2018-07-18 06:34:36,621 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl: Container container_1531847866267_0018_01_000002 transitioned from RUNNING to EXITED_WITH_FAILURE
      2018-07-18 06:34:36,621 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch: Cleaning up container container_1531847866267_0018_01_000002
      2018-07-18 06:34:36,637 WARN org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger: USER=cdap	OPERATION=Container Finished - Failed	TARGET=ContainerImpl	RESULT=FAILURE	DESCRIPTION=Container failed with state: EXITED_WITH_FAILURE	APPID=application_1531847866267_0018	CONTAINERID=container_1531847866267_0018_01_000002
      2018-07-18 06:34:36,637 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl: Container container_1531847866267_0018_01_000002 transitioned from EXITED_WITH_FAILURE to DONE
      2018-07-18 06:34:36,637 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl: Removing container_1531847866267_0018_01_000002 from application application_1531847866267_0018
      2018-07-18 06:34:36,637 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AppLogAggregatorImpl: Considering container container_1531847866267_0018_01_000002 for log-aggregation
      2018-07-18 06:34:36,637 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices: Got event CONTAINER_STOP for appId application_1531847866267_0018
      2018-07-18 06:34:38,014 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Stopping resource-monitoring for container_1531847866267_0018_01_000002
      2018-07-18 06:34:38,650 INFO org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl: Removed completed containers from NM context: [container_1531847866267_0018_01_000002]
      

      Relevant code links:

      1. Transaction which gets timed out in AbstractMessagingSubscriberService: https://github.com/caskdata/cdap/blob/release/5.0/cdap-tms/src/main/java/co/cask/cdap/messaging/subscriber/AbstractMessagingSubscriberService.java#L217

      2. Service shutdown (after max retries): https://github.com/caskdata/cdap/blob/release/5.0/cdap-common/src/main/java/co/cask/cdap/common/service/AbstractRetryableScheduledService.java#L137

      3. Fetch size: https://github.com/caskdata/cdap/blob/release/5.0/cdap-common/src/main/resources/cdap-default.xml#L2027

    People

    • Assignee: Rohit Sinha (rsinha)
    • Reporter: Rohit Sinha (rsinha)
    • Votes: 0
    • Watchers: 2
