NIFI-1152 NIFI-2117 Fixed standard session impl api adherance, mock session api adherance, corrected code and tests for script processors that had issues due to that process session bug

This commit is contained in:
joewitt 2016-07-12 23:16:19 -04:00 committed by Mark Payne
parent 3c49a93289
commit cfaacb1d5c
9 changed files with 93 additions and 34 deletions

View File

@ -62,6 +62,7 @@ public class MockProcessSession implements ProcessSession {
private final MockFlowFileQueue processorQueue;
private final Set<Long> beingProcessed = new HashSet<>();
private final List<MockFlowFile> penalized = new ArrayList<>();
private final Processor processor;
private final Map<Long, MockFlowFile> currentVersions = new HashMap<>();
private final Map<Long, MockFlowFile> originalVersions = new HashMap<>();
@ -77,6 +78,7 @@ public class MockProcessSession implements ProcessSession {
private int removedCount = 0;
public MockProcessSession(final SharedSessionState sharedState, final Processor processor) {
this.processor = processor;
this.sharedState = sharedState;
this.processorQueue = sharedState.getFlowFileQueue();
provenanceReporter = new MockProvenanceReporter(this, sharedState, processor.getIdentifier(), processor.getClass().getSimpleName());
@ -650,6 +652,9 @@ public class MockProcessSession implements ProcessSession {
transfer(flowFile);
return;
}
if(!processor.getRelationships().contains(relationship)){
throw new IllegalArgumentException("this relationship " + relationship.getName() + " is not known");
}
validateState(flowFile);
List<MockFlowFile> list = transferMap.get(relationship);
@ -668,6 +673,9 @@ public class MockProcessSession implements ProcessSession {
transfer(flowFiles);
return;
}
if(!processor.getRelationships().contains(relationship)){
throw new IllegalArgumentException("this relationship " + relationship.getName() + " is not known");
}
for (final FlowFile flowFile : flowFiles) {
validateState(flowFile);

View File

@ -64,6 +64,27 @@ public class TestMockProcessSession {
}
}
@Test
public void testTransferUnknownRelationship() {
final Processor processor = new PoorlyBehavedProcessor();
final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor);
FlowFile ff1 = session.createFlowFile("hello, world".getBytes());
final Relationship fakeRel = new Relationship.Builder().name("FAKE").build();
try {
session.transfer(ff1, fakeRel);
Assert.fail("Should have thrown IllegalArgumentException");
} catch (final IllegalArgumentException ie) {
}
try {
session.transfer(Collections.singleton(ff1), fakeRel);
Assert.fail("Should have thrown IllegalArgumentException");
} catch (final IllegalArgumentException ie) {
}
}
protected static class PoorlyBehavedProcessor extends AbstractProcessor {
private static final Relationship REL_FAILURE = new Relationship.Builder()

View File

@ -1562,9 +1562,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
@Override
public void transfer(final FlowFile flowFile, final Relationship relationship) {
validateRecordState(flowFile);
final StandardRepositoryRecord record = records.get(flowFile);
record.setTransferRelationship(relationship);
updateLastQueuedDate(record);
final int numDestinations = context.getConnections(relationship).size();
final int multiplier = Math.max(1, numDestinations);
@ -1575,7 +1572,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
autoTerminated = true;
} else if (numDestinations == 0 && relationship == Relationship.SELF) {
selfRelationship = true;
} else if (numDestinations == 0) {
// the relationship specified is not known in this session/context
throw new IllegalArgumentException("Relationship '" + relationship.getName() + "' is not known");
}
final StandardRepositoryRecord record = records.get(flowFile);
record.setTransferRelationship(relationship);
updateLastQueuedDate(record);
if (autoTerminated) {
removedCount += multiplier;
@ -1616,6 +1619,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
autoTerminated = true;
} else if (numDestinations == 0 && relationship == Relationship.SELF) {
selfRelationship = true;
} else if (numDestinations == 0) {
// the relationship specified is not known in this session/context
throw new IllegalArgumentException("Relationship '" + relationship.getName() + "' is not known");
}
final int multiplier = Math.max(1, numDestinations);

View File

@ -97,6 +97,7 @@ public class TestStandardProcessSession {
private ProvenanceEventRepository provenanceRepo;
private MockFlowFileRepository flowFileRepo;
private final Relationship FAKE_RELATIONSHIP = new Relationship.Builder().name("FAKE").build();
@After
public void cleanup() {
@ -187,11 +188,14 @@ public class TestStandardProcessSession {
final Relationship relationship = (Relationship) arguments[0];
if (relationship == Relationship.SELF) {
return Collections.emptySet();
} else {
} else if (relationship == FAKE_RELATIONSHIP || relationship.equals(FAKE_RELATIONSHIP) ){
return null;
}else {
return new HashSet<>(connList);
}
}
}).when(connectable).getConnections(Mockito.any(Relationship.class));
when(connectable.getConnections()).thenReturn(new HashSet<>(connList));
contentRepo = new MockContentRepository();
@ -1250,6 +1254,34 @@ public class TestStandardProcessSession {
session.commit();
}
@Test
public void testTransferUnknownRelationship() {
final FlowFileRecord flowFileRecord1 = new StandardFlowFileRecord.Builder()
.id(1L)
.addAttribute("uuid", "11111111-1111-1111-1111-111111111111")
.entryDate(System.currentTimeMillis())
.build();
flowFileQueue.put(flowFileRecord1);
FlowFile ff1 = session.get();
ff1 = session.putAttribute(ff1, "index", "1");
try {
session.transfer(ff1, FAKE_RELATIONSHIP);
Assert.fail("Expected IllegalArgumentException");
} catch (IllegalArgumentException iae) {
}
try {
final Collection<FlowFile> collection = new HashSet<>();
collection.add(ff1);
session.transfer(collection, FAKE_RELATIONSHIP);
Assert.fail("Expected IllegalArgumentException");
} catch (IllegalArgumentException iae) {
}
}
private static class MockFlowFileRepository implements FlowFileRepository {
private boolean failOnUpdate = false;
private final AtomicLong idGenerator = new AtomicLong(0L);

View File

@ -16,21 +16,7 @@
*/
package org.apache.nifi.processors.script;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StringUtils;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineFactory;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
@ -213,6 +199,7 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
/**
* Performs common setup operations when the processor is scheduled to run. This method assumes the member
* variables associated with properties have been filled.
* @param numberOfScriptEngines number of engines to setup
*/
public void setup(int numberOfScriptEngines) {
@ -231,6 +218,7 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
* javax.script APIs. Then, if any script configurators have been defined for this engine, their init() method is
* called, and the configurator is saved for future calls.
*
* @param numberOfScriptEngines number of engines to setup
* @see org.apache.nifi.processors.script.ScriptEngineConfigurator
*/
protected void setupEngines(int numberOfScriptEngines) {
@ -316,6 +304,7 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
* If the parameter is null or empty, this class's classloader is returned
*
* @param modules An array of URLs to add to the class loader
* @return ClassLoader for script engine
*/
protected ClassLoader createScriptEngineModuleClassLoader(URL[] modules) {
ClassLoader thisClassLoader = this.getClass().getClassLoader();

View File

@ -63,16 +63,14 @@ import org.apache.nifi.processor.util.StandardValidators;
public class InvokeScriptedProcessor extends AbstractScriptProcessor {
private final AtomicReference<Processor> processor = new AtomicReference<>();
private final AtomicReference<Collection<ValidationResult>> validationResults =
new AtomicReference<>((Collection<ValidationResult>) new ArrayList<ValidationResult>());
private final AtomicReference<Collection<ValidationResult>> validationResults = new AtomicReference<>(new ArrayList<>());
private AtomicBoolean scriptNeedsReload = new AtomicBoolean(true);
private ScriptEngine scriptEngine = null;
/**
* Returns the valid relationships for this processor. SUCCESS and FAILURE are always returned, and if the script
* processor has defined additional relationships, those will be added as well.
* Returns the valid relationships for this processor as supplied by the script itself.
*
* @return a Set of Relationships supported by this processor
*/
@ -82,7 +80,10 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
final Processor instance = processor.get();
if (instance != null) {
try {
relationships.addAll(instance.getRelationships());
final Set<Relationship> rels = instance.getRelationships();
if(rels != null && !rels.isEmpty()){
relationships.addAll(rels);
}
} catch (final Throwable t) {
final ComponentLog logger = getLogger();
final String message = "Unable to get relationships from scripted Processor: " + t;
@ -92,10 +93,6 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
logger.error(message, t);
}
}
} else {
// Return defaults for now
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
}
return Collections.unmodifiableSet(relationships);
}
@ -493,6 +490,7 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
}
@OnStopped
@Override
public void stop() {
super.stop();
processor.set(null);

View File

@ -58,8 +58,8 @@ public class TestInvokeGroovy extends BaseScriptTest {
runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred("success", 1);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship("success");
runner.assertAllFlowFilesTransferred("test", 1);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship("test");
result.get(0).assertAttributeEquals("from-content", "test content");
}
@ -166,8 +166,8 @@ public class TestInvokeGroovy extends BaseScriptTest {
runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(InvokeScriptedProcessor.REL_FAILURE, 1);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(InvokeScriptedProcessor.REL_FAILURE);
runner.assertAllFlowFilesTransferred("FAILURE", 1);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship("FAILURE");
assertFalse(result.isEmpty());
}
}

View File

@ -18,13 +18,18 @@ class testScriptRoutesToFailure implements Processor {
def ComponentLog log
def REL_FAILURE = new Relationship.Builder()
.name("FAILURE")
.description("A FAILURE relationship")
.build();
@Override
void initialize(ProcessorInitializationContext context) {
}
@Override
Set<Relationship> getRelationships() {
return [] as Set
return [REL_FAILURE] as Set
}
@Override
@ -32,7 +37,7 @@ class testScriptRoutesToFailure implements Processor {
def session = sessionFactory.createSession()
def flowFile = session.get()
if(!flowFile) return
session.transfer(flowFile, InvokeScriptedProcessor.REL_FAILURE)
session.transfer(flowFile, REL_FAILURE)
}
@Override

View File

@ -43,7 +43,7 @@ class GroovyProcessor implements Processor {
}
flowFile = session.putAttribute(flowFile, "from-content", "test content")
// transfer
session.transfer(flowFile, InvokeScriptedProcessor.REL_SUCCESS)
session.transfer(flowFile, REL_TEST)
session.commit()
}