diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java index 24c033076e..2631840d2b 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java @@ -64,7 +64,7 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; @TriggerWhenEmpty -@InputRequirement(Requirement.INPUT_ALLOWED) +@InputRequirement(Requirement.INPUT_FORBIDDEN) @Tags({"hadoop", "HDFS", "get", "fetch", "ingest", "source", "filesystem"}) @CapabilityDescription("Fetch files from Hadoop Distributed File System (HDFS) into FlowFiles. This Processor will delete the file from HDFS after fetching it.") @WritesAttributes({ @@ -85,12 +85,6 @@ public class GetHDFS extends AbstractHadoopProcessor { .description("All files retrieved from HDFS are transferred to this relationship") .build(); - public static final Relationship REL_PASSTHROUGH = new Relationship.Builder() - .name("passthrough") - .description( - "If this processor has an input queue for some reason, then FlowFiles arriving on that input are transferred to this relationship") - .build(); - // properties public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() .name(DIRECTORY_PROP_NAME) @@ -181,10 +175,7 @@ public class GetHDFS extends AbstractHadoopProcessor { private static final Set relationships; static { - final Set rels = new HashSet<>(); - rels.add(REL_SUCCESS); - rels.add(REL_PASSTHROUGH); - relationships = Collections.unmodifiableSet(rels); + relationships = Collections.singleton(REL_SUCCESS); } protected ProcessorConfiguration processorConfig; @@ -259,13 +250,8 @@ public class GetHDFS extends AbstractHadoopProcessor { @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - int batchSize = context.getProperty(BATCH_SIZE).asInteger(); final List files = new ArrayList<>(batchSize); - List inputFlowFiles = session.get(10); - for (FlowFile ff : inputFlowFiles) { - session.transfer(ff, REL_PASSTHROUGH); - } // retrieve new file names from HDFS and place them into work queue if (filePathQueue.size() < MAX_WORKING_QUEUE_SIZE / 2) {