Uploaded image for project: 'CDAP'
  1. CDAP
  2. CDAP-15910

generalized joiner : 1-n relations, temporal or geo join, etc...


    • Type: New Feature
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 5.1.2
    • Fix Version/s: Parking Lot
    • Component/s: API, App Fabric, CDAP
    • Labels:
    • Rank:


      In my understanding, the joiner API seems to have a very narrow scope:  it may only be used safely for joining two or more inputs via a common primary key, i.e. all inputs must have a 1 to 1 (inner join) or 1 to 0,1 relationship (outer join). This analysis is based on actuals test on 1 to n relationship derived from testInnerJoinWithMultiOutput in DataPipelineTest: when adding more item records and more transaction record with same customer id, the result is not the one expected.

      At first sight (I did not try it i must admit), this could be fixed to support 1 to n relationship by slightly changing code, main change would be to have the merge function in Joiner return a collection of OUT (in the batchjoiner case, one output for each valid couple of triple of results that match the same key). 

      But then, there are two problems:

      1. Compatibility issues as the API would break (could be fixed by preserving the current merge signature, and using a default implementation for the new "mergeMulti" mehod that delegates to the former)
      2. This API would not perfectly fit the need to go outside equijoins, where relationship between joined inputs can hardly been expressed in term of sharing of some key: for example, temporal join (join by temporal proximity) or geospatial join (join by geographic proximity).... Those are precisely the use cases that I have to deal with in a big data context (millions of temporal or geospatial data to join in batch and streaming context)

      That is why I have come to the following proposition of evolution, as  a base of discussion backed by an actual implementation.

      Proposed epics/stories are the following:

      • As a plugin developper, I want to be able to develop a generalized join plugin with a join method that accepts a map of data collection, one data collection per input stage, and that returns a data collection. Spark is the primary targeted engine, so "data collection" should translate to RDD, as in the SparkCompute plugin, and so the "generalized join plugin" interface should be named SparkJoiner
      • As a pipeline configurator, I want to be able to use a Joiner plugin that can inner or outer join by an equijoin an arbitrary number of inputs that have one to one or one to many relationships. The plugin GUI could be the same as the current BatchJoiner plugin GUI, as the former subsumes the later
      • As a pipeline configurator, I want to be able to use a temporal joiner plugin that may left outer join two inputs based on a time field (a Unix timestamp) with a "nearest up to some tolerance (e.g. 10 minutes, 1 hour...)" semantic, and optional exact match fields

      In the proposed attachment, you will find the proposed implementation, as a patch based on 5.1.2 base code.
      It includes:

      • A new SparkJoiner plugin type and CDAP code to support it in both batch and streaming context
      • First concrete implementation and proof of concept of a temporal joiner plugin is a new hydrator plugin named TemporalJoiner that is A SparkJoiner based on the flint library for timeseries from com.twosigmas

      The later plugin has been successfully tested with a join between a 10,000,000 dataset as the left side and a 3,400,000 dataset as the right side of the join and proved to be very efficient both in time and memory consmption

      you will also find a simple pipeline that illustrates the use of the plugin  (image and json file)





            • Assignee:
              bhooshan Bhooshan Mogal
              gholler Guillaume HOLLER
            • Votes:
              11 Vote for this issue
              11 Start watching this issue


              • Created: