From fd35b8ffd7d4a66ef149c1bd1341dfee3ede80c7 Mon Sep 17 00:00:00 2001 From: Johnathan Gilday Date: Wed, 25 Nov 2015 13:34:06 -0500 Subject: [PATCH] Fixes NIFI-1220. This closes #133. MockProcessSession returns a new FlowFile from its `penalty` method instead of mutating then returning the given FlowFile Signed-off-by: joewitt --- .../apache/nifi/util/MockProcessSession.java | 6 +- .../nifi/util/TestMockProcessSession.java | 58 +++++++++++++++++++ 2 files changed, 62 insertions(+), 2 deletions(-) create mode 100644 nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java index 97ec9ca9c8..67cf16e7c7 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java @@ -1010,8 +1010,10 @@ public class MockProcessSession implements ProcessSession { public MockFlowFile penalize(final FlowFile flowFile) { validateState(flowFile); final MockFlowFile mockFlowFile = (MockFlowFile) flowFile; - mockFlowFile.setPenalized(); - return mockFlowFile; + final MockFlowFile newFlowFile = new MockFlowFile(mockFlowFile.getId(), flowFile); + currentVersions.put(newFlowFile.getId(), newFlowFile); + newFlowFile.setPenalized(); + return newFlowFile; } public byte[] getContentAsByteArray(final MockFlowFile flowFile) { diff --git a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java new file mode 100644 index 0000000000..e72807217f --- /dev/null +++ b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.junit.Test; + +import java.util.Collections; +import java.util.Set; + +public class TestMockProcessSession { + + @Test(expected = AssertionError.class) + public void testPenalizeFlowFileFromProcessor() { + TestRunners.newTestRunner(PoorlyBehavedProcessor.class).run(); + } + + protected static class PoorlyBehavedProcessor extends AbstractProcessor { + + private static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .build(); + + private final Set relationships = Collections.singleton(REL_FAILURE); + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + public void onTrigger(final ProcessContext ctx, final ProcessSession session) throws ProcessException { + final FlowFile file = session.create(); + session.penalize(file); + session.transfer(file, REL_FAILURE); + } + + } +}