NIFI-2483: Removed 'passthrough' relationship

This closes #789.

Signed-off-by: Bryan Bende <bbende@apache.org>
Mark Payne authored 2016-08-04 10:41:18 -04:00; committed by Bryan Bende
parent 7ba10a6dea
commit 0e0166cec5
1 changed file with 2 additions and 16 deletions
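
For readers less familiar with the NiFi processor API, the sketch below (a hypothetical ExampleSourceProcessor, not the actual GetHDFS source) illustrates the pattern the diff below moves toward: a single "success" relationship held in an immutable set built with Collections.singleton, and @InputRequirement(Requirement.INPUT_FORBIDDEN) so the framework rejects incoming connections instead of the processor needing a "passthrough" route.

import java.util.Collections;
import java.util.Set;

import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;

// Hypothetical minimal source-style processor; illustrates the pattern only.
@InputRequirement(Requirement.INPUT_FORBIDDEN) // framework disallows incoming connections
public class ExampleSourceProcessor extends AbstractProcessor {

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("All generated FlowFiles are transferred to this relationship")
            .build();

    // Collections.singleton already returns an immutable one-element set,
    // so no HashSet plus Collections.unmodifiableSet is needed.
    private static final Set<Relationship> RELATIONSHIPS = Collections.singleton(REL_SUCCESS);

    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        // A real source processor would create FlowFiles here and route them to REL_SUCCESS;
        // with INPUT_FORBIDDEN there are never inbound FlowFiles to pass through.
    }
}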


@@ -64,7 +64,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 
 @TriggerWhenEmpty
-@InputRequirement(Requirement.INPUT_ALLOWED)
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
 @Tags({"hadoop", "HDFS", "get", "fetch", "ingest", "source", "filesystem"})
 @CapabilityDescription("Fetch files from Hadoop Distributed File System (HDFS) into FlowFiles. This Processor will delete the file from HDFS after fetching it.")
 @WritesAttributes({
@@ -85,12 +85,6 @@ public class GetHDFS extends AbstractHadoopProcessor {
             .description("All files retrieved from HDFS are transferred to this relationship")
             .build();
 
-    public static final Relationship REL_PASSTHROUGH = new Relationship.Builder()
-            .name("passthrough")
-            .description(
-                    "If this processor has an input queue for some reason, then FlowFiles arriving on that input are transferred to this relationship")
-            .build();
-
     // properties
     public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
             .name(DIRECTORY_PROP_NAME)
@@ -181,10 +175,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
     private static final Set<Relationship> relationships;
 
     static {
-        final Set<Relationship> rels = new HashSet<>();
-        rels.add(REL_SUCCESS);
-        rels.add(REL_PASSTHROUGH);
-        relationships = Collections.unmodifiableSet(rels);
+        relationships = Collections.singleton(REL_SUCCESS);
     }
 
     protected ProcessorConfiguration processorConfig;
@@ -259,13 +250,8 @@ public class GetHDFS extends AbstractHadoopProcessor {
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
         int batchSize = context.getProperty(BATCH_SIZE).asInteger();
         final List<Path> files = new ArrayList<>(batchSize);
-        List<FlowFile> inputFlowFiles = session.get(10);
-        for (FlowFile ff : inputFlowFiles) {
-            session.transfer(ff, REL_PASSTHROUGH);
-        }
-
         // retrieve new file names from HDFS and place them into work queue
         if (filePathQueue.size() < MAX_WORKING_QUEUE_SIZE / 2) {
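
As a side note on the static-initializer hunk, here is a small self-contained comparison (plain java.util, class and variable names invented for illustration) of the removed idiom and its replacement; both produce sets that reject mutation, but Collections.singleton is the natural fit once only one relationship remains.

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class RelationshipSetComparison {
    public static void main(String[] args) {
        // Before: build a mutable HashSet, then wrap it so callers cannot modify it.
        Set<String> rels = new HashSet<>();
        rels.add("success");
        rels.add("passthrough");
        Set<String> before = Collections.unmodifiableSet(rels);

        // After: a single-element, already-immutable set in one call.
        Set<String> after = Collections.singleton("success");

        System.out.println(before); // e.g. [passthrough, success] (iteration order not guaranteed)
        System.out.println(after);  // [success]
    }
}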