Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 4.1.0
    • Component/s: Pipelines
    • Labels:
      None
    • Release Notes:
      Added a new ErrorTransform plugin type that can be placed after a pipeline stage to consume the errors emitted by that stage.

      Description

      Error datasets aren't very usable in Hydrator. They predate many other Hydrator features and simply don't work in Spark pipelines or in certain MapReduce pipelines (depending on the calculated plan). We need to redesign how errors are handled.

      Based on some initial discussions, the current thought is to treat errors more like normal records. Error records are passed along with normal output, and users can configure each stage to receive all records, only non-error records, or only error records. Users could then make error handling part of the pipeline itself, instead of writing errors to a file or table that nobody looks at anyway.
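A minimal Java sketch of the per-stage routing described above, assuming each record carries a boolean error flag. ErrorMode, TaggedRecord, and ErrorRouter are hypothetical names used only for illustration; they are not part of the Hydrator API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical: which upstream records a stage chooses to receive.
enum ErrorMode {
  ALL,         // both normal and error records
  NON_ERRORS,  // error records filtered out
  ERRORS_ONLY  // only error records, e.g. for a dedicated error-handling stage
}

// Hypothetical wrapper: the payload plus a flag marking it as an error.
final class TaggedRecord<T> {
  final T value;
  final boolean isError;

  TaggedRecord(T value, boolean isError) {
    this.value = value;
    this.isError = isError;
  }
}

final class ErrorRouter {
  private ErrorRouter() {}

  // Returns the subset of upstream output that a stage configured with `mode` would receive.
  static <T> List<TaggedRecord<T>> select(List<TaggedRecord<T>> upstream, ErrorMode mode) {
    List<TaggedRecord<T>> selected = new ArrayList<>();
    for (TaggedRecord<T> candidate : upstream) {
      boolean keep;
      switch (mode) {
        case ALL:
          keep = true;
          break;
        case NON_ERRORS:
          keep = !candidate.isError;
          break;
        case ERRORS_ONLY:
          keep = candidate.isError;
          break;
        default:
          throw new IllegalArgumentException("Unknown mode: " + mode);
      }
      if (keep) {
        selected.add(candidate);
      }
    }
    return selected;
  }
}
```

One plausible default under this sketch is NON_ERRORS, which roughly matches what existing stages see today, since error records currently go to an error dataset rather than to the next stage.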

      Currently, plugins emit normal output and errors through the Emitter interface:

      public interface Emitter<T> {
      
        /**
         * Emit an object.
         * @param value the object to emit
         */
        void emit(T value);
      
        /**
         * Emit an Error object.
         *
         * @param invalidEntry {@link InvalidEntry InvalidEntry&lt;T&gt;} representing the error.
         */
        void emitError(InvalidEntry<T> invalidEntry);
      }
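As a sketch of how a plugin uses this interface today, the transform below sends valid input to emit() and invalid input to emitError(). The Emitter here is a local copy of the interface above; InvalidEntry, CollectingEmitter, and UpperCaseTransform are simplified, hypothetical stand-ins, and the InvalidEntry fields (error code, message, offending record) are an assumption about its shape.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Local copy of the Emitter interface shown above.
interface Emitter<T> {
  void emit(T value);

  void emitError(InvalidEntry<T> invalidEntry);
}

// Simplified stand-in for InvalidEntry; these fields are an assumption for illustration.
final class InvalidEntry<T> {
  final int errorCode;
  final String errorMsg;
  final T invalidRecord;

  InvalidEntry(int errorCode, String errorMsg, T invalidRecord) {
    this.errorCode = errorCode;
    this.errorMsg = errorMsg;
    this.invalidRecord = invalidRecord;
  }
}

// Hypothetical emitter that collects normal output and errors in separate lists.
final class CollectingEmitter<T> implements Emitter<T> {
  final List<T> output = new ArrayList<>();
  final List<InvalidEntry<T>> errors = new ArrayList<>();

  @Override
  public void emit(T value) {
    output.add(value);
  }

  @Override
  public void emitError(InvalidEntry<T> invalidEntry) {
    errors.add(invalidEntry);
  }
}

// Hypothetical transform: upper-cases its input and reports blank input as an error.
final class UpperCaseTransform {
  void transform(String input, Emitter<String> emitter) {
    if (input == null || input.trim().isEmpty()) {
      // Error code 1 is an arbitrary placeholder.
      emitter.emitError(new InvalidEntry<>(1, "blank input", input));
      return;
    }
    emitter.emit(input.toUpperCase(Locale.ROOT));
  }
}
```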
      

      This interface could stay the same. One improvement to consider is catching exceptions thrown by the plugin and automatically calling emitError() on the record that caused the exception. Plugins could also be changed so that every method that takes a record as input receives a Record object, which wraps the actual object to operate on and indicates whether it is an error. For example:

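      public interface Record<T> {
        // The wrapped object to operate on.
        T get();
      
        // Whether this record was emitted as an error by the previous stage.
        boolean isError();
      
        // Error details; presumably null when isError() returns false.
        String getErrorMessage();
      
        Integer getErrorCode();
      }
      
      @Beta
      public interface Transformation<IN, OUT> {
        // Existing method, to be deprecated.
        @Deprecated
        void transform(IN input, Emitter<OUT> emitter) throws Exception;
      
        // New method: receives the record together with its error status.
        void transform(Record<IN> input, Emitter<OUT> emitter) throws Exception;
      }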
      

      The abstract classes that plugins extend can implement this new method by calling the existing method, so existing plugins don't need to change.
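A rough sketch of both ideas, under stated assumptions: a hypothetical BaseTransform implements the Record-based method by unwrapping the record and delegating to the existing method, and a hypothetical CatchingRunner catches exceptions thrown by the plugin and emits the offending record as an error. Emitter, InvalidEntry, and Record are simplified local copies; ValidRecord, BaseTransform, and CatchingRunner are illustrative names only. Because emitError() takes an InvalidEntry<OUT> while the failing record is an IN, the runner assumes input and output share the same type T.

```java
// Simplified local copies of the interfaces discussed above (shapes are assumptions).
interface Emitter<T> {
  void emit(T value);

  void emitError(InvalidEntry<T> invalidEntry);
}

final class InvalidEntry<T> {
  final int errorCode;
  final String errorMsg;
  final T invalidRecord;

  InvalidEntry(int errorCode, String errorMsg, T invalidRecord) {
    this.errorCode = errorCode;
    this.errorMsg = errorMsg;
    this.invalidRecord = invalidRecord;
  }
}

interface Record<T> {
  T get();

  boolean isError();

  String getErrorMessage();

  Integer getErrorCode();
}

// Hypothetical Record implementation for a normal (non-error) record.
final class ValidRecord<T> implements Record<T> {
  private final T value;

  ValidRecord(T value) {
    this.value = value;
  }

  @Override public T get() { return value; }
  @Override public boolean isError() { return false; }
  @Override public String getErrorMessage() { return null; }
  @Override public Integer getErrorCode() { return null; }
}

// Hypothetical base class: implements the new Record-based method by unwrapping the record
// and delegating to the existing method, so plugins that only override transform(IN, ...)
// keep working unchanged. A real implementation might filter out error records here first.
abstract class BaseTransform<IN, OUT> {
  public abstract void transform(IN input, Emitter<OUT> emitter) throws Exception;

  public void transform(Record<IN> input, Emitter<OUT> emitter) throws Exception {
    transform(input.get(), emitter);
  }
}

// Hypothetical driver: turns an exception thrown by the plugin into an error for the
// record that caused it. Assumes input and output share the same type T.
final class CatchingRunner {
  private CatchingRunner() {}

  static <T> void run(BaseTransform<T, T> transform, Record<T> input, Emitter<T> emitter) {
    try {
      transform.transform(input, emitter);
    } catch (Exception e) {
      // Error code 0 is an arbitrary placeholder.
      emitter.emitError(new InvalidEntry<>(0, String.valueOf(e.getMessage()), input.get()));
    }
  }
}
```

A real implementation would also have to decide what happens to output the plugin emitted before throwing; this sketch simply keeps it.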

            People

            • Assignee:
              Albert Shau (ashau)
              Reporter:
              Albert Shau (ashau)
            • Votes:
              0
              Watchers:
              1
