NIFI-3003 Upgrading hadoop.version to 2.7.3 and fixing a TDE issue with PutHDFS, ensuring cleanup of instance class loaders, and adding a classpath resource property to all HDFS processors

NIFI-3003 Addressing review feedback

NIFI-3003 Added minor NOTICE updates

This closes #1219
Bryan Bende 2016-11-09 16:42:27 -05:00 committed by Oleg Zhurakousky
parent 3c694b641e
commit fe59b3415c
13 changed files with 319 additions and 203 deletions
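
The recurring change across the files below is a try/finally cleanup of per-instance class loaders so they cannot accumulate in ExtensionManager's lookup map. A minimal sketch of that pattern, assuming a hypothetical helper (the class, method, and Processor parameter are illustrative only; removeInstanceClassLoaderIfExists is the real ExtensionManager method used throughout this commit):

import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.processor.Processor;

// Hypothetical helper, not part of this commit: wherever a temporary component
// instance is created (documentation generation, flow fingerprinting, removal
// from a process group), its InstanceClassLoader is removed in a finally block.
class InstanceClassLoaderCleanupSketch {

    void useTemporaryInstance(final Processor processor) {
        try {
            // ... work with the temporary instance ...
        } finally {
            // Always drop the per-instance class loader so it does not linger
            // in ExtensionManager's instance class loader map.
            ExtensionManager.removeInstanceClassLoaderIfExists(processor.getIdentifier());
        }
    }
}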

View File

@ -914,18 +914,14 @@ The following binary components are provided under the Apache Software License v2
Java Native Access
Copyright 2015 Java Native Access
(ASLv2) HTrace Core
(ASLv2) Apache HTrace Core
The following NOTICE information applies:
In addition, this product includes software dependencies. See
the accompanying LICENSE.txt for a listing of dependencies
that are NOT Apache licensed (with pointers to their licensing)
Copyright 2016 The Apache Software Foundation
Apache HTrace includes an Apache Thrift connector to Zipkin. Zipkin
is a distributed tracing system that is Apache 2.0 Licensed.
Copyright 2012 Twitter, Inc.
(ASLv2) Groovy (org.codehaus.groovy:groovy:jar:2.4.5 - http://www.groovy-lang.org)
The following NOTICE information applies:
Groovy Language

View File

@ -26,6 +26,7 @@ import org.apache.nifi.documentation.mock.MockControllerServiceInitializationCon
import org.apache.nifi.documentation.mock.MockComponentLogger;
import org.apache.nifi.documentation.util.ReflectionUtils;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.reporting.InitializationException;
@ -53,6 +54,8 @@ public class ControllerServiceInitializer implements ConfigurableComponentInitializer {
final ComponentLog logger = new MockComponentLogger();
final MockConfigurationContext context = new MockConfigurationContext();
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, controllerService, logger, context);
} finally {
ExtensionManager.removeInstanceClassLoaderIfExists(component.getIdentifier());
}
}
}

View File

@ -24,6 +24,7 @@ import org.apache.nifi.documentation.mock.MockProcessorInitializationContext;
import org.apache.nifi.documentation.mock.MockComponentLogger;
import org.apache.nifi.documentation.util.ReflectionUtils;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.ProcessorInitializationContext;
@ -52,6 +53,8 @@ public class ProcessorInitializer implements ConfigurableComponentInitializer {
final ComponentLog logger = new MockComponentLogger();
final MockProcessContext context = new MockProcessContext();
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, processor, logger, context);
} finally {
ExtensionManager.removeInstanceClassLoaderIfExists(component.getIdentifier());
}
}
}

View File

@ -23,6 +23,7 @@ import org.apache.nifi.documentation.mock.MockConfigurationContext;
import org.apache.nifi.documentation.mock.MockComponentLogger;
import org.apache.nifi.documentation.mock.MockReportingInitializationContext;
import org.apache.nifi.documentation.util.ReflectionUtils;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingInitializationContext;
@ -51,6 +52,8 @@ public class ReportingTaskingInitializer implements ConfigurableComponentInitializer {
final MockConfigurationContext context = new MockConfigurationContext();
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, reportingTask, new MockComponentLogger(), context);
} finally {
ExtensionManager.removeInstanceClassLoaderIfExists(component.getIdentifier());
}
}
}

View File

@ -24,6 +24,7 @@ import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.controller.serialization.FlowFromDOMFactory;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.util.DomUtils;
@ -489,6 +490,12 @@ public class FingerprintFactory {
if (logger.isDebugEnabled()) {
logger.warn("", e);
}
} finally {
// The processor instance is only for fingerprinting so we can remove the InstanceClassLoader here
// since otherwise it will stick around in the map forever
if (processor != null) {
ExtensionManager.removeInstanceClassLoaderIfExists(processor.getIdentifier());
}
}
// properties

View File

@ -707,6 +707,7 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public void removeProcessor(final ProcessorNode processor) {
boolean removed = false;
final String id = requireNonNull(processor).getIdentifier();
writeLock.lock();
try {
@ -756,9 +757,16 @@ public final class StandardProcessGroup implements ProcessGroup {
removeConnection(conn);
}
ExtensionManager.removeInstanceClassLoaderIfExists(id);
removed = true;
LOG.info("{} removed from flow", processor);
} finally {
if (removed) {
try {
ExtensionManager.removeInstanceClassLoaderIfExists(id);
} catch (Throwable t) {
}
}
writeLock.unlock();
}
}
@ -1850,6 +1858,7 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public void removeControllerService(final ControllerServiceNode service) {
boolean removed = false;
writeLock.lock();
try {
final ControllerServiceNode existing = controllerServices.get(requireNonNull(service).getIdentifier());
@ -1880,8 +1889,16 @@ public final class StandardProcessGroup implements ProcessGroup {
controllerServices.remove(service.getIdentifier());
flowController.getStateManagerProvider().onComponentRemoved(service.getIdentifier());
removed = true;
LOG.info("{} removed from {}", service, this);
} finally {
if (removed) {
try {
ExtensionManager.removeInstanceClassLoaderIfExists(service.getIdentifier());
} catch (Throwable t) {
}
}
writeLock.unlock();
}
}

View File

@ -200,9 +200,9 @@ public class ExtensionManager {
// InstanceClassLoader that has the NAR ClassLoader as a parent
if (requiresInstanceClassLoading.contains(classType) && (registeredClassLoader instanceof URLClassLoader)) {
final URLClassLoader registeredUrlClassLoader = (URLClassLoader) registeredClassLoader;
instanceClassLoader = new InstanceClassLoader(instanceIdentifier, registeredUrlClassLoader.getURLs(), registeredUrlClassLoader.getParent());
instanceClassLoader = new InstanceClassLoader(instanceIdentifier, classType, registeredUrlClassLoader.getURLs(), registeredUrlClassLoader.getParent());
} else {
instanceClassLoader = new InstanceClassLoader(instanceIdentifier, new URL[0], registeredClassLoader);
instanceClassLoader = new InstanceClassLoader(instanceIdentifier, classType, new URL[0], registeredClassLoader);
}
instanceClassloaderLookup.put(instanceIdentifier, instanceClassLoader);
@ -218,7 +218,11 @@ public class ExtensionManager {
* @return the removed ClassLoader for the given instance, or null if not found
*/
public static ClassLoader removeInstanceClassLoaderIfExists(final String instanceIdentifier) {
ClassLoader classLoader = instanceClassloaderLookup.remove(instanceIdentifier);
if (instanceIdentifier == null) {
return null;
}
final ClassLoader classLoader = instanceClassloaderLookup.remove(instanceIdentifier);
if (classLoader != null && (classLoader instanceof URLClassLoader)) {
final URLClassLoader urlClassLoader = (URLClassLoader) classLoader;
try {

View File

@ -35,6 +35,7 @@ public class InstanceClassLoader extends URLClassLoader {
private static final Logger logger = LoggerFactory.getLogger(InstanceClassLoader.class);
private final String identifier;
private final String instanceType;
private ShimClassLoader shimClassLoader;
/**
@ -42,9 +43,10 @@ public class InstanceClassLoader extends URLClassLoader {
* @param urls the URLs for the ClassLoader
* @param parent the parent ClassLoader
*/
public InstanceClassLoader(final String identifier, final URL[] urls, final ClassLoader parent) {
public InstanceClassLoader(final String identifier, final String type, final URL[] urls, final ClassLoader parent) {
super(urls, parent);
this.identifier = identifier;
this.instanceType = type;
}
/**
@ -58,12 +60,11 @@ public class InstanceClassLoader extends URLClassLoader {
try {
shimClassLoader.close();
} catch (IOException e) {
logger.warn("Unable to close URLClassLoader for " + identifier);
logger.warn("Unable to close inner URLClassLoader for " + identifier);
}
}
// don't set a parent here b/c otherwise it will create an infinite loop
shimClassLoader = new ShimClassLoader(urls, null);
shimClassLoader = new ShimClassLoader(urls, getParent());
}
/**
@ -88,7 +89,7 @@ public class InstanceClassLoader extends URLClassLoader {
if (shimClassLoader != null) {
try {
c = shimClassLoader.loadClass(name, resolve);
} catch (ClassNotFoundException cnf) {
} catch (ClassNotFoundException e) {
c = null;
}
}
@ -119,6 +120,18 @@ public class InstanceClassLoader extends URLClassLoader {
}
}
@Override
public void close() throws IOException {
if (shimClassLoader != null) {
try {
shimClassLoader.close();
} catch (IOException e) {
logger.warn("Unable to close inner URLClassLoader for " + identifier);
}
}
super.close();
}
/**
* Extend URLClassLoader to increase visibility of protected methods so that InstanceClassLoader can delegate.
*/

View File

@ -47,6 +47,7 @@ import javax.net.SocketFactory;
import java.io.File;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
@ -55,6 +56,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -121,6 +124,15 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor ADDITIONAL_CLASSPATH_RESOURCES = new PropertyDescriptor.Builder()
.name("Additional Classpath Resources")
.description("A comma-separated list of paths to files and/or directories that will be added to the classpath. When specifying a " +
"directory, all files with in the directory will be added to the classpath, but further sub-directories will not be included.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.dynamicallyModifiesClasspath(true)
.build();
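// Example (added for illustration, not part of this commit; paths are hypothetical):
// setting Additional Classpath Resources to
//     /opt/hdfs-client/lib,/opt/hdfs-client/extra/custom-codec.jar
// adds every file directly under /opt/hdfs-client/lib plus the single JAR to the
// processor's classpath; sub-directories of /opt/hdfs-client/lib are not scanned.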
private static final Object RESOURCES_LOCK = new Object();
private long kerberosReloginThreshold;
@ -148,6 +160,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
props.add(kerberosProperties.getKerberosPrincipal());
props.add(kerberosProperties.getKerberosKeytab());
props.add(KERBEROS_RELOGIN_PERIOD);
props.add(ADDITIONAL_CLASSPATH_RESOURCES);
properties = Collections.unmodifiableList(props);
}
@ -227,7 +240,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
private static Configuration getConfigurationFromResources(String configResources) throws IOException {
boolean foundResources = false;
final Configuration config = new Configuration();
final Configuration config = new ExtendedConfiguration();
if (null != configResources) {
String[] resources = configResources.split(",");
for (String resource : resources) {
@ -257,51 +270,41 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
* Reset Hadoop Configuration and FileSystem based on the supplied configuration resources.
*/
HdfsResources resetHDFSResources(String configResources, ProcessContext context) throws IOException {
// org.apache.hadoop.conf.Configuration saves its current thread context class loader to use for threads that it creates
// later to do I/O. We need this class loader to be the NarClassLoader instead of the magical
// NarThreadContextClassLoader.
ClassLoader savedClassLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
Configuration config = getConfigurationFromResources(configResources);
config.setClassLoader(Thread.currentThread().getContextClassLoader()); // set the InstanceClassLoader
try {
Configuration config = getConfigurationFromResources(configResources);
// first check for timeout on HDFS connection, because FileSystem has a hard coded 15 minute timeout
checkHdfsUriForTimeout(config);
// first check for timeout on HDFS connection, because FileSystem has a hard coded 15 minute timeout
checkHdfsUriForTimeout(config);
// disable caching of Configuration and FileSystem objects, else we cannot reconfigure the processor without a complete
// restart
String disableCacheName = String.format("fs.%s.impl.disable.cache", FileSystem.getDefaultUri(config).getScheme());
config.set(disableCacheName, "true");
// disable caching of Configuration and FileSystem objects, else we cannot reconfigure the processor without a complete
// restart
String disableCacheName = String.format("fs.%s.impl.disable.cache", FileSystem.getDefaultUri(config).getScheme());
config.set(disableCacheName, "true");
// If kerberos is enabled, create the file system as the kerberos principal
// -- use RESOURCES_LOCK to guarantee UserGroupInformation is accessed by only a single thread at a time
FileSystem fs;
UserGroupInformation ugi;
synchronized (RESOURCES_LOCK) {
if (SecurityUtil.isSecurityEnabled(config)) {
String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
ugi = SecurityUtil.loginKerberos(config, principal, keyTab);
fs = getFileSystemAsUser(config, ugi);
lastKerberosReloginTime = System.currentTimeMillis() / 1000;
} else {
config.set("ipc.client.fallback-to-simple-auth-allowed", "true");
config.set("hadoop.security.authentication", "simple");
ugi = SecurityUtil.loginSimple(config);
fs = getFileSystemAsUser(config, ugi);
}
// If kerberos is enabled, create the file system as the kerberos principal
// -- use RESOURCES_LOCK to guarantee UserGroupInformation is accessed by only a single thread at a time
FileSystem fs;
UserGroupInformation ugi;
synchronized (RESOURCES_LOCK) {
if (SecurityUtil.isSecurityEnabled(config)) {
String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
ugi = SecurityUtil.loginKerberos(config, principal, keyTab);
fs = getFileSystemAsUser(config, ugi);
lastKerberosReloginTime = System.currentTimeMillis() / 1000;
} else {
config.set("ipc.client.fallback-to-simple-auth-allowed", "true");
config.set("hadoop.security.authentication", "simple");
ugi = SecurityUtil.loginSimple(config);
fs = getFileSystemAsUser(config, ugi);
}
final Path workingDir = fs.getWorkingDirectory();
getLogger().info("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
new Object[]{workingDir, fs.getDefaultBlockSize(workingDir), fs.getDefaultReplication(workingDir), config.toString()});
return new HdfsResources(config, fs, ugi);
} finally {
Thread.currentThread().setContextClassLoader(savedClassLoader);
}
final Path workingDir = fs.getWorkingDirectory();
getLogger().info("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
new Object[]{workingDir, fs.getDefaultBlockSize(workingDir), fs.getDefaultReplication(workingDir), config.toString()});
return new HdfsResources(config, fs, ugi);
}
/**
@ -510,4 +513,50 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
}
}
/**
* Extending Hadoop Configuration to prevent it from caching classes that can't be found. Since users may be
* adding additional JARs to the classpath we don't want them to have to restart the JVM to be able to load
* something that was previously not found, but might now be available.
*
* Reference the original getClassByNameOrNull from Configuration.
*/
static class ExtendedConfiguration extends Configuration {
private final Map<ClassLoader, Map<String, WeakReference<Class<?>>>> CACHE_CLASSES = new WeakHashMap<>();
public Class<?> getClassByNameOrNull(String name) {
Map<String, WeakReference<Class<?>>> map;
synchronized (CACHE_CLASSES) {
map = CACHE_CLASSES.get(getClassLoader());
if (map == null) {
map = Collections.synchronizedMap(new WeakHashMap<>());
CACHE_CLASSES.put(getClassLoader(), map);
}
}
Class<?> clazz = null;
WeakReference<Class<?>> ref = map.get(name);
if (ref != null) {
clazz = ref.get();
}
if (clazz == null) {
try {
clazz = Class.forName(name, true, getClassLoader());
} catch (ClassNotFoundException e) {
e.printStackTrace();
return null;
}
// two putters can race here, but they'll put the same class
map.put(name, new WeakReference<>(clazz));
return clazz;
} else {
// cache hit
return clazz;
}
}
}
}

View File

@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
@ -54,6 +55,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@ -205,177 +207,186 @@ public class PutHDFS extends AbstractHadoopProcessor {
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
final FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final Configuration configuration = getConfiguration();
final FileSystem hdfs = getFileSystem();
if (configuration == null || hdfs == null) {
final Configuration configuration = getConfiguration();
final UserGroupInformation ugi = getUserGroupInformation();
if (configuration == null || hdfs == null || ugi == null) {
getLogger().error("HDFS not configured properly");
session.transfer(flowFile, REL_FAILURE);
context.yield();
return;
}
Path tempDotCopyFile = null;
try {
final String dirValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
final Path configuredRootDirPath = new Path(dirValue);
ugi.doAs(new PrivilegedAction<Object>() {
@Override
public Object run() {
Path tempDotCopyFile = null;
FlowFile putFlowFile = flowFile;
try {
final String dirValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(putFlowFile).getValue();
final Path configuredRootDirPath = new Path(dirValue);
final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
final Double blockSizeProp = context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
final long blockSize = blockSizeProp != null ? blockSizeProp.longValue() : hdfs.getDefaultBlockSize(configuredRootDirPath);
final Double blockSizeProp = context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
final long blockSize = blockSizeProp != null ? blockSizeProp.longValue() : hdfs.getDefaultBlockSize(configuredRootDirPath);
final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
final int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : configuration.getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT);
final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
final int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : configuration.getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT);
final Integer replicationProp = context.getProperty(REPLICATION_FACTOR).asInteger();
final short replication = replicationProp != null ? replicationProp.shortValue() : hdfs
.getDefaultReplication(configuredRootDirPath);
final Integer replicationProp = context.getProperty(REPLICATION_FACTOR).asInteger();
final short replication = replicationProp != null ? replicationProp.shortValue() : hdfs
.getDefaultReplication(configuredRootDirPath);
final CompressionCodec codec = getCompressionCodec(context, configuration);
final CompressionCodec codec = getCompressionCodec(context, configuration);
final String filename = codec != null
? flowFile.getAttribute(CoreAttributes.FILENAME.key()) + codec.getDefaultExtension()
: flowFile.getAttribute(CoreAttributes.FILENAME.key());
final String filename = codec != null
? putFlowFile.getAttribute(CoreAttributes.FILENAME.key()) + codec.getDefaultExtension()
: putFlowFile.getAttribute(CoreAttributes.FILENAME.key());
final Path tempCopyFile = new Path(configuredRootDirPath, "." + filename);
final Path copyFile = new Path(configuredRootDirPath, filename);
final Path tempCopyFile = new Path(configuredRootDirPath, "." + filename);
final Path copyFile = new Path(configuredRootDirPath, filename);
// Create destination directory if it does not exist
try {
if (!hdfs.getFileStatus(configuredRootDirPath).isDirectory()) {
throw new IOException(configuredRootDirPath.toString() + " already exists and is not a directory");
}
} catch (FileNotFoundException fe) {
if (!hdfs.mkdirs(configuredRootDirPath)) {
throw new IOException(configuredRootDirPath.toString() + " could not be created");
}
changeOwner(context, hdfs, configuredRootDirPath);
}
final boolean destinationExists = hdfs.exists(copyFile);
// If destination file already exists, resolve that based on processor configuration
if (destinationExists) {
switch (conflictResponse) {
case REPLACE_RESOLUTION:
if (hdfs.delete(copyFile, false)) {
getLogger().info("deleted {} in order to replace with the contents of {}",
new Object[]{copyFile, flowFile});
}
break;
case IGNORE_RESOLUTION:
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("transferring {} to success because file with same name already exists",
new Object[]{flowFile});
return;
case FAIL_RESOLUTION:
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
getLogger().warn("penalizing {} and routing to failure because file with same name already exists",
new Object[]{flowFile});
return;
default:
break;
}
}
// Write FlowFile to temp file on HDFS
final StopWatch stopWatch = new StopWatch(true);
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
OutputStream fos = null;
Path createdFile = null;
// Create destination directory if it does not exist
try {
if (conflictResponse.equals(APPEND_RESOLUTION_AV.getValue()) && destinationExists) {
fos = hdfs.append(copyFile, bufferSize);
} else {
fos = hdfs.create(tempCopyFile, true, bufferSize, replication, blockSize);
if (!hdfs.getFileStatus(configuredRootDirPath).isDirectory()) {
throw new IOException(configuredRootDirPath.toString() + " already exists and is not a directory");
}
if (codec != null) {
fos = codec.createOutputStream(fos);
} catch (FileNotFoundException fe) {
if (!hdfs.mkdirs(configuredRootDirPath)) {
throw new IOException(configuredRootDirPath.toString() + " could not be created");
}
createdFile = tempCopyFile;
BufferedInputStream bis = new BufferedInputStream(in);
StreamUtils.copy(bis, fos);
bis = null;
fos.flush();
} finally {
try {
if (fos != null) {
fos.close();
}
} catch (RemoteException re) {
// when talking to remote HDFS clusters, we don't notice problems until fos.close()
if (createdFile != null) {
changeOwner(context, hdfs, configuredRootDirPath);
}
final boolean destinationExists = hdfs.exists(copyFile);
// If destination file already exists, resolve that based on processor configuration
if (destinationExists) {
switch (conflictResponse) {
case REPLACE_RESOLUTION:
if (hdfs.delete(copyFile, false)) {
getLogger().info("deleted {} in order to replace with the contents of {}",
new Object[]{copyFile, putFlowFile});
}
break;
case IGNORE_RESOLUTION:
session.transfer(putFlowFile, REL_SUCCESS);
getLogger().info("transferring {} to success because file with same name already exists",
new Object[]{putFlowFile});
return null;
case FAIL_RESOLUTION:
session.transfer(session.penalize(putFlowFile), REL_FAILURE);
getLogger().warn("penalizing {} and routing to failure because file with same name already exists",
new Object[]{putFlowFile});
return null;
default:
break;
}
}
// Write FlowFile to temp file on HDFS
final StopWatch stopWatch = new StopWatch(true);
session.read(putFlowFile, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
OutputStream fos = null;
Path createdFile = null;
try {
if (conflictResponse.equals(APPEND_RESOLUTION_AV.getValue()) && destinationExists) {
fos = hdfs.append(copyFile, bufferSize);
} else {
fos = hdfs.create(tempCopyFile, true, bufferSize, replication, blockSize);
}
if (codec != null) {
fos = codec.createOutputStream(fos);
}
createdFile = tempCopyFile;
BufferedInputStream bis = new BufferedInputStream(in);
StreamUtils.copy(bis, fos);
bis = null;
fos.flush();
} finally {
try {
hdfs.delete(createdFile, false);
if (fos != null) {
fos.close();
}
} catch (RemoteException re) {
// when talking to remote HDFS clusters, we don't notice problems until fos.close()
if (createdFile != null) {
try {
hdfs.delete(createdFile, false);
} catch (Throwable ignore) {
}
}
throw re;
} catch (Throwable ignore) {
}
fos = null;
}
throw re;
} catch (Throwable ignore) {
}
fos = null;
});
stopWatch.stop();
final String dataRate = stopWatch.calculateDataRate(putFlowFile.getSize());
final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
tempDotCopyFile = tempCopyFile;
if (!conflictResponse.equals(APPEND_RESOLUTION_AV.getValue())
|| (conflictResponse.equals(APPEND_RESOLUTION_AV.getValue()) && !destinationExists)) {
boolean renamed = false;
for (int i = 0; i < 10; i++) { // try to rename multiple times.
if (hdfs.rename(tempCopyFile, copyFile)) {
renamed = true;
break;// rename was successful
}
Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve
}
if (!renamed) {
hdfs.delete(tempCopyFile, false);
throw new ProcessException("Copied file to HDFS but could not rename dot file " + tempCopyFile
+ " to its final filename");
}
changeOwner(context, hdfs, copyFile);
}
}
});
stopWatch.stop();
final String dataRate = stopWatch.calculateDataRate(flowFile.getSize());
final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
tempDotCopyFile = tempCopyFile;
getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}",
new Object[]{putFlowFile, copyFile, millis, dataRate});
if (!conflictResponse.equals(APPEND_RESOLUTION_AV.getValue())
|| (conflictResponse.equals(APPEND_RESOLUTION_AV.getValue()) && !destinationExists)) {
boolean renamed = false;
for (int i = 0; i < 10; i++) { // try to rename multiple times.
if (hdfs.rename(tempCopyFile, copyFile)) {
renamed = true;
break;// rename was successful
final String outputPath = copyFile.toString();
final String newFilename = copyFile.getName();
final String hdfsPath = copyFile.getParent().toString();
putFlowFile = session.putAttribute(putFlowFile, CoreAttributes.FILENAME.key(), newFilename);
putFlowFile = session.putAttribute(putFlowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath : "hdfs://" + outputPath;
session.getProvenanceReporter().send(putFlowFile, transitUri);
session.transfer(putFlowFile, REL_SUCCESS);
} catch (final Throwable t) {
if (tempDotCopyFile != null) {
try {
hdfs.delete(tempDotCopyFile, false);
} catch (Exception e) {
getLogger().error("Unable to remove temporary file {} due to {}", new Object[]{tempDotCopyFile, e});
}
}
Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve
}
if (!renamed) {
hdfs.delete(tempCopyFile, false);
throw new ProcessException("Copied file to HDFS but could not rename dot file " + tempCopyFile
+ " to its final filename");
getLogger().error("Failed to write to HDFS due to {}", new Object[]{t});
session.transfer(session.penalize(putFlowFile), REL_FAILURE);
context.yield();
}
changeOwner(context, hdfs, copyFile);
return null;
}
getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}",
new Object[]{flowFile, copyFile, millis, dataRate});
final String outputPath = copyFile.toString();
final String newFilename = copyFile.getName();
final String hdfsPath = copyFile.getParent().toString();
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), newFilename);
flowFile = session.putAttribute(flowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath : "hdfs://" + outputPath;
session.getProvenanceReporter().send(flowFile, transitUri);
session.transfer(flowFile, REL_SUCCESS);
} catch (final Throwable t) {
if (tempDotCopyFile != null) {
try {
hdfs.delete(tempDotCopyFile, false);
} catch (Exception e) {
getLogger().error("Unable to remove temporary file {} due to {}", new Object[]{tempDotCopyFile, e});
}
}
getLogger().error("Failed to write to HDFS due to {}", new Object[]{t});
session.transfer(session.penalize(flowFile), REL_FAILURE);
context.yield();
}
});
}
protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name) {
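
The heart of the PutHDFS change above is wrapping the onTrigger work in UserGroupInformation.doAs so every FileSystem call executes as the logged-in user, which appears to be the TDE fix referenced in the commit message. A minimal sketch of that pattern, assuming a hypothetical helper class (only the doAs wrapping itself mirrors the processor code):

import java.io.IOException;
import java.security.PrivilegedAction;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

// Hypothetical illustration of the doAs wrapping used in onTrigger above.
class UgiDoAsSketch {

    boolean pathExistsAsUser(final UserGroupInformation ugi, final FileSystem hdfs, final Path path) {
        return ugi.doAs(new PrivilegedAction<Boolean>() {
            @Override
            public Boolean run() {
                try {
                    // All FileSystem access inside run() executes as the authenticated user,
                    // mirroring how PutHDFS now performs its directory checks, writes, and renames.
                    return hdfs.exists(path);
                } catch (final IOException e) {
                    throw new RuntimeException(e);
                }
            }
        });
    }
}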

View File

@ -253,6 +253,14 @@ The following binary components are provided under the Apache Software License v2
Apache Software Foundation that were originally developed at iClick, Inc.,
software copyright (c) 1999.
(ASLv2) Apache HTrace Core
The following NOTICE information applies:
Copyright 2016 The Apache Software Foundation
Apache HTrace includes an Apache Thrift connector to Zipkin. Zipkin
is a distributed tracing system that is Apache 2.0 Licensed.
Copyright 2012 Twitter, Inc.
(ASLv2) Apache Tomcat
The following NOTICE information applies:
Apache Tomcat

View File

@ -269,15 +269,13 @@ Apache Software License v2
Licensed under the Apache License, Version 2.0
(ASLv2) HTrace Core
The following NOTICE information applies:
In addition, this product includes software dependencies. See
the accompanying LICENSE.txt for a listing of dependencies
that are NOT Apache licensed (with pointers to their licensing)
(ASLv2) Apache HTrace Core
The following NOTICE information applies:
Copyright 2016 The Apache Software Foundation
Apache HTrace includes an Apache Thrift connector to Zipkin. Zipkin
is a distributed tracing system that is Apache 2.0 Licensed.
Copyright 2012 Twitter, Inc.
Apache HTrace includes an Apache Thrift connector to Zipkin. Zipkin
is a distributed tracing system that is Apache 2.0 Licensed.
Copyright 2012 Twitter, Inc.
(ASLv2) Jackson Core ASL
The following NOTICE information applies:

View File

@ -95,7 +95,7 @@ language governing permissions and limitations under the License. -->
<spring.version>4.2.4.RELEASE</spring.version>
<spring.security.version>4.0.3.RELEASE</spring.security.version>
<jersey.version>1.19</jersey.version>
<hadoop.version>2.6.2</hadoop.version>
<hadoop.version>2.7.3</hadoop.version>
<hadoop.guava.version>12.0.1</hadoop.guava.version>
<hadoop.http.client.version>4.2.5</hadoop.http.client.version>
<yammer.metrics.version>2.2.0</yammer.metrics.version>
@ -646,7 +646,6 @@ language governing permissions and limitations under the License. -->
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
@ -667,6 +666,11 @@ language governing permissions and limitations under the License. -->
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>