[CDAP-10974] Capabilities to set up macros from plugin types Created: 03/Apr/17  Updated: 23/Aug/17  Resolved: 23/Aug/17

Status: Resolved
Project: CDAP
Component/s: Pipelines
Affects Version/s: None
Fix Version/s: 4.3.0

Type: Improvement Priority: Major
Reporter: Sreevatsan Raman Assignee: Albert Shau
Resolution: Fixed Votes: 0
Labels: cdap_user

Release Notes: BatchSource, BatchSink, BatchAggregator, BatchJoiner, and Transform plugins now have a way to get SettableArguments when preparing a run, which allows them to set arguments for the rest of the pipeline
Rank: 1|hzzz0v:


Currently, macros can be set only in action plugins. There are other scenarios where a macro's value should be determined by another plugin in the pipeline. Examples: an input source determining the schema for the output sink, or one of the transforms determining the schema for the output sink. In these cases, it would be useful to be able to set the macro during the initialize phase of the pipeline.

Comment by Albert Shau [ 03/Apr/17 ]

One possible way to do this is to add a method to TransformContext like the one we have in ActionContext:

  /**
   * Return the arguments which can be updated.
   */
  SettableArguments getArguments();

The problem with doing this at initialize() time for a plugin is that initialize is assumed to be run once when a plugin is instantiated and before other method calls are made. In other words, it is expected that somebody can do:

  private TransformContext context;

  public void initialize(TransformContext context) throws Exception {
    this.context = context;
  }

  public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter) {
    context.getArguments().set("key", "val");
  }

But the semantics of this become very confusing, especially when it comes to when writes from another stage will be visible, and what happens if multiple stages write to the same key. In addition, it straight up doesn't work in Spark. This would be similar to the LookupProvider interface, which works in MapReduce but not in Spark, which confuses everybody.

Another option is to add a prepareRun() method to all plugin types (currently only available to source, sink, aggregator, sparksink), and add a getArguments() method to the context passed into prepareRun. The semantics of prepareRun() have always been that it is run once at the start of each batch. The most difficult part would be implementing this in SparkStreaming with checkpoints, since there is no centralized place where we can call prepareRun() in a controlled fashion.
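To make the prepareRun() option concrete, here is a minimal, self-contained sketch of the intended semantics: each stage gets one prepareRun() call per run, in pipeline order, and all stages share one set of arguments. Every type below (SketchArguments, SketchPrepareContext, SketchStage, PrepareRunDriver, and the "output.schema" key) is a hypothetical stand-in written for illustration, not the actual CDAP API, and the sketch does not model macro substitution or Spark Streaming checkpoints.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for SettableArguments; only the calls used in this sketch.
interface SketchArguments {
  boolean has(String name);
  String get(String name);
  void set(String name, String value);
}

// Simple in-memory arguments shared by every stage of one run.
class MapArguments implements SketchArguments {
  private final Map<String, String> values = new LinkedHashMap<>();
  public boolean has(String name) { return values.containsKey(name); }
  public String get(String name) { return values.get(name); }
  public void set(String name, String value) { values.put(name, value); }
}

// Hypothetical context passed to prepareRun(); exposes the shared arguments.
interface SketchPrepareContext {
  SketchArguments getArguments();
}

// Hypothetical plugin contract: prepareRun() is called once per run, before records flow.
interface SketchStage {
  void prepareRun(SketchPrepareContext context);
}

// A source that determines the output schema and publishes it as an argument.
class SchemaPublishingSource implements SketchStage {
  public void prepareRun(SketchPrepareContext context) {
    context.getArguments().set("output.schema", "{\"type\":\"record\",\"name\":\"user\"}");
  }
}

// A sink that picks up the published value during its own prepareRun().
class SchemaConsumingSink implements SketchStage {
  String resolvedSchema;
  public void prepareRun(SketchPrepareContext context) {
    SketchArguments args = context.getArguments();
    resolvedSchema = args.has("output.schema") ? args.get("output.schema") : null;
  }
}

class PrepareRunDriver {
  // Calls prepareRun() on each stage exactly once, in pipeline order,
  // against a single shared set of arguments.
  static SketchArguments prepareAll(List<SketchStage> stages) {
    SketchArguments arguments = new MapArguments();
    SketchPrepareContext context = () -> arguments;
    for (SketchStage stage : stages) {
      stage.prepareRun(context);
    }
    return arguments;
  }
}
```

Because the driver makes the calls from one place and in a fixed order, a write by an earlier stage is visible to every later stage, and two writes to the same key resolve in pipeline order. That is the property that writes made at transform() time would lack.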

Comment by Terence Yim [ 04/Apr/17 ]

Arguments should only be set via prepareRun (basically in the initialize() call of each node in the Workflow, which happens in the workflow driver).

Comment by Ali Anwar [ 18/Aug/17 ]

Implemented a prepareRun for the Transform plugin type, which exposes TMS APIs: https://github.com/caskdata/cdap/pull/9240
I did not have time to implement it for other plugin types, nor did I add access to a SettableArguments.

Comment by Albert Shau [ 23/Aug/17 ]

https://github.com/caskdata/cdap/pull/9452 adds SettableArguments for transforms. SettableArguments is now available to BatchSource, BatchSink, BatchJoiner, BatchAggregator, and Transform.

Generated at Tue Feb 19 16:35:43 UTC 2019 using Jira 7.13.0#713000-sha1:fbf406879436de2f3fb1cfa09c7fa556fb79615a.