diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java new file mode 100644 index 0000000000..1374c10397 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java @@ -0,0 +1,486 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.http.annotation.ThreadSafe; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +@ThreadSafe() +@EventDriven() +@Tags({"test", "debug", "processor", "utility", "flow", "FlowFile"}) +@CapabilityDescription("The DebugFlow processor aids testing and debugging the FlowFile framework by allowing various " + + "responses to be explicitly triggered in response to the receipt of a FlowFile or a timer event without a " + + "FlowFile if using timer or cron based scheduling. It can force responses needed to exercise or test " + + "various failure modes that can occur when a processor runs.") +public class DebugFlow extends AbstractProcessor { + + private final AtomicReference> relationships = new AtomicReference<>(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles processed successfully.") + .build(); + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles that failed to process.") + .build(); + + private final AtomicReference> propertyDescriptors = new AtomicReference<>(); + + static final PropertyDescriptor FF_SUCCESS_ITERATIONS = new PropertyDescriptor.Builder() + .name("FlowFile Success Iterations") + .description("Number of FlowFiles to forward to success relationship.") + .required(true) + .defaultValue("1") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + static final PropertyDescriptor FF_FAILURE_ITERATIONS = new PropertyDescriptor.Builder() + .name("FlowFile Failure Iterations") + .description("Number of FlowFiles to forward to failure relationship.") + .required(true) + .defaultValue("0") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + static final PropertyDescriptor FF_ROLLBACK_ITERATIONS = new PropertyDescriptor.Builder() + .name("FlowFile Rollback Iterations") + .description("Number of FlowFiles to roll back (without penalty).") + .required(true) + .defaultValue("0") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + static final PropertyDescriptor FF_ROLLBACK_YIELD_ITERATIONS = new PropertyDescriptor.Builder() + .name("FlowFile Rollback Yield Iterations") + .description("Number of FlowFiles to roll back and yield.") + .required(true) + .defaultValue("0") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + static final PropertyDescriptor FF_ROLLBACK_PENALTY_ITERATIONS = new PropertyDescriptor.Builder() + .name("FlowFile Rollback Penalty Iterations") + .description("Number of FlowFiles to roll back with penalty.") + .required(true) + .defaultValue("0") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + static final PropertyDescriptor FF_EXCEPTION_ITERATIONS = new PropertyDescriptor.Builder() + .name("FlowFile Exception Iterations") + .description("Number of FlowFiles to throw exception.") + .required(true) + .defaultValue("0") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + static final PropertyDescriptor FF_EXCEPTION_CLASS = new PropertyDescriptor.Builder() + .name("FlowFile Exception Class") + .description("Exception class to be thrown (must extend java.lang.RuntimeException).") + .required(true) + .defaultValue("java.lang.RuntimeException") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(new Validator() { + @Override + public ValidationResult validate(String subject, String input, ValidationContext context) { + Class klass = classNameToRuntimeExceptionClass(input); + return new ValidationResult.Builder() + .subject(subject) + .input(input) + .valid(klass != null && (RuntimeException.class.isAssignableFrom(klass))) + .explanation(subject + " class must exist and extend java.lang.RuntimeException") + .build(); + } + }) + .build(); + + static final PropertyDescriptor NO_FF_SKIP_ITERATIONS = new PropertyDescriptor.Builder() + .name("No FlowFile Skip Iterations") + .description("Number of times to skip onTrigger if no FlowFile.") + .required(true) + .defaultValue("1") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + static final PropertyDescriptor NO_FF_EXCEPTION_ITERATIONS = new PropertyDescriptor.Builder() + .name("No FlowFile Exception Iterations") + .description("Number of times to throw NPE exception if no FlowFile.") + .required(true) + .defaultValue("0") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + static final PropertyDescriptor NO_FF_YIELD_ITERATIONS = new PropertyDescriptor.Builder() + .name("No FlowFile Yield Iterations") + .description("Number of times to yield if no FlowFile.") + .required(true) + .defaultValue("0") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + static final PropertyDescriptor NO_FF_EXCEPTION_CLASS = new PropertyDescriptor.Builder() + .name("No FlowFile Exception Class") + .description("Exception class to be thrown if no FlowFile (must extend java.lang.RuntimeException).") + .required(true) + .defaultValue("java.lang.RuntimeException") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(new Validator() { + @Override + public ValidationResult validate(String subject, String input, ValidationContext context) { + Class klass = classNameToRuntimeExceptionClass(input); + return new ValidationResult.Builder() + .subject(subject) + .input(input) + .valid(klass != null && (RuntimeException.class.isAssignableFrom(klass))) + .explanation(subject + " class must exist and extend java.lang.RuntimeException") + .build(); + } + }) + .build(); + + private volatile Integer flowFileMaxSuccess = 0; + private volatile Integer flowFileMaxFailure = 0; + private volatile Integer flowFileMaxRollback = 0; + private volatile Integer flowFileMaxYield = 0; + private volatile Integer flowFileMaxPenalty = 0; + private volatile Integer flowFileMaxException = 0; + + private volatile Integer noFlowFileMaxSkip = 0; + private volatile Integer noFlowFileMaxException = 0; + private volatile Integer noFlowFileMaxYield = 0; + + private volatile Integer flowFileCurrSuccess = 0; + private volatile Integer flowFileCurrFailure = 0; + private volatile Integer flowFileCurrRollback = 0; + private volatile Integer flowFileCurrYield = 0; + private volatile Integer flowFileCurrPenalty = 0; + private volatile Integer flowFileCurrException = 0; + + private volatile Integer noFlowFileCurrSkip = 0; + private volatile Integer noFlowFileCurrException = 0; + private volatile Integer noFlowFileCurrYield = 0; + + private volatile Class flowFileExceptionClass = null; + private volatile Class noFlowFileExceptionClass= null; + + private final FlowFileResponse curr_ff_resp = new FlowFileResponse(); + private final NoFlowFileResponse curr_noff_resp = new NoFlowFileResponse(); + + @Override + public Set getRelationships() { + synchronized (relationships) { + if (relationships.get() == null) { + HashSet relSet = new HashSet<>(); + relSet.add(REL_SUCCESS); + relSet.add(REL_FAILURE); + relationships.compareAndSet(null, Collections.unmodifiableSet(relSet)); + } + return relationships.get(); + } + } + + @Override + protected List getSupportedPropertyDescriptors() { + synchronized (propertyDescriptors) { + if (propertyDescriptors.get() == null) { + ArrayList propList = new ArrayList<>(); + propList.add(FF_SUCCESS_ITERATIONS); + propList.add(FF_FAILURE_ITERATIONS); + propList.add(FF_ROLLBACK_ITERATIONS); + propList.add(FF_ROLLBACK_YIELD_ITERATIONS); + propList.add(FF_ROLLBACK_PENALTY_ITERATIONS); + propList.add(FF_EXCEPTION_ITERATIONS); + propList.add(FF_EXCEPTION_CLASS); + propList.add(NO_FF_SKIP_ITERATIONS); + propList.add(NO_FF_EXCEPTION_ITERATIONS); + propList.add(NO_FF_YIELD_ITERATIONS); + propList.add(NO_FF_EXCEPTION_CLASS); + propertyDescriptors.compareAndSet(null, Collections.unmodifiableList(propList)); + } + return propertyDescriptors.get(); + } + } + + @OnScheduled + public void onScheduled(ProcessContext context) { + flowFileMaxSuccess = context.getProperty(FF_SUCCESS_ITERATIONS).asInteger(); + flowFileMaxFailure = context.getProperty(FF_FAILURE_ITERATIONS).asInteger(); + flowFileMaxYield = context.getProperty(FF_ROLLBACK_YIELD_ITERATIONS).asInteger(); + flowFileMaxRollback = context.getProperty(FF_ROLLBACK_ITERATIONS).asInteger(); + flowFileMaxPenalty = context.getProperty(FF_ROLLBACK_PENALTY_ITERATIONS).asInteger(); + flowFileMaxException = context.getProperty(FF_EXCEPTION_ITERATIONS).asInteger(); + noFlowFileMaxException = context.getProperty(NO_FF_EXCEPTION_ITERATIONS).asInteger(); + noFlowFileMaxYield = context.getProperty(NO_FF_YIELD_ITERATIONS).asInteger(); + noFlowFileMaxSkip = context.getProperty(NO_FF_SKIP_ITERATIONS).asInteger(); + curr_ff_resp.reset(); + curr_noff_resp.reset(); + flowFileExceptionClass = classNameToRuntimeExceptionClass(context.getProperty(FF_EXCEPTION_CLASS).toString()); + noFlowFileExceptionClass = classNameToRuntimeExceptionClass(context.getProperty(NO_FF_EXCEPTION_CLASS).toString()); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + final ComponentLog logger = getLogger(); + + FlowFile ff = session.get(); + + // Make up to 2 passes to allow rollover from last cycle to first. + // (This could be "while(true)" since responses should break out if selected, but this + // prevents endless loops in the event of unexpected errors or future changes.) + int pass = 2; + while (pass > 0) { + pass -= 1; + if (ff == null) { + if (curr_noff_resp.state() == NoFlowFileResponseState.NO_FF_SKIP_RESPONSE) { + if (noFlowFileCurrSkip < noFlowFileMaxSkip) { + noFlowFileCurrSkip += 1; + logger.info("DebugFlow skipping with no flow file"); + return; + } else { + noFlowFileCurrSkip = 0; + curr_noff_resp.getNextCycle(); + } + } + if (curr_noff_resp.state() == NoFlowFileResponseState.NO_FF_EXCEPTION_RESPONSE) { + if (noFlowFileCurrException < noFlowFileMaxException) { + noFlowFileCurrException += 1; + logger.info("DebugFlow throwing NPE with no flow file"); + String message = "forced by " + this.getClass().getName(); + RuntimeException rte; + try { + rte = noFlowFileExceptionClass.getConstructor(String.class).newInstance(message); + throw rte; + } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { + if (logger.isErrorEnabled()) { + logger.error("{} unexpected exception throwing DebugFlow exception: {}", + new Object[]{this, e}); + } + } + } else { + noFlowFileCurrException = 0; + curr_noff_resp.getNextCycle(); + } + } + if (curr_noff_resp.state() == NoFlowFileResponseState.NO_FF_YIELD_RESPONSE) { + if (noFlowFileCurrYield < noFlowFileMaxYield) { + noFlowFileCurrYield += 1; + logger.info("DebugFlow yielding with no flow file"); + context.yield(); + break; + } else { + noFlowFileCurrYield = 0; + curr_noff_resp.getNextCycle(); + } + } + return; + } else { + if (curr_ff_resp.state() == FlowFileResponseState.FF_SUCCESS_RESPONSE) { + if (flowFileCurrSuccess < flowFileMaxSuccess) { + flowFileCurrSuccess += 1; + logger.info("DebugFlow transferring to success file={} UUID={}", + new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()), + ff.getAttribute(CoreAttributes.UUID.key())}); + session.transfer(ff, REL_SUCCESS); + session.commit(); + break; + } else { + flowFileCurrSuccess = 0; + curr_ff_resp.getNextCycle(); + } + } + if (curr_ff_resp.state() == FlowFileResponseState.FF_FAILURE_RESPONSE) { + if (flowFileCurrFailure < flowFileMaxFailure) { + flowFileCurrFailure += 1; + logger.info("DebugFlow transferring to failure file={} UUID={}", + new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()), + ff.getAttribute(CoreAttributes.UUID.key())}); + session.transfer(ff, REL_FAILURE); + session.commit(); + break; + } else { + flowFileCurrFailure = 0; + curr_ff_resp.getNextCycle(); + } + } + if (curr_ff_resp.state() == FlowFileResponseState.FF_ROLLBACK_RESPONSE) { + if (flowFileCurrRollback < flowFileMaxRollback) { + flowFileCurrRollback += 1; + logger.info("DebugFlow rolling back (no penalty) file={} UUID={}", + new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()), + ff.getAttribute(CoreAttributes.UUID.key())}); + session.rollback(); + session.commit(); + break; + } else { + flowFileCurrRollback = 0; + curr_ff_resp.getNextCycle(); + } + } + if (curr_ff_resp.state() == FlowFileResponseState.FF_YIELD_RESPONSE) { + if (flowFileCurrYield < flowFileMaxYield) { + flowFileCurrYield += 1; + logger.info("DebugFlow yielding file={} UUID={}", + new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()), + ff.getAttribute(CoreAttributes.UUID.key())}); + session.rollback(); + context.yield(); + return; + } else { + flowFileCurrYield = 0; + curr_ff_resp.getNextCycle(); + } + } + if (curr_ff_resp.state() == FlowFileResponseState.FF_PENALTY_RESPONSE) { + if (flowFileCurrPenalty < flowFileMaxPenalty) { + flowFileCurrPenalty += 1; + logger.info("DebugFlow rolling back (with penalty) file={} UUID={}", + new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()), + ff.getAttribute(CoreAttributes.UUID.key())}); + session.rollback(true); + session.commit(); + break; + } else { + flowFileCurrPenalty = 0; + curr_ff_resp.getNextCycle(); + } + } + if (curr_ff_resp.state() == FlowFileResponseState.FF_EXCEPTION_RESPONSE) { + if (flowFileCurrException < flowFileMaxException) { + flowFileCurrException += 1; + String message = "forced by " + this.getClass().getName(); + logger.info("DebugFlow throwing NPE file={} UUID={}", + new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()), + ff.getAttribute(CoreAttributes.UUID.key())}); + RuntimeException rte; + try { + rte = flowFileExceptionClass.getConstructor(String.class).newInstance(message); + throw rte; + } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { + if (logger.isErrorEnabled()) { + logger.error("{} unexpected exception throwing DebugFlow exception: {}", + new Object[]{this, e}); + } + } + } else { + flowFileCurrException = 0; + curr_ff_resp.getNextCycle(); + } + } + } + } + } + + private static Class classNameToRuntimeExceptionClass(String name) { + Class klass = null; + try { + Class klass2 = Class.forName(name); + if (klass2 == RuntimeException.class || RuntimeException.class.isAssignableFrom(klass2)) { + //noinspection unchecked + klass = (Class)klass2; + } + } catch (ClassNotFoundException e) { + klass = null; + } + return klass; + } + + private enum FlowFileResponseState { + FF_SUCCESS_RESPONSE, + FF_FAILURE_RESPONSE, + FF_ROLLBACK_RESPONSE, + FF_YIELD_RESPONSE, + FF_PENALTY_RESPONSE, + FF_EXCEPTION_RESPONSE; + + private FlowFileResponseState nextState; + static { + FF_SUCCESS_RESPONSE.nextState = FF_FAILURE_RESPONSE; + FF_FAILURE_RESPONSE.nextState = FF_ROLLBACK_RESPONSE; + FF_ROLLBACK_RESPONSE.nextState = FF_YIELD_RESPONSE; + FF_YIELD_RESPONSE.nextState = FF_PENALTY_RESPONSE; + FF_PENALTY_RESPONSE.nextState = FF_EXCEPTION_RESPONSE; + FF_EXCEPTION_RESPONSE.nextState = FF_SUCCESS_RESPONSE; + } + FlowFileResponseState next() { + return nextState; + } + } + + private class FlowFileResponse { + private final AtomicReference current = new AtomicReference<>(); + FlowFileResponse() { + current.set(FlowFileResponseState.FF_SUCCESS_RESPONSE); + } + synchronized FlowFileResponseState state() { + return current.get(); + } + synchronized void getNextCycle() { + current.set(current.get().next()); + } + synchronized void reset() { + current.set(FlowFileResponseState.FF_SUCCESS_RESPONSE); + } + } + + private enum NoFlowFileResponseState { + NO_FF_SKIP_RESPONSE, + NO_FF_EXCEPTION_RESPONSE, + NO_FF_YIELD_RESPONSE; + + private NoFlowFileResponseState nextState; + static { + NO_FF_SKIP_RESPONSE.nextState = NO_FF_EXCEPTION_RESPONSE; + NO_FF_EXCEPTION_RESPONSE.nextState = NO_FF_YIELD_RESPONSE; + NO_FF_YIELD_RESPONSE.nextState = NO_FF_SKIP_RESPONSE; + } + NoFlowFileResponseState next() { + return nextState; + } + } + + private class NoFlowFileResponse { + private final AtomicReference current = new AtomicReference<>(); + NoFlowFileResponse() { + current.set(NoFlowFileResponseState.NO_FF_SKIP_RESPONSE); + } + synchronized NoFlowFileResponseState state() { + return current.get(); + } + synchronized void getNextCycle() { + current.set(current.get().next()); + } + synchronized void reset() { + current.set(NoFlowFileResponseState.NO_FF_SKIP_RESPONSE); + } + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index a50fd8632c..bf9507881f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -18,6 +18,7 @@ org.apache.nifi.processors.standard.CompressContent org.apache.nifi.processors.standard.ControlRate org.apache.nifi.processors.standard.ConvertCharacterSet org.apache.nifi.processors.standard.ConvertJSONToSQL +org.apache.nifi.processors.standard.DebugFlow org.apache.nifi.processors.standard.DetectDuplicate org.apache.nifi.processors.standard.DistributeLoad org.apache.nifi.processors.standard.DuplicateFlowFile diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.DebugFlow/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.DebugFlow/additionalDetails.html new file mode 100644 index 0000000000..f771d54119 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.DebugFlow/additionalDetails.html @@ -0,0 +1,48 @@ + + + + + + DebugFlow + + + + + +

+ When triggered, the processor loops through the appropriate response list (based on whether or not it + received a FlowFile). A response is produced the configured number of times for each pass through its + response list, as long as the processor is running. +

+ Triggered by a FlowFile, the processor can produce the following responses. +

    +
  1. transfer FlowFile to success relationship.
  2. +
  3. transfer FlowFile to failure relationship.
  4. +
  5. rollback the FlowFile without penalty.
  6. +
  7. rollback the FlowFile and yield the context.
  8. +
  9. rollback the FlowFile with penalty.
  10. +
  11. throw an exception.
  12. +
+

+ Triggered without a FlowFile, the processor can produce the following responses. +

    +
  1. do nothing and return.
  2. +
  3. throw an exception.
  4. +
  5. yield the context.
  6. +
+

+ + diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDebugFlow.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDebugFlow.java new file mode 100644 index 0000000000..8eb53aa406 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDebugFlow.java @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.hamcrest.CoreMatchers; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class TestDebugFlow { + + private DebugFlow debugFlow; + private TestRunner runner; + private ProcessSession session; + + private final Map contents = new HashMap<>(); + private final Map> attribs = new HashMap<>(); + private Map namesToContent = new HashMap<>(); + + @Rule + public final ExpectedException exception = ExpectedException.none(); + + @Before + public void setup() throws IOException { + for (int n = 0; n < 6; n++) { + String filename = "testFile" + (n + 1) + ".txt"; + String content = "Hello World " + (n + 1) + "!"; + contents.put(n, content); + attribs.put(n, new HashMap()); + attribs.get(n).put(CoreAttributes.FILENAME.key(), filename); + attribs.get(n).put(CoreAttributes.UUID.key(), "TESTING-FILE-" + (n + 1) + "-TESTING"); + namesToContent.put(filename, content); + } + + debugFlow = new DebugFlow(); + runner = TestRunners.newTestRunner(debugFlow); + session = runner.getProcessSessionFactory().createSession(); + + runner.setProperty(DebugFlow.FF_SUCCESS_ITERATIONS, "0"); + runner.setProperty(DebugFlow.FF_FAILURE_ITERATIONS, "0"); + runner.setProperty(DebugFlow.FF_ROLLBACK_ITERATIONS, "0"); + runner.setProperty(DebugFlow.FF_ROLLBACK_YIELD_ITERATIONS, "0"); + runner.setProperty(DebugFlow.FF_ROLLBACK_PENALTY_ITERATIONS, "0"); + runner.setProperty(DebugFlow.FF_EXCEPTION_ITERATIONS, "0"); + + runner.setProperty(DebugFlow.NO_FF_SKIP_ITERATIONS, "0"); + runner.setProperty(DebugFlow.NO_FF_EXCEPTION_ITERATIONS, "0"); + runner.setProperty(DebugFlow.NO_FF_YIELD_ITERATIONS, "0"); + } + + @Test + public void testGetSupportedPropertyDescriptors() throws Exception { + assertEquals(11, debugFlow.getPropertyDescriptors().size()); + } + + @Test + public void testGetRelationships() throws Exception { + assertEquals(2, debugFlow.getRelationships().size()); + } + + private boolean isInContents(byte[] content) { + for (Map.Entry entry : contents.entrySet()) { + if (((String)entry.getValue()).compareTo(new String(content)) == 0) { + return true; + } + } + return false; + } + + @Test + public void testFlowFileSuccess() { + runner.setProperty(DebugFlow.FF_SUCCESS_ITERATIONS, "1"); + runner.assertValid(); + + for (int n = 0; n < 6; n++) { + runner.enqueue(contents.get(n).getBytes(), attribs.get(n)); + } + + runner.run(7); + runner.assertTransferCount(DebugFlow.REL_SUCCESS, 6); + runner.assertTransferCount(DebugFlow.REL_FAILURE, 0); + + assertTrue(isInContents(runner.getFlowFilesForRelationship(DebugFlow.REL_SUCCESS).get(0).toByteArray())); + assertTrue(isInContents(runner.getFlowFilesForRelationship(DebugFlow.REL_SUCCESS).get(1).toByteArray())); + assertTrue(isInContents(runner.getFlowFilesForRelationship(DebugFlow.REL_SUCCESS).get(2).toByteArray())); + assertTrue(isInContents(runner.getFlowFilesForRelationship(DebugFlow.REL_SUCCESS).get(3).toByteArray())); + assertTrue(isInContents(runner.getFlowFilesForRelationship(DebugFlow.REL_SUCCESS).get(4).toByteArray())); + assertTrue(isInContents(runner.getFlowFilesForRelationship(DebugFlow.REL_SUCCESS).get(5).toByteArray())); + } + + @Test + public void testFlowFileFailure() { + runner.setProperty(DebugFlow.FF_FAILURE_ITERATIONS, "1"); + runner.assertValid(); + + for (int n = 0; n < 6; n++) { + runner.enqueue(contents.get(n).getBytes(), attribs.get(n)); + } + + runner.run(7); + runner.assertTransferCount(DebugFlow.REL_SUCCESS, 0); + runner.assertTransferCount(DebugFlow.REL_FAILURE, 6); + + runner.getFlowFilesForRelationship(DebugFlow.REL_FAILURE).get(0).assertContentEquals(contents.get(0)); + runner.getFlowFilesForRelationship(DebugFlow.REL_FAILURE).get(1).assertContentEquals(contents.get(1)); + runner.getFlowFilesForRelationship(DebugFlow.REL_FAILURE).get(2).assertContentEquals(contents.get(2)); + runner.getFlowFilesForRelationship(DebugFlow.REL_FAILURE).get(3).assertContentEquals(contents.get(3)); + runner.getFlowFilesForRelationship(DebugFlow.REL_FAILURE).get(4).assertContentEquals(contents.get(4)); + runner.getFlowFilesForRelationship(DebugFlow.REL_FAILURE).get(5).assertContentEquals(contents.get(5)); + } + + @Test + public void testFlowFileSuccessAndFailure() { + runner.setProperty(DebugFlow.FF_SUCCESS_ITERATIONS, "1"); + runner.setProperty(DebugFlow.FF_FAILURE_ITERATIONS, "1"); + runner.assertValid(); + + for (int n = 0; n < 6; n++) { + runner.enqueue(contents.get(n).getBytes(), attribs.get(n)); + } + + runner.run(7); + runner.assertTransferCount(DebugFlow.REL_SUCCESS, 3); + runner.assertTransferCount(DebugFlow.REL_FAILURE, 3); + + runner.getFlowFilesForRelationship(DebugFlow.REL_SUCCESS).get(0).assertContentEquals(contents.get(0)); + runner.getFlowFilesForRelationship(DebugFlow.REL_FAILURE).get(0).assertContentEquals(contents.get(1)); + runner.getFlowFilesForRelationship(DebugFlow.REL_SUCCESS).get(1).assertContentEquals(contents.get(2)); + runner.getFlowFilesForRelationship(DebugFlow.REL_FAILURE).get(1).assertContentEquals(contents.get(3)); + runner.getFlowFilesForRelationship(DebugFlow.REL_SUCCESS).get(2).assertContentEquals(contents.get(4)); + runner.getFlowFilesForRelationship(DebugFlow.REL_FAILURE).get(2).assertContentEquals(contents.get(5)); + } + + @Test + public void testFlowFileRollback() throws IOException { + runner.setProperty(DebugFlow.FF_ROLLBACK_ITERATIONS, "1"); + runner.assertValid(); + + for (int n = 0; n < 6; n++) { + runner.enqueue(contents.get(n).getBytes(), attribs.get(n)); + } + + runner.run(7); + runner.assertTransferCount(DebugFlow.REL_SUCCESS, 0); + runner.assertTransferCount(DebugFlow.REL_FAILURE, 0); + + runner.assertQueueNotEmpty(); + assertEquals(6, runner.getQueueSize().getObjectCount()); + + MockFlowFile ff1 = (MockFlowFile) session.get(); + assertNotNull(ff1); + assertEquals(namesToContent.get(ff1.getAttribute(CoreAttributes.FILENAME.key())), new String(ff1.toByteArray())); + session.rollback(); + } + + @Test + public void testFlowFileRollbackYield() { + runner.setProperty(DebugFlow.FF_ROLLBACK_YIELD_ITERATIONS, "1"); + runner.assertValid(); + + for (int n = 0; n < 6; n++) { + runner.enqueue(contents.get(n).getBytes(), attribs.get(n)); + } + + runner.run(7); + runner.assertTransferCount(DebugFlow.REL_SUCCESS, 0); + runner.assertTransferCount(DebugFlow.REL_FAILURE, 0); + + runner.assertQueueNotEmpty(); + assertEquals(6, runner.getQueueSize().getObjectCount()); + } + + @Test + public void testFlowFileRollbackPenalty() { + runner.setProperty(DebugFlow.FF_ROLLBACK_PENALTY_ITERATIONS, "1"); + runner.assertValid(); + + for (int n = 0; n < 6; n++) { + runner.enqueue(contents.get(n).getBytes(), attribs.get(n)); + } + + runner.run(7); + runner.assertTransferCount(DebugFlow.REL_SUCCESS, 0); + runner.assertTransferCount(DebugFlow.REL_FAILURE, 0); + + runner.assertQueueNotEmpty(); + assertEquals(6, runner.getQueueSize().getObjectCount()); + } + + @Test + public void testFlowFileDefaultException() { + runner.setProperty(DebugFlow.FF_EXCEPTION_ITERATIONS, "1"); + runner.assertValid(); + + runner.enqueue(contents.get(0).getBytes(), attribs.get(0)); + + exception.expectMessage(CoreMatchers.containsString("forced by org.apache.nifi.processors.standard.DebugFlow")); + exception.expectCause(CoreMatchers.isA(RuntimeException.class)); + runner.run(2); + } + + @Test + public void testFlowFileNonDefaultException() { + runner.setProperty(DebugFlow.FF_EXCEPTION_ITERATIONS, "1"); + runner.setProperty(DebugFlow.FF_EXCEPTION_CLASS, "java.lang.RuntimeException"); + runner.assertValid(); + + runner.enqueue(contents.get(0).getBytes(), attribs.get(0)); + + exception.expectMessage(CoreMatchers.containsString("forced by org.apache.nifi.processors.standard.DebugFlow")); + exception.expectCause(CoreMatchers.isA(RuntimeException.class)); + runner.run(2); + } + + @Test + public void testFlowFileNPEException() { + runner.setProperty(DebugFlow.FF_EXCEPTION_ITERATIONS, "1"); + runner.setProperty(DebugFlow.FF_EXCEPTION_CLASS, "java.lang.NullPointerException"); + runner.assertValid(); + + runner.enqueue(contents.get(0).getBytes(), attribs.get(0)); + + exception.expectMessage(CoreMatchers.containsString("forced by org.apache.nifi.processors.standard.DebugFlow")); + exception.expectCause(CoreMatchers.isA(NullPointerException.class)); + runner.run(2); + } + + @Test + public void testFlowFileBadException() { + runner.setProperty(DebugFlow.FF_EXCEPTION_ITERATIONS, "1"); + runner.setProperty(DebugFlow.FF_EXCEPTION_CLASS, "java.lang.NonExistantException"); + runner.assertNotValid(); + } + + @Test + public void testFlowFileExceptionRollover() { + runner.setProperty(DebugFlow.FF_EXCEPTION_ITERATIONS, "2"); + runner.assertValid(); + + for (int n = 0; n < 6; n++) { + runner.enqueue(contents.get(n).getBytes(), attribs.get(n)); + } + + exception.expectMessage(CoreMatchers.containsString("forced by org.apache.nifi.processors.standard.DebugFlow")); + exception.expectCause(CoreMatchers.isA(RuntimeException.class)); + runner.run(8); + } + + @Test + public void testFlowFileAll() { + runner.setProperty(DebugFlow.FF_SUCCESS_ITERATIONS, "1"); + runner.setProperty(DebugFlow.FF_FAILURE_ITERATIONS, "1"); + runner.setProperty(DebugFlow.FF_ROLLBACK_ITERATIONS, "1"); + runner.setProperty(DebugFlow.FF_ROLLBACK_YIELD_ITERATIONS, "1"); + runner.setProperty(DebugFlow.FF_ROLLBACK_PENALTY_ITERATIONS, "1"); + runner.setProperty(DebugFlow.FF_EXCEPTION_ITERATIONS, "1"); + runner.assertValid(); + + for (int n = 0; n < 6; n++) { + runner.enqueue(contents.get(n).getBytes(), attribs.get(n)); + } + + runner.run(5); + runner.assertTransferCount(DebugFlow.REL_SUCCESS, 1); + runner.assertTransferCount(DebugFlow.REL_FAILURE, 1); + + assertEquals(4, runner.getQueueSize().getObjectCount()); + assertTrue(isInContents(runner.getFlowFilesForRelationship(DebugFlow.REL_SUCCESS).get(0).toByteArray())); + assertTrue(isInContents(runner.getFlowFilesForRelationship(DebugFlow.REL_FAILURE).get(0).toByteArray())); + + runner.run(2); + } + + @Test + public void testNoFlowFileZeroIterations() { + runner.run(4); + } + + @Test + public void testNoFlowFileSkip() { + runner.setProperty(DebugFlow.NO_FF_SKIP_ITERATIONS, "1"); + runner.assertValid(); + + runner.run(4); + } + + @Test + public void testNoFlowFileDefaultException() { + runner.setProperty(DebugFlow.NO_FF_EXCEPTION_ITERATIONS, "1"); + runner.assertValid(); + + exception.expectMessage(CoreMatchers.containsString("forced by org.apache.nifi.processors.standard.DebugFlow")); + exception.expectCause(CoreMatchers.isA(RuntimeException.class)); + runner.run(3); + } + + @Test + public void testNoFlowFileNonDefaultException() { + runner.setProperty(DebugFlow.NO_FF_EXCEPTION_ITERATIONS, "1"); + runner.setProperty(DebugFlow.NO_FF_EXCEPTION_CLASS, "java.lang.RuntimeException"); + runner.assertValid(); + + exception.expectMessage(CoreMatchers.containsString("forced by org.apache.nifi.processors.standard.DebugFlow")); + exception.expectCause(CoreMatchers.isA(RuntimeException.class)); + runner.run(3); + } + + @Test + public void testNoFlowFileOtherException() { + runner.setProperty(DebugFlow.NO_FF_EXCEPTION_ITERATIONS, "1"); + runner.setProperty(DebugFlow.NO_FF_EXCEPTION_CLASS, "java.lang.NullPointerException"); + runner.assertValid(); + + exception.expectMessage(CoreMatchers.containsString("forced by org.apache.nifi.processors.standard.DebugFlow")); + exception.expectCause(CoreMatchers.isA(NullPointerException.class)); + runner.run(3); + } + + @Test + public void testNoFlowFileBadException() { + runner.setProperty(DebugFlow.NO_FF_EXCEPTION_ITERATIONS, "1"); + runner.setProperty(DebugFlow.NO_FF_EXCEPTION_CLASS, "java.lang.NonExistantException"); + runner.assertNotValid(); + } + + @Test + public void testNoFlowFileYield() { + runner.setProperty(DebugFlow.NO_FF_YIELD_ITERATIONS, "1"); + runner.assertValid(); + + runner.run(4); + } +}