mirror of https://github.com/apache/nifi.git
NIFI-9032 Refactoring HDFS processors in order to increase flexibility
This closes #5295. Signed-off-by: Tamas Palfy <tamas.bertalan.palfy@gmail.com>
This commit is contained in:
parent 714670b8e6
commit c3ecf2fea8
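The refactoring swaps the ResourceReferences-based plumbing for plain List<String> configuration locations and turns a number of private helpers into protected hooks (getConfigLocations, getHadoopConfigurationForValidation, validateFileSystem, getNormalizedPath(String), getSuccessRelationship, getFailureRelationship, getCommsFailureRelationship, getAttributePrefix, getCompressionType, getPath, isRecursive), so processors backed by other Hadoop-compatible file systems can extend the HDFS ones. A minimal sketch of such a subclass, assuming only the hooks visible in this diff; the class name and the "customfs" prefix are hypothetical and purely illustrative, not part of this commit:

package org.apache.nifi.processors.hadoop;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;

// Hypothetical example only: reuses the FetchHDFS logic unchanged, but renames the
// emitted attributes and resolves the target path through the new protected hooks.
public class FetchCustomFileSystem extends FetchHDFS {

    @Override
    protected String getAttributePrefix() {
        // failure attributes become "customfs.failure.reason" instead of "hdfs.failure.reason"
        return "customfs";
    }

    @Override
    protected String getPath(final ProcessContext context, final FlowFile flowFile) {
        // the default resolves the FILENAME property; a subclass could derive the path differently
        return super.getPath(context, flowFile);
    }
}
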
@@ -33,6 +33,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceReferences;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hadoop.KerberosProperties;

@@ -134,7 +135,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
.dynamicallyModifiesClasspath(true)
.build();

static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
.name("kerberos-credentials-service")
.displayName("Kerberos Credentials Service")
.description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")

@@ -187,7 +188,6 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {

@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final ResourceReferences configResources = validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().asResources();
final String explicitPrincipal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
final String explicitKeytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
final String explicitPassword = validationContext.getProperty(kerberosProperties.getKerberosPassword()).getValue();

@@ -204,36 +204,18 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
}

final List<ValidationResult> results = new ArrayList<>();
final List<String> locations = getConfigLocations(validationContext);

if (configResources.getCount() == 0) {
if (locations.isEmpty()) {
return results;
}

try {
ValidationResources resources = validationResourceHolder.get();

// if no resources in the holder, or if the holder has different resources loaded,
// then load the Configuration and set the new resources in the holder
if (resources == null || !configResources.equals(resources.getConfigResources())) {
getLogger().debug("Reloading validation resources");
final Configuration config = new ExtendedConfiguration(getLogger());
config.setClassLoader(Thread.currentThread().getContextClassLoader());
resources = new ValidationResources(configResources, getConfigurationFromResources(config, configResources));
validationResourceHolder.set(resources);
}

final Configuration conf = resources.getConfiguration();
final Configuration conf = getHadoopConfigurationForValidation(locations);
results.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(
this.getClass().getSimpleName(), conf, resolvedPrincipal, resolvedKeytab, explicitPassword, getLogger()));
this.getClass().getSimpleName(), conf, resolvedPrincipal, resolvedKeytab, explicitPassword, getLogger()));

final URI fileSystemUri = FileSystem.getDefaultUri(conf);
if (isFileSystemAccessDenied(fileSystemUri)) {
results.add(new ValidationResult.Builder()
.valid(false)
.subject("Hadoop File System")
.explanation(DENY_LFS_EXPLANATION)
.build());
}
results.addAll(validateFileSystem(conf));
} catch (final IOException e) {
results.add(new ValidationResult.Builder()
.valid(false)

@@ -262,6 +244,36 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
return results;
}

protected Collection<ValidationResult> validateFileSystem(final Configuration configuration) {
final List<ValidationResult> results = new ArrayList<>();

if (isFileSystemAccessDenied(FileSystem.getDefaultUri(configuration))) {
results.add(new ValidationResult.Builder()
.valid(false)
.subject("Hadoop File System")
.explanation(DENY_LFS_EXPLANATION)
.build());
}

return results;
}

protected Configuration getHadoopConfigurationForValidation(final List<String> locations) throws IOException {
ValidationResources resources = validationResourceHolder.get();

// if no resources in the holder, or if the holder has different resources loaded,
// then load the Configuration and set the new resources in the holder
if (resources == null || !locations.equals(resources.getConfigLocations())) {
getLogger().debug("Reloading validation resources");
final Configuration config = new ExtendedConfiguration(getLogger());
config.setClassLoader(Thread.currentThread().getContextClassLoader());
resources = new ValidationResources(locations, getConfigurationFromResources(config, locations));
validationResourceHolder.set(resources);
}

return resources.getConfiguration();
}

/**
* If your subclass also has an @OnScheduled annotated method and you need hdfsResources in that method, then be sure to call super.abstractOnScheduled(context)
*/

@@ -272,17 +284,22 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
// properties this processor sets. TODO: re-work ListHDFS to utilize Kerberos
HdfsResources resources = hdfsResources.get();
if (resources.getConfiguration() == null) {
final ResourceReferences configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().asResources();
resources = resetHDFSResources(configResources, context);
resources = resetHDFSResources(getConfigLocations(context), context);
hdfsResources.set(resources);
}
} catch (Exception ex) {
getLogger().error("HDFS Configuration error - {}", new Object[] { ex });
getLogger().error("HDFS Configuration error - {}", new Object[]{ex});
hdfsResources.set(EMPTY_HDFS_RESOURCES);
throw ex;
}
}

protected List<String> getConfigLocations(PropertyContext context) {
final ResourceReferences configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().asResources();
final List<String> locations = configResources.asLocations();
return locations;
}

@OnStopped
public final void abstractOnStopped() {
final HdfsResources resources = hdfsResources.get();

@@ -345,10 +362,10 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
}
}

private static Configuration getConfigurationFromResources(final Configuration config, final ResourceReferences resourceReferences) throws IOException {
boolean foundResources = resourceReferences.getCount() > 0;
private static Configuration getConfigurationFromResources(final Configuration config, final List<String> locations) throws IOException {
boolean foundResources = !locations.isEmpty();

if (foundResources) {
final List<String> locations = resourceReferences.asLocations();
for (String resource : locations) {
config.addResource(new Path(resource.trim()));
}

@@ -372,11 +389,11 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
/*
* Reset Hadoop Configuration and FileSystem based on the supplied configuration resources.
*/
HdfsResources resetHDFSResources(final ResourceReferences resourceReferences, ProcessContext context) throws IOException {
HdfsResources resetHDFSResources(final List<String> resourceLocations, ProcessContext context) throws IOException {
Configuration config = new ExtendedConfiguration(getLogger());
config.setClassLoader(Thread.currentThread().getContextClassLoader());

getConfigurationFromResources(config, resourceReferences);
getConfigurationFromResources(config, resourceLocations);

// give sub-classes a chance to process configuration
preProcessConfiguration(config, context);

@@ -559,7 +576,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
kerberosUser.checkTGTAndRelogin();
} catch (LoginException e) {
throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e);
}
}
} else {
getLogger().debug("kerberosUser was null, will not refresh TGT with KerberosUser");
}

@@ -577,7 +594,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
return Boolean.parseBoolean(System.getenv(DENY_LFS_ACCESS));
}

private boolean isFileSystemAccessDenied(final URI fileSystemUri) {
protected boolean isFileSystemAccessDenied(final URI fileSystemUri) {
boolean accessDenied;

if (isLocalFileSystemAccessDenied()) {

@@ -590,16 +607,16 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
}

static protected class ValidationResources {
private final ResourceReferences configResources;
private final List<String> configLocations;
private final Configuration configuration;

public ValidationResources(final ResourceReferences configResources, Configuration configuration) {
this.configResources = configResources;
public ValidationResources(final List<String> configLocations, final Configuration configuration) {
this.configLocations = configLocations;
this.configuration = configuration;
}

public ResourceReferences getConfigResources() {
return configResources;
public List<String> getConfigLocations() {
return configLocations;
}

public Configuration getConfiguration() {

@@ -611,7 +628,25 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
return getNormalizedPath(context, property, null);
}

protected Path getNormalizedPath(ProcessContext context, PropertyDescriptor property, FlowFile flowFile) {
protected Path getNormalizedPath(final String rawPath) {
final Path path = new Path(rawPath);
final URI uri = path.toUri();

final URI fileSystemUri = getFileSystem().getUri();

if (uri.getScheme() != null) {
if (!uri.getScheme().equals(fileSystemUri.getScheme()) || !uri.getAuthority().equals(fileSystemUri.getAuthority())) {
getLogger().warn("The filesystem component of the URI configured ({}) does not match the filesystem URI from the Hadoop configuration file ({}) " +
"and will be ignored.", uri, fileSystemUri);
}

return new Path(uri.getPath());
} else {
return path;
}
}

protected Path getNormalizedPath(final ProcessContext context, final PropertyDescriptor property, final FlowFile flowFile) {
final String propertyValue = context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue();
final Path path = new Path(propertyValue);
final URI uri = path.toUri();

@@ -629,4 +664,4 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
return path;
}
}
}
}

@@ -1,343 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hadoop;

import com.google.common.base.Throwables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsCreateModes;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
import org.ietf.jgss.GSSException;

import java.io.BufferedInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.PrivilegedAction;
import java.util.EnumSet;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Stream;

public abstract class AbstractPutHDFS extends AbstractHadoopProcessor {
protected static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
protected static final int BUFFER_SIZE_DEFAULT = 4096;

protected static final String REPLACE_RESOLUTION = "replace";
protected static final String IGNORE_RESOLUTION = "ignore";
protected static final String FAIL_RESOLUTION = "fail";
protected static final String APPEND_RESOLUTION = "append";

protected static final AllowableValue REPLACE_RESOLUTION_AV = new AllowableValue(REPLACE_RESOLUTION,
REPLACE_RESOLUTION, "Replaces the existing file if any.");
protected static final AllowableValue IGNORE_RESOLUTION_AV = new AllowableValue(IGNORE_RESOLUTION, IGNORE_RESOLUTION,
"Ignores the flow file and routes it to success.");
protected static final AllowableValue FAIL_RESOLUTION_AV = new AllowableValue(FAIL_RESOLUTION, FAIL_RESOLUTION,
"Penalizes the flow file and routes it to failure.");
protected static final AllowableValue APPEND_RESOLUTION_AV = new AllowableValue(APPEND_RESOLUTION, APPEND_RESOLUTION,
"Appends to the existing file if any, creates a new file otherwise.");

protected static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
.name("Conflict Resolution Strategy")
.description("Indicates what should happen when a file with the same name already exists in the output directory")
.required(true)
.defaultValue(FAIL_RESOLUTION_AV.getValue())
.allowableValues(REPLACE_RESOLUTION_AV, IGNORE_RESOLUTION_AV, FAIL_RESOLUTION_AV, APPEND_RESOLUTION_AV)
.build();

@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}

final FileSystem hdfs = getFileSystem();
final Configuration configuration = getConfiguration();
final UserGroupInformation ugi = getUserGroupInformation();

if (configuration == null || hdfs == null || ugi == null) {
getLogger().error("HDFS not configured properly");
session.transfer(flowFile, getFailureRelationship());
context.yield();
return;
}

ugi.doAs(new PrivilegedAction<Object>() {
@Override
public Object run() {
Path tempDotCopyFile = null;
FlowFile putFlowFile = flowFile;
try {
final Path dirPath = getNormalizedPath(context, DIRECTORY, putFlowFile);

final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
final long blockSize = getBlockSize(context, session, putFlowFile, dirPath);
final int bufferSize = getBufferSize(context, session, putFlowFile);
final short replication = getReplication(context, session, putFlowFile, dirPath);

final CompressionCodec codec = getCompressionCodec(context, configuration);

final String filename = codec != null
? putFlowFile.getAttribute(CoreAttributes.FILENAME.key()) + codec.getDefaultExtension()
: putFlowFile.getAttribute(CoreAttributes.FILENAME.key());

final Path tempCopyFile = new Path(dirPath, "." + filename);
final Path copyFile = new Path(dirPath, filename);

// Create destination directory if it does not exist
try {
if (!hdfs.getFileStatus(dirPath).isDirectory()) {
throw new IOException(dirPath.toString() + " already exists and is not a directory");
}
} catch (FileNotFoundException fe) {
if (!hdfs.mkdirs(dirPath)) {
throw new IOException(dirPath.toString() + " could not be created");
}
changeOwner(context, hdfs, dirPath, flowFile);
}

final boolean destinationExists = hdfs.exists(copyFile);

// If destination file already exists, resolve that based on processor configuration
if (destinationExists) {
switch (conflictResponse) {
case REPLACE_RESOLUTION:
if (hdfs.delete(copyFile, false)) {
getLogger().info("deleted {} in order to replace with the contents of {}",
new Object[]{copyFile, putFlowFile});
}
break;
case IGNORE_RESOLUTION:
session.transfer(putFlowFile, getSuccessRelationship());
getLogger().info("transferring {} to success because file with same name already exists",
new Object[]{putFlowFile});
return null;
case FAIL_RESOLUTION:
session.transfer(session.penalize(putFlowFile), getFailureRelationship());
getLogger().warn("penalizing {} and routing to failure because file with same name already exists",
new Object[]{putFlowFile});
return null;
default:
break;
}
}

// Write FlowFile to temp file on HDFS
final StopWatch stopWatch = new StopWatch(true);
session.read(putFlowFile, new InputStreamCallback() {

@Override
public void process(InputStream in) throws IOException {
OutputStream fos = null;
Path createdFile = null;
try {
if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) {
fos = hdfs.append(copyFile, bufferSize);
} else {
final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);

if (shouldIgnoreLocality(context, session)) {
cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
}

fos = hdfs.create(tempCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
null, null);
}

if (codec != null) {
fos = codec.createOutputStream(fos);
}
createdFile = tempCopyFile;
BufferedInputStream bis = new BufferedInputStream(in);
StreamUtils.copy(bis, fos);
bis = null;
fos.flush();
} finally {
try {
if (fos != null) {
fos.close();
}
} catch (Throwable t) {
// when talking to remote HDFS clusters, we don't notice problems until fos.close()
if (createdFile != null) {
try {
hdfs.delete(createdFile, false);
} catch (Throwable ignore) {
}
}
throw t;
}
fos = null;
}
}

});
stopWatch.stop();
final String dataRate = stopWatch.calculateDataRate(putFlowFile.getSize());
final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
tempDotCopyFile = tempCopyFile;

if (!conflictResponse.equals(APPEND_RESOLUTION)
|| (conflictResponse.equals(APPEND_RESOLUTION) && !destinationExists)) {
boolean renamed = false;
for (int i = 0; i < 10; i++) { // try to rename multiple times.
if (hdfs.rename(tempCopyFile, copyFile)) {
renamed = true;
break;// rename was successful
}
Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve
}
if (!renamed) {
hdfs.delete(tempCopyFile, false);
throw new ProcessException("Copied file to HDFS but could not rename dot file " + tempCopyFile
+ " to its final filename");
}

changeOwner(context, hdfs, copyFile, flowFile);
}

getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}",
new Object[]{putFlowFile, copyFile, millis, dataRate});

final String newFilename = copyFile.getName();
final String hdfsPath = copyFile.getParent().toString();
putFlowFile = session.putAttribute(putFlowFile, CoreAttributes.FILENAME.key(), newFilename);
putFlowFile = session.putAttribute(putFlowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
final Path qualifiedPath = copyFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
session.getProvenanceReporter().send(putFlowFile, qualifiedPath.toString());

session.transfer(putFlowFile, getSuccessRelationship());

} catch (final IOException e) {
Optional<GSSException> causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
if (causeOptional.isPresent()) {
getLogger().warn("An error occurred while connecting to HDFS. "
+ "Rolling back session, and penalizing flow file {}",
new Object[] {putFlowFile.getAttribute(CoreAttributes.UUID.key()), causeOptional.get()});
session.rollback(true);
} else {
getLogger().error("Failed to access HDFS due to {}", new Object[]{e});
session.transfer(putFlowFile, getFailureRelationship());
}
} catch (final Throwable t) {
if (tempDotCopyFile != null) {
try {
hdfs.delete(tempDotCopyFile, false);
} catch (Exception e) {
getLogger().error("Unable to remove temporary file {} due to {}", new Object[]{tempDotCopyFile, e});
}
}
getLogger().error("Failed to write to HDFS due to {}", new Object[]{t});
session.transfer(session.penalize(putFlowFile), getFailureRelationship());
context.yield();
}

return null;
}
});
}

/**
* Returns with the expected block size.
*/
protected abstract long getBlockSize(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final Path dirPath);

/**
* Returns with the expected buffer size.
*/
protected abstract int getBufferSize(final ProcessContext context, final ProcessSession session, final FlowFile flowFile);

/**
* Returns with the expected replication factor.
*/
protected abstract short getReplication(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final Path dirPath);

/**
* Returns if file system should ignore locality.
*/
protected abstract boolean shouldIgnoreLocality(final ProcessContext context, final ProcessSession session);

/**
* If returns a non-null value, the uploaded file's owner will be changed to this value after it is written. This only
* works if NiFi is running as a user that has privilege to change owner.
*/
protected abstract String getOwner(final ProcessContext context, final FlowFile flowFile);

/**
* I returns a non-null value, thee uploaded file's group will be changed to this value after it is written. This only
* works if NiFi is running as a user that has privilege to change group.
*/
protected abstract String getGroup(final ProcessContext context, final FlowFile flowFile);

/**
* @return The relationship the flow file will be transferred in case of successful execution.
*/
protected abstract Relationship getSuccessRelationship();

/**
* @return The relationship the flow file will be transferred in case of failed execution.
*/
protected abstract Relationship getFailureRelationship();

/**
* Returns an optional with the first throwable in the causal chain that is assignable to the provided cause type,
* and satisfies the provided cause predicate, {@link Optional#empty()} otherwise.
* @param t The throwable to inspect for the cause.
* @return
*/
private <T extends Throwable> Optional<T> findCause(Throwable t, Class<T> expectedCauseType, Predicate<T> causePredicate) {
Stream<Throwable> causalChain = Throwables.getCausalChain(t).stream();
return causalChain
.filter(expectedCauseType::isInstance)
.map(expectedCauseType::cast)
.filter(causePredicate)
.findFirst();
}

protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name, final FlowFile flowFile) {
try {
// Change owner and group of file if configured to do so
final String owner = getOwner(context, flowFile);
final String group = getGroup(context, flowFile);

if (owner != null || group != null) {
hdfs.setOwner(name, owner, group);
}
} catch (Exception e) {
getLogger().warn("Could not change owner or group of {} on HDFS due to {}", new Object[]{name, e});
}
}
}

@@ -144,9 +144,7 @@ public class DeleteHDFS extends AbstractHadoopProcessor {

// We need a FlowFile to report provenance correctly.
final FlowFile finalFlowFile = originalFlowFile != null ? originalFlowFile : session.create();

final String fileOrDirectoryName = getNormalizedPath(context, FILE_OR_DIRECTORY, finalFlowFile).toString();

final String fileOrDirectoryName = getPath(context, session, finalFlowFile);
final FileSystem fileSystem = getFileSystem();
final UserGroupInformation ugi = getUserGroupInformation();

@@ -171,11 +169,11 @@ public class DeleteHDFS extends AbstractHadoopProcessor {
if (fileSystem.exists(path)) {
try {
Map<String, String> attributes = Maps.newHashMapWithExpectedSize(2);
attributes.put("hdfs.filename", path.getName());
attributes.put("hdfs.path", path.getParent().toString());
attributes.put(getAttributePrefix() + ".filename", path.getName());
attributes.put(getAttributePrefix() + ".path", path.getParent().toString());
flowFile = session.putAllAttributes(flowFile, attributes);

fileSystem.delete(path, context.getProperty(RECURSIVE).asBoolean());
fileSystem.delete(path, isRecursive(context, session));
getLogger().debug("For flowfile {} Deleted file at path {} with name {}", new Object[]{originalFlowFile, path.getParent().toString(), path.getName()});
final Path qualifiedPath = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
session.getProvenanceReporter().invokeRemoteProcess(flowFile, qualifiedPath.toString());

@@ -186,27 +184,47 @@

Map<String, String> attributes = Maps.newHashMapWithExpectedSize(1);
// The error message is helpful in understanding at a flowfile level what caused the IOException (which ACL is denying the operation, e.g.)
attributes.put("hdfs.error.message", ioe.getMessage());
attributes.put(getAttributePrefix() + ".error.message", ioe.getMessage());

session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), REL_FAILURE);
session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), getFailureRelationship());
failedPath++;
}
}
}

if (failedPath == 0) {
session.transfer(flowFile, DeleteHDFS.REL_SUCCESS);
session.transfer(flowFile, getSuccessRelationship());
} else {
// If any path has been failed to be deleted, remove the FlowFile as it's been cloned and sent to failure.
session.remove(flowFile);
}
} catch (IOException e) {
getLogger().error("Error processing delete for flowfile {} due to {}", new Object[]{flowFile, e.getMessage()}, e);
session.transfer(flowFile, DeleteHDFS.REL_FAILURE);
session.transfer(flowFile, getFailureRelationship());
}

return null;
});

}

protected Relationship getSuccessRelationship() {
return REL_SUCCESS;
}

protected Relationship getFailureRelationship() {
return REL_FAILURE;
}

protected boolean isRecursive(final ProcessContext context, final ProcessSession session) {
return context.getProperty(RECURSIVE).asBoolean();
}

protected String getPath(final ProcessContext context, final ProcessSession session, final FlowFile finalFlowFile) {
return getNormalizedPath(context, FILE_OR_DIRECTORY, finalFlowFile).toString();
}

protected String getAttributePrefix() {
return "hdfs";
}
}

@@ -17,7 +17,7 @@
package org.apache.nifi.processors.hadoop;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

@@ -121,16 +121,16 @@ public class FetchHDFS extends AbstractHadoopProcessor {

final FileSystem hdfs = getFileSystem();
final UserGroupInformation ugi = getUserGroupInformation();
final String filenameValue = context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue();
final String filenameValue = getPath(context, flowFile);

final Path path;
try {
path = getNormalizedPath(context, FILENAME, flowFile);
path = getNormalizedPath(getPath(context, flowFile));
} catch (IllegalArgumentException e) {
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {filenameValue, flowFile, e});
flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage());
flowFile = session.putAttribute(flowFile, getAttributePrefix() + ".failure.reason", e.getMessage());
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
session.transfer(flowFile, getFailureRelationship());
return;
}

@@ -144,7 +144,7 @@ public class FetchHDFS extends AbstractHadoopProcessor {
CompressionCodec codec = null;
Configuration conf = getConfiguration();
final CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(conf);
final CompressionType compressionType = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).toString());
final CompressionType compressionType = getCompressionType(context);
final boolean inferCompressionCodec = compressionType == CompressionType.AUTOMATIC;

if(inferCompressionCodec) {

@@ -174,16 +174,16 @@ public class FetchHDFS extends AbstractHadoopProcessor {
stopWatch.stop();
getLogger().info("Successfully received content from {} for {} in {}", new Object[] {qualifiedPath, flowFile, stopWatch.getDuration()});
session.getProvenanceReporter().fetch(flowFile, qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
session.transfer(flowFile, getSuccessRelationship());
} catch (final FileNotFoundException | AccessControlException e) {
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {qualifiedPath, flowFile, e});
flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage());
flowFile = session.putAttribute(flowFile, getAttributePrefix() + ".failure.reason", e.getMessage());
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
session.transfer(flowFile, getFailureRelationship());
} catch (final IOException e) {
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", new Object[] {qualifiedPath, flowFile, e});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_COMMS_FAILURE);
session.transfer(flowFile, getCommsFailureRelationship());
} finally {
IOUtils.closeQuietly(stream);
}

@@ -191,7 +191,29 @@ public class FetchHDFS extends AbstractHadoopProcessor {
return null;
}
});

}

protected Relationship getSuccessRelationship() {
return REL_SUCCESS;
}

protected Relationship getFailureRelationship() {
return REL_FAILURE;
}

protected Relationship getCommsFailureRelationship() {
return REL_COMMS_FAILURE;
}

protected String getPath(final ProcessContext context, final FlowFile flowFile) {
return context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue();
}

protected String getAttributePrefix() {
return "hdfs";
}

protected CompressionType getCompressionType(final ProcessContext context) {
return CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).toString());
}
}

@@ -503,7 +503,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
final Map<String, String> attributes = createAttributes(status);
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
session.transfer(flowFile, getSuccessRelationship());
}
}

@@ -528,7 +528,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
flowFile = session.putAllAttributes(flowFile, attributes);

session.transfer(flowFile, REL_SUCCESS);
session.transfer(flowFile, getSuccessRelationship());
}

private Record createRecord(final FileStatus fileStatus) {

@@ -620,15 +620,15 @@ public class ListHDFS extends AbstractHadoopProcessor {
attributes.put(CoreAttributes.FILENAME.key(), status.getPath().getName());
attributes.put(CoreAttributes.PATH.key(), getAbsolutePath(status.getPath().getParent()));

attributes.put("hdfs.owner", status.getOwner());
attributes.put("hdfs.group", status.getGroup());
attributes.put("hdfs.lastModified", String.valueOf(status.getModificationTime()));
attributes.put("hdfs.length", String.valueOf(status.getLen()));
attributes.put("hdfs.replication", String.valueOf(status.getReplication()));
attributes.put(getAttributePrefix() + ".owner", status.getOwner());
attributes.put(getAttributePrefix() + ".group", status.getGroup());
attributes.put(getAttributePrefix() + ".lastModified", String.valueOf(status.getModificationTime()));
attributes.put(getAttributePrefix() + ".length", String.valueOf(status.getLen()));
attributes.put(getAttributePrefix() + ".replication", String.valueOf(status.getReplication()));

final FsPermission permission = status.getPermission();
final String perms = getPerms(permission.getUserAction()) + getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction());
attributes.put("hdfs.permissions", perms);
attributes.put(getAttributePrefix() + ".permissions", perms);
return attributes;
}

@@ -669,4 +669,11 @@ public class ListHDFS extends AbstractHadoopProcessor {
};
}

protected Relationship getSuccessRelationship() {
return REL_SUCCESS;
}

protected String getAttributePrefix() {
return "hdfs";
}
}

@@ -16,9 +16,15 @@
*/
package org.apache.nifi.processors.hadoop;

import com.google.common.base.Throwables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsCreateModes;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;

@@ -29,22 +35,40 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
import org.ietf.jgss.GSSException;

import java.io.BufferedInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Stream;

/**
* This processor copies FlowFiles to HDFS.

@@ -63,7 +87,11 @@ import java.util.Set;
requiredPermission = RequiredPermission.WRITE_DISTRIBUTED_FILESYSTEM,
explanation = "Provides operator the ability to delete any file that NiFi has access to in HDFS or the local filesystem.")
})
public class PutHDFS extends AbstractPutHDFS {
public class PutHDFS extends AbstractHadoopProcessor {

protected static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
protected static final int BUFFER_SIZE_DEFAULT = 4096;

// relationships

public static final Relationship REL_SUCCESS = new Relationship.Builder()

@@ -78,6 +106,28 @@ public class PutHDFS extends AbstractPutHDFS {

// properties

protected static final String REPLACE_RESOLUTION = "replace";
protected static final String IGNORE_RESOLUTION = "ignore";
protected static final String FAIL_RESOLUTION = "fail";
protected static final String APPEND_RESOLUTION = "append";

protected static final AllowableValue REPLACE_RESOLUTION_AV = new AllowableValue(REPLACE_RESOLUTION,
REPLACE_RESOLUTION, "Replaces the existing file if any.");
protected static final AllowableValue IGNORE_RESOLUTION_AV = new AllowableValue(IGNORE_RESOLUTION, IGNORE_RESOLUTION,
"Ignores the flow file and routes it to success.");
protected static final AllowableValue FAIL_RESOLUTION_AV = new AllowableValue(FAIL_RESOLUTION, FAIL_RESOLUTION,
"Penalizes the flow file and routes it to failure.");
protected static final AllowableValue APPEND_RESOLUTION_AV = new AllowableValue(APPEND_RESOLUTION, APPEND_RESOLUTION,
"Appends to the existing file if any, creates a new file otherwise.");

protected static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
.name("Conflict Resolution Strategy")
.description("Indicates what should happen when a file with the same name already exists in the output directory")
.required(true)
.defaultValue(FAIL_RESOLUTION_AV.getValue())
.allowableValues(REPLACE_RESOLUTION_AV, IGNORE_RESOLUTION_AV, FAIL_RESOLUTION_AV, APPEND_RESOLUTION_AV)
.build();

public static final PropertyDescriptor BLOCK_SIZE = new PropertyDescriptor.Builder()
.name("Block Size")
.description("Size of each block as written to HDFS. This overrides the Hadoop Configuration")

@@ -179,48 +229,263 @@ public class PutHDFS extends AbstractPutHDFS {
}

@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}

final FileSystem hdfs = getFileSystem();
final Configuration configuration = getConfiguration();
final UserGroupInformation ugi = getUserGroupInformation();

if (configuration == null || hdfs == null || ugi == null) {
getLogger().error("HDFS not configured properly");
session.transfer(flowFile, getFailureRelationship());
context.yield();
return;
}

ugi.doAs(new PrivilegedAction<Object>() {
@Override
public Object run() {
Path tempDotCopyFile = null;
FlowFile putFlowFile = flowFile;
try {
final Path dirPath = getNormalizedPath(context, DIRECTORY, putFlowFile);

final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
final long blockSize = getBlockSize(context, session, putFlowFile, dirPath);
final int bufferSize = getBufferSize(context, session, putFlowFile);
final short replication = getReplication(context, session, putFlowFile, dirPath);

final CompressionCodec codec = getCompressionCodec(context, configuration);

final String filename = codec != null
? putFlowFile.getAttribute(CoreAttributes.FILENAME.key()) + codec.getDefaultExtension()
: putFlowFile.getAttribute(CoreAttributes.FILENAME.key());

final Path tempCopyFile = new Path(dirPath, "." + filename);
final Path copyFile = new Path(dirPath, filename);

// Create destination directory if it does not exist
try {
if (!hdfs.getFileStatus(dirPath).isDirectory()) {
throw new IOException(dirPath.toString() + " already exists and is not a directory");
}
} catch (FileNotFoundException fe) {
if (!hdfs.mkdirs(dirPath)) {
throw new IOException(dirPath.toString() + " could not be created");
}
changeOwner(context, hdfs, dirPath, flowFile);
}

final boolean destinationExists = hdfs.exists(copyFile);

// If destination file already exists, resolve that based on processor configuration
if (destinationExists) {
switch (conflictResponse) {
case REPLACE_RESOLUTION:
if (hdfs.delete(copyFile, false)) {
getLogger().info("deleted {} in order to replace with the contents of {}",
new Object[]{copyFile, putFlowFile});
}
break;
case IGNORE_RESOLUTION:
session.transfer(putFlowFile, getSuccessRelationship());
getLogger().info("transferring {} to success because file with same name already exists",
new Object[]{putFlowFile});
return null;
case FAIL_RESOLUTION:
session.transfer(session.penalize(putFlowFile), getFailureRelationship());
getLogger().warn("penalizing {} and routing to failure because file with same name already exists",
new Object[]{putFlowFile});
return null;
default:
break;
}
}

// Write FlowFile to temp file on HDFS
final StopWatch stopWatch = new StopWatch(true);
session.read(putFlowFile, new InputStreamCallback() {

@Override
public void process(InputStream in) throws IOException {
OutputStream fos = null;
Path createdFile = null;
try {
if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) {
fos = hdfs.append(copyFile, bufferSize);
} else {
final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);

if (shouldIgnoreLocality(context, session)) {
cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
}

fos = hdfs.create(tempCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
null, null);
}

if (codec != null) {
fos = codec.createOutputStream(fos);
}
createdFile = tempCopyFile;
BufferedInputStream bis = new BufferedInputStream(in);
StreamUtils.copy(bis, fos);
bis = null;
fos.flush();
} finally {
try {
if (fos != null) {
fos.close();
}
} catch (Throwable t) {
// when talking to remote HDFS clusters, we don't notice problems until fos.close()
if (createdFile != null) {
try {
hdfs.delete(createdFile, false);
} catch (Throwable ignore) {
}
}
throw t;
}
fos = null;
}
}

});
stopWatch.stop();
final String dataRate = stopWatch.calculateDataRate(putFlowFile.getSize());
final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
tempDotCopyFile = tempCopyFile;

if (!conflictResponse.equals(APPEND_RESOLUTION)
|| (conflictResponse.equals(APPEND_RESOLUTION) && !destinationExists)) {
boolean renamed = false;
for (int i = 0; i < 10; i++) { // try to rename multiple times.
if (hdfs.rename(tempCopyFile, copyFile)) {
renamed = true;
break;// rename was successful
}
Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve
}
if (!renamed) {
hdfs.delete(tempCopyFile, false);
throw new ProcessException("Copied file to HDFS but could not rename dot file " + tempCopyFile
+ " to its final filename");
}

changeOwner(context, hdfs, copyFile, flowFile);
}

getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}",
new Object[]{putFlowFile, copyFile, millis, dataRate});

final String newFilename = copyFile.getName();
final String hdfsPath = copyFile.getParent().toString();
putFlowFile = session.putAttribute(putFlowFile, CoreAttributes.FILENAME.key(), newFilename);
putFlowFile = session.putAttribute(putFlowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
final Path qualifiedPath = copyFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
session.getProvenanceReporter().send(putFlowFile, qualifiedPath.toString());

session.transfer(putFlowFile, getSuccessRelationship());

} catch (final IOException e) {
Optional<GSSException> causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
if (causeOptional.isPresent()) {
getLogger().warn("An error occurred while connecting to HDFS. "
+ "Rolling back session, and penalizing flow file {}",
new Object[] {putFlowFile.getAttribute(CoreAttributes.UUID.key()), causeOptional.get()});
session.rollback(true);
} else {
getLogger().error("Failed to access HDFS due to {}", new Object[]{e});
session.transfer(putFlowFile, getFailureRelationship());
}
} catch (final Throwable t) {
if (tempDotCopyFile != null) {
try {
hdfs.delete(tempDotCopyFile, false);
} catch (Exception e) {
getLogger().error("Unable to remove temporary file {} due to {}", new Object[]{tempDotCopyFile, e});
}
}
getLogger().error("Failed to write to HDFS due to {}", new Object[]{t});
session.transfer(session.penalize(putFlowFile), getFailureRelationship());
context.yield();
}

return null;
}
});
}

protected Relationship getSuccessRelationship() {
return REL_SUCCESS;
}

@Override
protected Relationship getFailureRelationship() {
return REL_FAILURE;
}

@Override
protected long getBlockSize(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, Path dirPath) {
final Double blockSizeProp = context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
return blockSizeProp != null ? blockSizeProp.longValue() : getFileSystem().getDefaultBlockSize(dirPath);
}

@Override
protected int getBufferSize(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
return bufferSizeProp != null ? bufferSizeProp.intValue() : getConfiguration().getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT);
}

@Override
protected short getReplication(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, Path dirPath) {
final Integer replicationProp = context.getProperty(REPLICATION_FACTOR).asInteger();
return replicationProp != null ? replicationProp.shortValue() : getFileSystem()
.getDefaultReplication(dirPath);
}

@Override
protected boolean shouldIgnoreLocality(final ProcessContext context, final ProcessSession session) {
return context.getProperty(IGNORE_LOCALITY).asBoolean();
}

@Override
protected String getOwner(final ProcessContext context, final FlowFile flowFile) {
final String owner = context.getProperty(REMOTE_OWNER).evaluateAttributeExpressions(flowFile).getValue();
return owner == null || owner.isEmpty() ? null : owner;
}

@Override
protected String getGroup(final ProcessContext context, final FlowFile flowFile) {
final String group = context.getProperty(REMOTE_GROUP).evaluateAttributeExpressions(flowFile).getValue();
return group == null || group.isEmpty() ? null : group;
}

/**
* Returns an optional with the first throwable in the causal chain that is assignable to the provided cause type,
* and satisfies the provided cause predicate, {@link Optional#empty()} otherwise.
* @param t The throwable to inspect for the cause.
* @return
*/
private <T extends Throwable> Optional<T> findCause(Throwable t, Class<T> expectedCauseType, Predicate<T> causePredicate) {
Stream<Throwable> causalChain = Throwables.getCausalChain(t).stream();
return causalChain
.filter(expectedCauseType::isInstance)
.map(expectedCauseType::cast)
.filter(causePredicate)
.findFirst();
}

protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name, final FlowFile flowFile) {
try {
// Change owner and group of file if configured to do so
final String owner = getOwner(context, flowFile);
final String group = getGroup(context, flowFile);

if (owner != null || group != null) {
hdfs.setOwner(name, owner, group);
}
} catch (Exception e) {
getLogger().warn("Could not change owner or group of {} on HDFS due to {}", new Object[]{name, e});
}
}
}

@@ -46,6 +46,7 @@ import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;

import static org.junit.Assert.assertEquals;

@@ -124,7 +125,8 @@ public class AbstractHadoopTest {
final File brokenCoreSite = new File("src/test/resources/core-site-broken.xml");
final ResourceReference brokenCoreSiteReference = new FileResourceReference(brokenCoreSite);
final ResourceReferences references = new StandardResourceReferences(Collections.singletonList(brokenCoreSiteReference));
processor.resetHDFSResources(references, runner.getProcessContext());
final List<String> locations = references.asLocations();
processor.resetHDFSResources(locations, runner.getProcessContext());
Assert.fail("Should have thrown SocketTimeoutException");
} catch (IOException e) {
}

@@ -21,7 +21,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.components.resource.ResourceReferences;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessorInitializationContext;

@@ -35,6 +34,7 @@ import org.mockito.ArgumentCaptor;
import java.io.File;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.List;

import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.mock;

@@ -96,7 +96,7 @@ public class GetHDFSSequenceFileTest {

public class TestableGetHDFSSequenceFile extends GetHDFSSequenceFile {
@Override
HdfsResources resetHDFSResources(ResourceReferences configResources, ProcessContext context) throws IOException {
HdfsResources resetHDFSResources(final List<String> resourceLocations, ProcessContext context) throws IOException {
return hdfsResources;
}