From b7b1dc2fe6f2ce4d08d3e6790034c9418e40221d Mon Sep 17 00:00:00 2001 From: Mike Moser Date: Wed, 3 Aug 2016 15:58:57 -0400 Subject: [PATCH] NIFI-2471 fix Hadoop configuration resources when talking to multiple Hadoop clusters This closes #779. --- .../nifi/processors/hadoop/AbstractHadoopProcessor.java | 8 +++++--- .../java/org/apache/nifi/processors/hadoop/ListHDFS.java | 1 + 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java index 6288de3294..540c406bfd 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java @@ -45,6 +45,7 @@ import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.StringUtils; import javax.net.SocketFactory; + import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; @@ -178,8 +179,8 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { } /* - * If your subclass also has an @OnScheduled annotated method and you need hdfsResources in that method, then be sure to call super.abstractOnScheduled(context) - */ + * If your subclass also has an @OnScheduled annotated method and you need hdfsResources in that method, then be sure to call super.abstractOnScheduled(context) + */ @OnScheduled public final void abstractOnScheduled(ProcessContext context) throws IOException { try { @@ -264,6 +265,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { // disable caching of Configuration and FileSystem objects, else we cannot reconfigure the processor without a complete // restart String disableCacheName = String.format("fs.%s.impl.disable.cache", FileSystem.getDefaultUri(config).getScheme()); + config.set(disableCacheName, "true"); // If kerberos is enabled, create the file system as the kerberos principal // -- use RESOURCE_LOCK to guarantee UserGroupInformation is accessed by only a single thread at at time @@ -283,7 +285,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { fs = getFileSystemAsUser(config, ugi); } } - config.set(disableCacheName, "true"); + getLogger().info("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}", new Object[] { fs.getWorkingDirectory(), fs.getDefaultBlockSize(new Path(dir)), fs.getDefaultReplication(new Path(dir)), config.toString() }); return new HdfsResources(config, fs, ugi); diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java index 524bf64fdf..6d9f8f7503 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java @@ -159,6 +159,7 @@ public class ListHDFS extends AbstractHadoopProcessor { @Override public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + super.onPropertyModified(descriptor, oldValue, newValue); if (isConfigurationRestored() && descriptor.equals(DIRECTORY)) { latestTimestampEmitted = -1L; latestTimestampListed = -1L;