mirror of https://github.com/apache/nifi.git
NIFI-1829 - Create new DebugFlow processor.
Signed-off-by: Mike Moser <mosermw@apache.org> This closes #458
This commit is contained in:
parent
6e74c10f49
commit
4723f8e24c
|
@ -0,0 +1,486 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import org.apache.http.annotation.ThreadSafe;
|
||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.components.Validator;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
@ThreadSafe()
|
||||
@EventDriven()
|
||||
@Tags({"test", "debug", "processor", "utility", "flow", "FlowFile"})
|
||||
@CapabilityDescription("The DebugFlow processor aids testing and debugging the FlowFile framework by allowing various "
|
||||
+ "responses to be explicitly triggered in response to the receipt of a FlowFile or a timer event without a "
|
||||
+ "FlowFile if using timer or cron based scheduling. It can force responses needed to exercise or test "
|
||||
+ "various failure modes that can occur when a processor runs.")
|
||||
public class DebugFlow extends AbstractProcessor {
|
||||
|
||||
private final AtomicReference<Set<Relationship>> relationships = new AtomicReference<>();
|
||||
|
||||
static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||
.name("success")
|
||||
.description("FlowFiles processed successfully.")
|
||||
.build();
|
||||
static final Relationship REL_FAILURE = new Relationship.Builder()
|
||||
.name("failure")
|
||||
.description("FlowFiles that failed to process.")
|
||||
.build();
|
||||
|
||||
private final AtomicReference<List<PropertyDescriptor>> propertyDescriptors = new AtomicReference<>();
|
||||
|
||||
static final PropertyDescriptor FF_SUCCESS_ITERATIONS = new PropertyDescriptor.Builder()
|
||||
.name("FlowFile Success Iterations")
|
||||
.description("Number of FlowFiles to forward to success relationship.")
|
||||
.required(true)
|
||||
.defaultValue("1")
|
||||
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
|
||||
.build();
|
||||
static final PropertyDescriptor FF_FAILURE_ITERATIONS = new PropertyDescriptor.Builder()
|
||||
.name("FlowFile Failure Iterations")
|
||||
.description("Number of FlowFiles to forward to failure relationship.")
|
||||
.required(true)
|
||||
.defaultValue("0")
|
||||
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
|
||||
.build();
|
||||
static final PropertyDescriptor FF_ROLLBACK_ITERATIONS = new PropertyDescriptor.Builder()
|
||||
.name("FlowFile Rollback Iterations")
|
||||
.description("Number of FlowFiles to roll back (without penalty).")
|
||||
.required(true)
|
||||
.defaultValue("0")
|
||||
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
|
||||
.build();
|
||||
static final PropertyDescriptor FF_ROLLBACK_YIELD_ITERATIONS = new PropertyDescriptor.Builder()
|
||||
.name("FlowFile Rollback Yield Iterations")
|
||||
.description("Number of FlowFiles to roll back and yield.")
|
||||
.required(true)
|
||||
.defaultValue("0")
|
||||
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
|
||||
.build();
|
||||
static final PropertyDescriptor FF_ROLLBACK_PENALTY_ITERATIONS = new PropertyDescriptor.Builder()
|
||||
.name("FlowFile Rollback Penalty Iterations")
|
||||
.description("Number of FlowFiles to roll back with penalty.")
|
||||
.required(true)
|
||||
.defaultValue("0")
|
||||
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
|
||||
.build();
|
||||
static final PropertyDescriptor FF_EXCEPTION_ITERATIONS = new PropertyDescriptor.Builder()
|
||||
.name("FlowFile Exception Iterations")
|
||||
.description("Number of FlowFiles to throw exception.")
|
||||
.required(true)
|
||||
.defaultValue("0")
|
||||
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
|
||||
.build();
|
||||
static final PropertyDescriptor FF_EXCEPTION_CLASS = new PropertyDescriptor.Builder()
|
||||
.name("FlowFile Exception Class")
|
||||
.description("Exception class to be thrown (must extend java.lang.RuntimeException).")
|
||||
.required(true)
|
||||
.defaultValue("java.lang.RuntimeException")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.addValidator(new Validator() {
|
||||
@Override
|
||||
public ValidationResult validate(String subject, String input, ValidationContext context) {
|
||||
Class<? extends RuntimeException> klass = classNameToRuntimeExceptionClass(input);
|
||||
return new ValidationResult.Builder()
|
||||
.subject(subject)
|
||||
.input(input)
|
||||
.valid(klass != null && (RuntimeException.class.isAssignableFrom(klass)))
|
||||
.explanation(subject + " class must exist and extend java.lang.RuntimeException")
|
||||
.build();
|
||||
}
|
||||
})
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor NO_FF_SKIP_ITERATIONS = new PropertyDescriptor.Builder()
|
||||
.name("No FlowFile Skip Iterations")
|
||||
.description("Number of times to skip onTrigger if no FlowFile.")
|
||||
.required(true)
|
||||
.defaultValue("1")
|
||||
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
|
||||
.build();
|
||||
static final PropertyDescriptor NO_FF_EXCEPTION_ITERATIONS = new PropertyDescriptor.Builder()
|
||||
.name("No FlowFile Exception Iterations")
|
||||
.description("Number of times to throw NPE exception if no FlowFile.")
|
||||
.required(true)
|
||||
.defaultValue("0")
|
||||
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
|
||||
.build();
|
||||
static final PropertyDescriptor NO_FF_YIELD_ITERATIONS = new PropertyDescriptor.Builder()
|
||||
.name("No FlowFile Yield Iterations")
|
||||
.description("Number of times to yield if no FlowFile.")
|
||||
.required(true)
|
||||
.defaultValue("0")
|
||||
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
|
||||
.build();
|
||||
static final PropertyDescriptor NO_FF_EXCEPTION_CLASS = new PropertyDescriptor.Builder()
|
||||
.name("No FlowFile Exception Class")
|
||||
.description("Exception class to be thrown if no FlowFile (must extend java.lang.RuntimeException).")
|
||||
.required(true)
|
||||
.defaultValue("java.lang.RuntimeException")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.addValidator(new Validator() {
|
||||
@Override
|
||||
public ValidationResult validate(String subject, String input, ValidationContext context) {
|
||||
Class<? extends RuntimeException> klass = classNameToRuntimeExceptionClass(input);
|
||||
return new ValidationResult.Builder()
|
||||
.subject(subject)
|
||||
.input(input)
|
||||
.valid(klass != null && (RuntimeException.class.isAssignableFrom(klass)))
|
||||
.explanation(subject + " class must exist and extend java.lang.RuntimeException")
|
||||
.build();
|
||||
}
|
||||
})
|
||||
.build();
|
||||
|
||||
private volatile Integer flowFileMaxSuccess = 0;
|
||||
private volatile Integer flowFileMaxFailure = 0;
|
||||
private volatile Integer flowFileMaxRollback = 0;
|
||||
private volatile Integer flowFileMaxYield = 0;
|
||||
private volatile Integer flowFileMaxPenalty = 0;
|
||||
private volatile Integer flowFileMaxException = 0;
|
||||
|
||||
private volatile Integer noFlowFileMaxSkip = 0;
|
||||
private volatile Integer noFlowFileMaxException = 0;
|
||||
private volatile Integer noFlowFileMaxYield = 0;
|
||||
|
||||
private volatile Integer flowFileCurrSuccess = 0;
|
||||
private volatile Integer flowFileCurrFailure = 0;
|
||||
private volatile Integer flowFileCurrRollback = 0;
|
||||
private volatile Integer flowFileCurrYield = 0;
|
||||
private volatile Integer flowFileCurrPenalty = 0;
|
||||
private volatile Integer flowFileCurrException = 0;
|
||||
|
||||
private volatile Integer noFlowFileCurrSkip = 0;
|
||||
private volatile Integer noFlowFileCurrException = 0;
|
||||
private volatile Integer noFlowFileCurrYield = 0;
|
||||
|
||||
private volatile Class<? extends RuntimeException> flowFileExceptionClass = null;
|
||||
private volatile Class<? extends RuntimeException> noFlowFileExceptionClass= null;
|
||||
|
||||
private final FlowFileResponse curr_ff_resp = new FlowFileResponse();
|
||||
private final NoFlowFileResponse curr_noff_resp = new NoFlowFileResponse();
|
||||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
synchronized (relationships) {
|
||||
if (relationships.get() == null) {
|
||||
HashSet<Relationship> relSet = new HashSet<>();
|
||||
relSet.add(REL_SUCCESS);
|
||||
relSet.add(REL_FAILURE);
|
||||
relationships.compareAndSet(null, Collections.unmodifiableSet(relSet));
|
||||
}
|
||||
return relationships.get();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
synchronized (propertyDescriptors) {
|
||||
if (propertyDescriptors.get() == null) {
|
||||
ArrayList<PropertyDescriptor> propList = new ArrayList<>();
|
||||
propList.add(FF_SUCCESS_ITERATIONS);
|
||||
propList.add(FF_FAILURE_ITERATIONS);
|
||||
propList.add(FF_ROLLBACK_ITERATIONS);
|
||||
propList.add(FF_ROLLBACK_YIELD_ITERATIONS);
|
||||
propList.add(FF_ROLLBACK_PENALTY_ITERATIONS);
|
||||
propList.add(FF_EXCEPTION_ITERATIONS);
|
||||
propList.add(FF_EXCEPTION_CLASS);
|
||||
propList.add(NO_FF_SKIP_ITERATIONS);
|
||||
propList.add(NO_FF_EXCEPTION_ITERATIONS);
|
||||
propList.add(NO_FF_YIELD_ITERATIONS);
|
||||
propList.add(NO_FF_EXCEPTION_CLASS);
|
||||
propertyDescriptors.compareAndSet(null, Collections.unmodifiableList(propList));
|
||||
}
|
||||
return propertyDescriptors.get();
|
||||
}
|
||||
}
|
||||
|
||||
@OnScheduled
|
||||
public void onScheduled(ProcessContext context) {
|
||||
flowFileMaxSuccess = context.getProperty(FF_SUCCESS_ITERATIONS).asInteger();
|
||||
flowFileMaxFailure = context.getProperty(FF_FAILURE_ITERATIONS).asInteger();
|
||||
flowFileMaxYield = context.getProperty(FF_ROLLBACK_YIELD_ITERATIONS).asInteger();
|
||||
flowFileMaxRollback = context.getProperty(FF_ROLLBACK_ITERATIONS).asInteger();
|
||||
flowFileMaxPenalty = context.getProperty(FF_ROLLBACK_PENALTY_ITERATIONS).asInteger();
|
||||
flowFileMaxException = context.getProperty(FF_EXCEPTION_ITERATIONS).asInteger();
|
||||
noFlowFileMaxException = context.getProperty(NO_FF_EXCEPTION_ITERATIONS).asInteger();
|
||||
noFlowFileMaxYield = context.getProperty(NO_FF_YIELD_ITERATIONS).asInteger();
|
||||
noFlowFileMaxSkip = context.getProperty(NO_FF_SKIP_ITERATIONS).asInteger();
|
||||
curr_ff_resp.reset();
|
||||
curr_noff_resp.reset();
|
||||
flowFileExceptionClass = classNameToRuntimeExceptionClass(context.getProperty(FF_EXCEPTION_CLASS).toString());
|
||||
noFlowFileExceptionClass = classNameToRuntimeExceptionClass(context.getProperty(NO_FF_EXCEPTION_CLASS).toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
|
||||
final ComponentLog logger = getLogger();
|
||||
|
||||
FlowFile ff = session.get();
|
||||
|
||||
// Make up to 2 passes to allow rollover from last cycle to first.
|
||||
// (This could be "while(true)" since responses should break out if selected, but this
|
||||
// prevents endless loops in the event of unexpected errors or future changes.)
|
||||
int pass = 2;
|
||||
while (pass > 0) {
|
||||
pass -= 1;
|
||||
if (ff == null) {
|
||||
if (curr_noff_resp.state() == NoFlowFileResponseState.NO_FF_SKIP_RESPONSE) {
|
||||
if (noFlowFileCurrSkip < noFlowFileMaxSkip) {
|
||||
noFlowFileCurrSkip += 1;
|
||||
logger.info("DebugFlow skipping with no flow file");
|
||||
return;
|
||||
} else {
|
||||
noFlowFileCurrSkip = 0;
|
||||
curr_noff_resp.getNextCycle();
|
||||
}
|
||||
}
|
||||
if (curr_noff_resp.state() == NoFlowFileResponseState.NO_FF_EXCEPTION_RESPONSE) {
|
||||
if (noFlowFileCurrException < noFlowFileMaxException) {
|
||||
noFlowFileCurrException += 1;
|
||||
logger.info("DebugFlow throwing NPE with no flow file");
|
||||
String message = "forced by " + this.getClass().getName();
|
||||
RuntimeException rte;
|
||||
try {
|
||||
rte = noFlowFileExceptionClass.getConstructor(String.class).newInstance(message);
|
||||
throw rte;
|
||||
} catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
|
||||
if (logger.isErrorEnabled()) {
|
||||
logger.error("{} unexpected exception throwing DebugFlow exception: {}",
|
||||
new Object[]{this, e});
|
||||
}
|
||||
}
|
||||
} else {
|
||||
noFlowFileCurrException = 0;
|
||||
curr_noff_resp.getNextCycle();
|
||||
}
|
||||
}
|
||||
if (curr_noff_resp.state() == NoFlowFileResponseState.NO_FF_YIELD_RESPONSE) {
|
||||
if (noFlowFileCurrYield < noFlowFileMaxYield) {
|
||||
noFlowFileCurrYield += 1;
|
||||
logger.info("DebugFlow yielding with no flow file");
|
||||
context.yield();
|
||||
break;
|
||||
} else {
|
||||
noFlowFileCurrYield = 0;
|
||||
curr_noff_resp.getNextCycle();
|
||||
}
|
||||
}
|
||||
return;
|
||||
} else {
|
||||
if (curr_ff_resp.state() == FlowFileResponseState.FF_SUCCESS_RESPONSE) {
|
||||
if (flowFileCurrSuccess < flowFileMaxSuccess) {
|
||||
flowFileCurrSuccess += 1;
|
||||
logger.info("DebugFlow transferring to success file={} UUID={}",
|
||||
new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
|
||||
ff.getAttribute(CoreAttributes.UUID.key())});
|
||||
session.transfer(ff, REL_SUCCESS);
|
||||
session.commit();
|
||||
break;
|
||||
} else {
|
||||
flowFileCurrSuccess = 0;
|
||||
curr_ff_resp.getNextCycle();
|
||||
}
|
||||
}
|
||||
if (curr_ff_resp.state() == FlowFileResponseState.FF_FAILURE_RESPONSE) {
|
||||
if (flowFileCurrFailure < flowFileMaxFailure) {
|
||||
flowFileCurrFailure += 1;
|
||||
logger.info("DebugFlow transferring to failure file={} UUID={}",
|
||||
new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
|
||||
ff.getAttribute(CoreAttributes.UUID.key())});
|
||||
session.transfer(ff, REL_FAILURE);
|
||||
session.commit();
|
||||
break;
|
||||
} else {
|
||||
flowFileCurrFailure = 0;
|
||||
curr_ff_resp.getNextCycle();
|
||||
}
|
||||
}
|
||||
if (curr_ff_resp.state() == FlowFileResponseState.FF_ROLLBACK_RESPONSE) {
|
||||
if (flowFileCurrRollback < flowFileMaxRollback) {
|
||||
flowFileCurrRollback += 1;
|
||||
logger.info("DebugFlow rolling back (no penalty) file={} UUID={}",
|
||||
new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
|
||||
ff.getAttribute(CoreAttributes.UUID.key())});
|
||||
session.rollback();
|
||||
session.commit();
|
||||
break;
|
||||
} else {
|
||||
flowFileCurrRollback = 0;
|
||||
curr_ff_resp.getNextCycle();
|
||||
}
|
||||
}
|
||||
if (curr_ff_resp.state() == FlowFileResponseState.FF_YIELD_RESPONSE) {
|
||||
if (flowFileCurrYield < flowFileMaxYield) {
|
||||
flowFileCurrYield += 1;
|
||||
logger.info("DebugFlow yielding file={} UUID={}",
|
||||
new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
|
||||
ff.getAttribute(CoreAttributes.UUID.key())});
|
||||
session.rollback();
|
||||
context.yield();
|
||||
return;
|
||||
} else {
|
||||
flowFileCurrYield = 0;
|
||||
curr_ff_resp.getNextCycle();
|
||||
}
|
||||
}
|
||||
if (curr_ff_resp.state() == FlowFileResponseState.FF_PENALTY_RESPONSE) {
|
||||
if (flowFileCurrPenalty < flowFileMaxPenalty) {
|
||||
flowFileCurrPenalty += 1;
|
||||
logger.info("DebugFlow rolling back (with penalty) file={} UUID={}",
|
||||
new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
|
||||
ff.getAttribute(CoreAttributes.UUID.key())});
|
||||
session.rollback(true);
|
||||
session.commit();
|
||||
break;
|
||||
} else {
|
||||
flowFileCurrPenalty = 0;
|
||||
curr_ff_resp.getNextCycle();
|
||||
}
|
||||
}
|
||||
if (curr_ff_resp.state() == FlowFileResponseState.FF_EXCEPTION_RESPONSE) {
|
||||
if (flowFileCurrException < flowFileMaxException) {
|
||||
flowFileCurrException += 1;
|
||||
String message = "forced by " + this.getClass().getName();
|
||||
logger.info("DebugFlow throwing NPE file={} UUID={}",
|
||||
new Object[]{ff.getAttribute(CoreAttributes.FILENAME.key()),
|
||||
ff.getAttribute(CoreAttributes.UUID.key())});
|
||||
RuntimeException rte;
|
||||
try {
|
||||
rte = flowFileExceptionClass.getConstructor(String.class).newInstance(message);
|
||||
throw rte;
|
||||
} catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
|
||||
if (logger.isErrorEnabled()) {
|
||||
logger.error("{} unexpected exception throwing DebugFlow exception: {}",
|
||||
new Object[]{this, e});
|
||||
}
|
||||
}
|
||||
} else {
|
||||
flowFileCurrException = 0;
|
||||
curr_ff_resp.getNextCycle();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static Class<? extends RuntimeException> classNameToRuntimeExceptionClass(String name) {
|
||||
Class<? extends RuntimeException> klass = null;
|
||||
try {
|
||||
Class<?> klass2 = Class.forName(name);
|
||||
if (klass2 == RuntimeException.class || RuntimeException.class.isAssignableFrom(klass2)) {
|
||||
//noinspection unchecked
|
||||
klass = (Class<? extends RuntimeException>)klass2;
|
||||
}
|
||||
} catch (ClassNotFoundException e) {
|
||||
klass = null;
|
||||
}
|
||||
return klass;
|
||||
}
|
||||
|
||||
private enum FlowFileResponseState {
|
||||
FF_SUCCESS_RESPONSE,
|
||||
FF_FAILURE_RESPONSE,
|
||||
FF_ROLLBACK_RESPONSE,
|
||||
FF_YIELD_RESPONSE,
|
||||
FF_PENALTY_RESPONSE,
|
||||
FF_EXCEPTION_RESPONSE;
|
||||
|
||||
private FlowFileResponseState nextState;
|
||||
static {
|
||||
FF_SUCCESS_RESPONSE.nextState = FF_FAILURE_RESPONSE;
|
||||
FF_FAILURE_RESPONSE.nextState = FF_ROLLBACK_RESPONSE;
|
||||
FF_ROLLBACK_RESPONSE.nextState = FF_YIELD_RESPONSE;
|
||||
FF_YIELD_RESPONSE.nextState = FF_PENALTY_RESPONSE;
|
||||
FF_PENALTY_RESPONSE.nextState = FF_EXCEPTION_RESPONSE;
|
||||
FF_EXCEPTION_RESPONSE.nextState = FF_SUCCESS_RESPONSE;
|
||||
}
|
||||
FlowFileResponseState next() {
|
||||
return nextState;
|
||||
}
|
||||
}
|
||||
|
||||
private class FlowFileResponse {
|
||||
private final AtomicReference<FlowFileResponseState> current = new AtomicReference<>();
|
||||
FlowFileResponse() {
|
||||
current.set(FlowFileResponseState.FF_SUCCESS_RESPONSE);
|
||||
}
|
||||
synchronized FlowFileResponseState state() {
|
||||
return current.get();
|
||||
}
|
||||
synchronized void getNextCycle() {
|
||||
current.set(current.get().next());
|
||||
}
|
||||
synchronized void reset() {
|
||||
current.set(FlowFileResponseState.FF_SUCCESS_RESPONSE);
|
||||
}
|
||||
}
|
||||
|
||||
private enum NoFlowFileResponseState {
|
||||
NO_FF_SKIP_RESPONSE,
|
||||
NO_FF_EXCEPTION_RESPONSE,
|
||||
NO_FF_YIELD_RESPONSE;
|
||||
|
||||
private NoFlowFileResponseState nextState;
|
||||
static {
|
||||
NO_FF_SKIP_RESPONSE.nextState = NO_FF_EXCEPTION_RESPONSE;
|
||||
NO_FF_EXCEPTION_RESPONSE.nextState = NO_FF_YIELD_RESPONSE;
|
||||
NO_FF_YIELD_RESPONSE.nextState = NO_FF_SKIP_RESPONSE;
|
||||
}
|
||||
NoFlowFileResponseState next() {
|
||||
return nextState;
|
||||
}
|
||||
}
|
||||
|
||||
private class NoFlowFileResponse {
|
||||
private final AtomicReference<NoFlowFileResponseState> current = new AtomicReference<>();
|
||||
NoFlowFileResponse() {
|
||||
current.set(NoFlowFileResponseState.NO_FF_SKIP_RESPONSE);
|
||||
}
|
||||
synchronized NoFlowFileResponseState state() {
|
||||
return current.get();
|
||||
}
|
||||
synchronized void getNextCycle() {
|
||||
current.set(current.get().next());
|
||||
}
|
||||
synchronized void reset() {
|
||||
current.set(NoFlowFileResponseState.NO_FF_SKIP_RESPONSE);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -18,6 +18,7 @@ org.apache.nifi.processors.standard.CompressContent
|
|||
org.apache.nifi.processors.standard.ControlRate
|
||||
org.apache.nifi.processors.standard.ConvertCharacterSet
|
||||
org.apache.nifi.processors.standard.ConvertJSONToSQL
|
||||
org.apache.nifi.processors.standard.DebugFlow
|
||||
org.apache.nifi.processors.standard.DetectDuplicate
|
||||
org.apache.nifi.processors.standard.DistributeLoad
|
||||
org.apache.nifi.processors.standard.DuplicateFlowFile
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<head>
|
||||
<meta charset="utf-8"/>
|
||||
<title>DebugFlow</title>
|
||||
<link rel="stylesheet" href="../../css/component-usage.css" type="text/css"/>
|
||||
</head>
|
||||
|
||||
<body>
|
||||
<!-- Processor Documentation ================================================== -->
|
||||
<p>
|
||||
When triggered, the processor loops through the appropriate response list (based on whether or not it
|
||||
received a FlowFile). A response is produced the configured number of times for each pass through its
|
||||
response list, as long as the processor is running.
|
||||
</p><p>
|
||||
Triggered by a FlowFile, the processor can produce the following responses.
|
||||
<ol>
|
||||
<li>transfer FlowFile to success relationship.</li>
|
||||
<li>transfer FlowFile to failure relationship.</li>
|
||||
<li>rollback the FlowFile without penalty.</li>
|
||||
<li>rollback the FlowFile and yield the context.</li>
|
||||
<li>rollback the FlowFile with penalty.</li>
|
||||
<li>throw an exception.</li>
|
||||
</ol>
|
||||
</p><p>
|
||||
Triggered without a FlowFile, the processor can produce the following responses.
|
||||
<ol>
|
||||
<li>do nothing and return.</li>
|
||||
<li>throw an exception.</li>
|
||||
<li>yield the context.</li>
|
||||
</ol>
|
||||
</p>
|
||||
</body>
|
||||
</html>
|
|
@ -0,0 +1,361 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.hamcrest.CoreMatchers;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class TestDebugFlow {
|
||||
|
||||
private DebugFlow debugFlow;
|
||||
private TestRunner runner;
|
||||
private ProcessSession session;
|
||||
|
||||
private final Map<Integer, String> contents = new HashMap<>();
|
||||
private final Map<Integer, Map<String, String>> attribs = new HashMap<>();
|
||||
private Map<String, String> namesToContent = new HashMap<>();
|
||||
|
||||
@Rule
|
||||
public final ExpectedException exception = ExpectedException.none();
|
||||
|
||||
@Before
|
||||
public void setup() throws IOException {
|
||||
for (int n = 0; n < 6; n++) {
|
||||
String filename = "testFile" + (n + 1) + ".txt";
|
||||
String content = "Hello World " + (n + 1) + "!";
|
||||
contents.put(n, content);
|
||||
attribs.put(n, new HashMap<String, String>());
|
||||
attribs.get(n).put(CoreAttributes.FILENAME.key(), filename);
|
||||
attribs.get(n).put(CoreAttributes.UUID.key(), "TESTING-FILE-" + (n + 1) + "-TESTING");
|
||||
namesToContent.put(filename, content);
|
||||
}
|
||||
|
||||
debugFlow = new DebugFlow();
|
||||
runner = TestRunners.newTestRunner(debugFlow);
|
||||
session = runner.getProcessSessionFactory().createSession();
|
||||
|
||||
runner.setProperty(DebugFlow.FF_SUCCESS_ITERATIONS, "0");
|
||||
runner.setProperty(DebugFlow.FF_FAILURE_ITERATIONS, "0");
|
||||
runner.setProperty(DebugFlow.FF_ROLLBACK_ITERATIONS, "0");
|
||||
runner.setProperty(DebugFlow.FF_ROLLBACK_YIELD_ITERATIONS, "0");
|
||||
runner.setProperty(DebugFlow.FF_ROLLBACK_PENALTY_ITERATIONS, "0");
|
||||
runner.setProperty(DebugFlow.FF_EXCEPTION_ITERATIONS, "0");
|
||||
|
||||
runner.setProperty(DebugFlow.NO_FF_SKIP_ITERATIONS, "0");
|
||||
runner.setProperty(DebugFlow.NO_FF_EXCEPTION_ITERATIONS, "0");
|
||||
runner.setProperty(DebugFlow.NO_FF_YIELD_ITERATIONS, "0");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetSupportedPropertyDescriptors() throws Exception {
|
||||
assertEquals(11, debugFlow.getPropertyDescriptors().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetRelationships() throws Exception {
|
||||
assertEquals(2, debugFlow.getRelationships().size());
|
||||
}
|
||||
|
||||
private boolean isInContents(byte[] content) {
|
||||
for (Map.Entry entry : contents.entrySet()) {
|
||||
if (((String)entry.getValue()).compareTo(new String(content)) == 0) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlowFileSuccess() {
|
||||
runner.setProperty(DebugFlow.FF_SUCCESS_ITERATIONS, "1");
|
||||
runner.assertValid();
|
||||
|
||||
for (int n = 0; n < 6; n++) {
|
||||
runner.enqueue(contents.get(n).getBytes(), attribs.get(n));
|
||||
}
|
||||
|
||||
runner.run(7);
|
||||
runner.assertTransferCount(DebugFlow.REL_SUCCESS, 6);
|
||||
runner.assertTransferCount(DebugFlow.REL_FAILURE, 0);
|
||||
|
||||
assertTrue(isInContents(runner.getFlowFilesForRelationship(DebugFlow.REL_SUCCESS).get(0).toByteArray()));
|
||||
assertTrue(isInContents(runner.getFlowFilesForRelationship(DebugFlow.REL_SUCCESS).get(1).toByteArray()));
|
||||
assertTrue(isInContents(runner.getFlowFilesForRelationship(DebugFlow.REL_SUCCESS).get(2).toByteArray()));
|
||||
assertTrue(isInContents(runner.getFlowFilesForRelationship(DebugFlow.REL_SUCCESS).get(3).toByteArray()));
|
||||
assertTrue(isInContents(runner.getFlowFilesForRelationship(DebugFlow.REL_SUCCESS).get(4).toByteArray()));
|
||||
assertTrue(isInContents(runner.getFlowFilesForRelationship(DebugFlow.REL_SUCCESS).get(5).toByteArray()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlowFileFailure() {
|
||||
runner.setProperty(DebugFlow.FF_FAILURE_ITERATIONS, "1");
|
||||
runner.assertValid();
|
||||
|
||||
for (int n = 0; n < 6; n++) {
|
||||
runner.enqueue(contents.get(n).getBytes(), attribs.get(n));
|
||||
}
|
||||
|
||||
runner.run(7);
|
||||
runner.assertTransferCount(DebugFlow.REL_SUCCESS, 0);
|
||||
runner.assertTransferCount(DebugFlow.REL_FAILURE, 6);
|
||||
|
||||
runner.getFlowFilesForRelationship(DebugFlow.REL_FAILURE).get(0).assertContentEquals(contents.get(0));
|
||||
runner.getFlowFilesForRelationship(DebugFlow.REL_FAILURE).get(1).assertContentEquals(contents.get(1));
|
||||
runner.getFlowFilesForRelationship(DebugFlow.REL_FAILURE).get(2).assertContentEquals(contents.get(2));
|
||||
runner.getFlowFilesForRelationship(DebugFlow.REL_FAILURE).get(3).assertContentEquals(contents.get(3));
|
||||
runner.getFlowFilesForRelationship(DebugFlow.REL_FAILURE).get(4).assertContentEquals(contents.get(4));
|
||||
runner.getFlowFilesForRelationship(DebugFlow.REL_FAILURE).get(5).assertContentEquals(contents.get(5));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlowFileSuccessAndFailure() {
|
||||
runner.setProperty(DebugFlow.FF_SUCCESS_ITERATIONS, "1");
|
||||
runner.setProperty(DebugFlow.FF_FAILURE_ITERATIONS, "1");
|
||||
runner.assertValid();
|
||||
|
||||
for (int n = 0; n < 6; n++) {
|
||||
runner.enqueue(contents.get(n).getBytes(), attribs.get(n));
|
||||
}
|
||||
|
||||
runner.run(7);
|
||||
runner.assertTransferCount(DebugFlow.REL_SUCCESS, 3);
|
||||
runner.assertTransferCount(DebugFlow.REL_FAILURE, 3);
|
||||
|
||||
runner.getFlowFilesForRelationship(DebugFlow.REL_SUCCESS).get(0).assertContentEquals(contents.get(0));
|
||||
runner.getFlowFilesForRelationship(DebugFlow.REL_FAILURE).get(0).assertContentEquals(contents.get(1));
|
||||
runner.getFlowFilesForRelationship(DebugFlow.REL_SUCCESS).get(1).assertContentEquals(contents.get(2));
|
||||
runner.getFlowFilesForRelationship(DebugFlow.REL_FAILURE).get(1).assertContentEquals(contents.get(3));
|
||||
runner.getFlowFilesForRelationship(DebugFlow.REL_SUCCESS).get(2).assertContentEquals(contents.get(4));
|
||||
runner.getFlowFilesForRelationship(DebugFlow.REL_FAILURE).get(2).assertContentEquals(contents.get(5));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlowFileRollback() throws IOException {
|
||||
runner.setProperty(DebugFlow.FF_ROLLBACK_ITERATIONS, "1");
|
||||
runner.assertValid();
|
||||
|
||||
for (int n = 0; n < 6; n++) {
|
||||
runner.enqueue(contents.get(n).getBytes(), attribs.get(n));
|
||||
}
|
||||
|
||||
runner.run(7);
|
||||
runner.assertTransferCount(DebugFlow.REL_SUCCESS, 0);
|
||||
runner.assertTransferCount(DebugFlow.REL_FAILURE, 0);
|
||||
|
||||
runner.assertQueueNotEmpty();
|
||||
assertEquals(6, runner.getQueueSize().getObjectCount());
|
||||
|
||||
MockFlowFile ff1 = (MockFlowFile) session.get();
|
||||
assertNotNull(ff1);
|
||||
assertEquals(namesToContent.get(ff1.getAttribute(CoreAttributes.FILENAME.key())), new String(ff1.toByteArray()));
|
||||
session.rollback();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlowFileRollbackYield() {
|
||||
runner.setProperty(DebugFlow.FF_ROLLBACK_YIELD_ITERATIONS, "1");
|
||||
runner.assertValid();
|
||||
|
||||
for (int n = 0; n < 6; n++) {
|
||||
runner.enqueue(contents.get(n).getBytes(), attribs.get(n));
|
||||
}
|
||||
|
||||
runner.run(7);
|
||||
runner.assertTransferCount(DebugFlow.REL_SUCCESS, 0);
|
||||
runner.assertTransferCount(DebugFlow.REL_FAILURE, 0);
|
||||
|
||||
runner.assertQueueNotEmpty();
|
||||
assertEquals(6, runner.getQueueSize().getObjectCount());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlowFileRollbackPenalty() {
|
||||
runner.setProperty(DebugFlow.FF_ROLLBACK_PENALTY_ITERATIONS, "1");
|
||||
runner.assertValid();
|
||||
|
||||
for (int n = 0; n < 6; n++) {
|
||||
runner.enqueue(contents.get(n).getBytes(), attribs.get(n));
|
||||
}
|
||||
|
||||
runner.run(7);
|
||||
runner.assertTransferCount(DebugFlow.REL_SUCCESS, 0);
|
||||
runner.assertTransferCount(DebugFlow.REL_FAILURE, 0);
|
||||
|
||||
runner.assertQueueNotEmpty();
|
||||
assertEquals(6, runner.getQueueSize().getObjectCount());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlowFileDefaultException() {
|
||||
runner.setProperty(DebugFlow.FF_EXCEPTION_ITERATIONS, "1");
|
||||
runner.assertValid();
|
||||
|
||||
runner.enqueue(contents.get(0).getBytes(), attribs.get(0));
|
||||
|
||||
exception.expectMessage(CoreMatchers.containsString("forced by org.apache.nifi.processors.standard.DebugFlow"));
|
||||
exception.expectCause(CoreMatchers.isA(RuntimeException.class));
|
||||
runner.run(2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlowFileNonDefaultException() {
|
||||
runner.setProperty(DebugFlow.FF_EXCEPTION_ITERATIONS, "1");
|
||||
runner.setProperty(DebugFlow.FF_EXCEPTION_CLASS, "java.lang.RuntimeException");
|
||||
runner.assertValid();
|
||||
|
||||
runner.enqueue(contents.get(0).getBytes(), attribs.get(0));
|
||||
|
||||
exception.expectMessage(CoreMatchers.containsString("forced by org.apache.nifi.processors.standard.DebugFlow"));
|
||||
exception.expectCause(CoreMatchers.isA(RuntimeException.class));
|
||||
runner.run(2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlowFileNPEException() {
|
||||
runner.setProperty(DebugFlow.FF_EXCEPTION_ITERATIONS, "1");
|
||||
runner.setProperty(DebugFlow.FF_EXCEPTION_CLASS, "java.lang.NullPointerException");
|
||||
runner.assertValid();
|
||||
|
||||
runner.enqueue(contents.get(0).getBytes(), attribs.get(0));
|
||||
|
||||
exception.expectMessage(CoreMatchers.containsString("forced by org.apache.nifi.processors.standard.DebugFlow"));
|
||||
exception.expectCause(CoreMatchers.isA(NullPointerException.class));
|
||||
runner.run(2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlowFileBadException() {
|
||||
runner.setProperty(DebugFlow.FF_EXCEPTION_ITERATIONS, "1");
|
||||
runner.setProperty(DebugFlow.FF_EXCEPTION_CLASS, "java.lang.NonExistantException");
|
||||
runner.assertNotValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlowFileExceptionRollover() {
|
||||
runner.setProperty(DebugFlow.FF_EXCEPTION_ITERATIONS, "2");
|
||||
runner.assertValid();
|
||||
|
||||
for (int n = 0; n < 6; n++) {
|
||||
runner.enqueue(contents.get(n).getBytes(), attribs.get(n));
|
||||
}
|
||||
|
||||
exception.expectMessage(CoreMatchers.containsString("forced by org.apache.nifi.processors.standard.DebugFlow"));
|
||||
exception.expectCause(CoreMatchers.isA(RuntimeException.class));
|
||||
runner.run(8);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlowFileAll() {
|
||||
runner.setProperty(DebugFlow.FF_SUCCESS_ITERATIONS, "1");
|
||||
runner.setProperty(DebugFlow.FF_FAILURE_ITERATIONS, "1");
|
||||
runner.setProperty(DebugFlow.FF_ROLLBACK_ITERATIONS, "1");
|
||||
runner.setProperty(DebugFlow.FF_ROLLBACK_YIELD_ITERATIONS, "1");
|
||||
runner.setProperty(DebugFlow.FF_ROLLBACK_PENALTY_ITERATIONS, "1");
|
||||
runner.setProperty(DebugFlow.FF_EXCEPTION_ITERATIONS, "1");
|
||||
runner.assertValid();
|
||||
|
||||
for (int n = 0; n < 6; n++) {
|
||||
runner.enqueue(contents.get(n).getBytes(), attribs.get(n));
|
||||
}
|
||||
|
||||
runner.run(5);
|
||||
runner.assertTransferCount(DebugFlow.REL_SUCCESS, 1);
|
||||
runner.assertTransferCount(DebugFlow.REL_FAILURE, 1);
|
||||
|
||||
assertEquals(4, runner.getQueueSize().getObjectCount());
|
||||
assertTrue(isInContents(runner.getFlowFilesForRelationship(DebugFlow.REL_SUCCESS).get(0).toByteArray()));
|
||||
assertTrue(isInContents(runner.getFlowFilesForRelationship(DebugFlow.REL_FAILURE).get(0).toByteArray()));
|
||||
|
||||
runner.run(2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoFlowFileZeroIterations() {
|
||||
runner.run(4);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoFlowFileSkip() {
|
||||
runner.setProperty(DebugFlow.NO_FF_SKIP_ITERATIONS, "1");
|
||||
runner.assertValid();
|
||||
|
||||
runner.run(4);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoFlowFileDefaultException() {
|
||||
runner.setProperty(DebugFlow.NO_FF_EXCEPTION_ITERATIONS, "1");
|
||||
runner.assertValid();
|
||||
|
||||
exception.expectMessage(CoreMatchers.containsString("forced by org.apache.nifi.processors.standard.DebugFlow"));
|
||||
exception.expectCause(CoreMatchers.isA(RuntimeException.class));
|
||||
runner.run(3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoFlowFileNonDefaultException() {
|
||||
runner.setProperty(DebugFlow.NO_FF_EXCEPTION_ITERATIONS, "1");
|
||||
runner.setProperty(DebugFlow.NO_FF_EXCEPTION_CLASS, "java.lang.RuntimeException");
|
||||
runner.assertValid();
|
||||
|
||||
exception.expectMessage(CoreMatchers.containsString("forced by org.apache.nifi.processors.standard.DebugFlow"));
|
||||
exception.expectCause(CoreMatchers.isA(RuntimeException.class));
|
||||
runner.run(3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoFlowFileOtherException() {
|
||||
runner.setProperty(DebugFlow.NO_FF_EXCEPTION_ITERATIONS, "1");
|
||||
runner.setProperty(DebugFlow.NO_FF_EXCEPTION_CLASS, "java.lang.NullPointerException");
|
||||
runner.assertValid();
|
||||
|
||||
exception.expectMessage(CoreMatchers.containsString("forced by org.apache.nifi.processors.standard.DebugFlow"));
|
||||
exception.expectCause(CoreMatchers.isA(NullPointerException.class));
|
||||
runner.run(3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoFlowFileBadException() {
|
||||
runner.setProperty(DebugFlow.NO_FF_EXCEPTION_ITERATIONS, "1");
|
||||
runner.setProperty(DebugFlow.NO_FF_EXCEPTION_CLASS, "java.lang.NonExistantException");
|
||||
runner.assertNotValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoFlowFileYield() {
|
||||
runner.setProperty(DebugFlow.NO_FF_YIELD_ITERATIONS, "1");
|
||||
runner.assertValid();
|
||||
|
||||
runner.run(4);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue