NIFI-2471 fix Hadoop configuration resources when talking to multiple Hadoop clusters

This closes #779.
This commit is contained in:
Mike Moser 2016-08-03 15:58:57 -04:00 committed by Mark Payne
parent bc6b22389b
commit b7b1dc2fe6
2 changed files with 6 additions and 3 deletions

View File

@@ -45,6 +45,7 @@ import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.StringUtils;
import javax.net.SocketFactory;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -178,8 +179,8 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
}
/*
* If your subclass also has an @OnScheduled annotated method and you need hdfsResources in that method, then be sure to call super.abstractOnScheduled(context)
*/
* If your subclass also has an @OnScheduled annotated method and you need hdfsResources in that method, then be sure to call super.abstractOnScheduled(context)
*/
@OnScheduled
public final void abstractOnScheduled(ProcessContext context) throws IOException {
try {
@@ -264,6 +265,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
// disable caching of Configuration and FileSystem objects, else we cannot reconfigure the processor without a complete
// restart
String disableCacheName = String.format("fs.%s.impl.disable.cache", FileSystem.getDefaultUri(config).getScheme());
config.set(disableCacheName, "true");
// If kerberos is enabled, create the file system as the kerberos principal
// -- use RESOURCE_LOCK to guarantee UserGroupInformation is accessed by only a single thread at at time
@@ -283,7 +285,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
fs = getFileSystemAsUser(config, ugi);
}
}
config.set(disableCacheName, "true");
getLogger().info("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
new Object[] { fs.getWorkingDirectory(), fs.getDefaultBlockSize(new Path(dir)), fs.getDefaultReplication(new Path(dir)), config.toString() });
return new HdfsResources(config, fs, ugi);

View File

@@ -159,6 +159,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
super.onPropertyModified(descriptor, oldValue, newValue);
if (isConfigurationRestored() && descriptor.equals(DIRECTORY)) {
latestTimestampEmitted = -1L;
latestTimestampListed = -1L;