From c3ecf2fea841c8e6971290bd5ea6b6c0d92f4632 Mon Sep 17 00:00:00 2001 From: Bence Simon Date: Mon, 16 Aug 2021 21:42:53 +0200 Subject: [PATCH] NIFI-9032 Refactoring HDFS processors in order to increase flexibility This closes #5295. Signed-off-by: Tamas Palfy --- .../hadoop/AbstractHadoopProcessor.java | 119 +++--- .../processors/hadoop/AbstractPutHDFS.java | 343 ------------------ .../nifi/processors/hadoop/DeleteHDFS.java | 38 +- .../nifi/processors/hadoop/FetchHDFS.java | 44 ++- .../nifi/processors/hadoop/ListHDFS.java | 23 +- .../nifi/processors/hadoop/PutHDFS.java | 281 +++++++++++++- .../processors/hadoop/AbstractHadoopTest.java | 4 +- .../hadoop/GetHDFSSequenceFileTest.java | 4 +- 8 files changed, 431 insertions(+), 425 deletions(-) delete mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFS.java diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java index 2838a3883e..57c57cf60a 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java @@ -33,6 +33,7 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.resource.ResourceCardinality; import org.apache.nifi.components.resource.ResourceReferences; import org.apache.nifi.components.resource.ResourceType; +import org.apache.nifi.context.PropertyContext; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.hadoop.KerberosProperties; @@ -134,7 +135,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { .dynamicallyModifiesClasspath(true) .build(); - static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder() + public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder() .name("kerberos-credentials-service") .displayName("Kerberos Credentials Service") .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos") @@ -187,7 +188,6 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { @Override protected Collection customValidate(ValidationContext validationContext) { - final ResourceReferences configResources = validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().asResources(); final String explicitPrincipal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue(); final String explicitKeytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue(); final String explicitPassword = validationContext.getProperty(kerberosProperties.getKerberosPassword()).getValue(); @@ -204,36 +204,18 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { } final List results = new ArrayList<>(); + final List locations = getConfigLocations(validationContext); - if (configResources.getCount() == 0) { + if (locations.isEmpty()) { return results; } try { - ValidationResources 
resources = validationResourceHolder.get(); - - // if no resources in the holder, or if the holder has different resources loaded, - // then load the Configuration and set the new resources in the holder - if (resources == null || !configResources.equals(resources.getConfigResources())) { - getLogger().debug("Reloading validation resources"); - final Configuration config = new ExtendedConfiguration(getLogger()); - config.setClassLoader(Thread.currentThread().getContextClassLoader()); - resources = new ValidationResources(configResources, getConfigurationFromResources(config, configResources)); - validationResourceHolder.set(resources); - } - - final Configuration conf = resources.getConfiguration(); + final Configuration conf = getHadoopConfigurationForValidation(locations); results.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword( - this.getClass().getSimpleName(), conf, resolvedPrincipal, resolvedKeytab, explicitPassword, getLogger())); + this.getClass().getSimpleName(), conf, resolvedPrincipal, resolvedKeytab, explicitPassword, getLogger())); - final URI fileSystemUri = FileSystem.getDefaultUri(conf); - if (isFileSystemAccessDenied(fileSystemUri)) { - results.add(new ValidationResult.Builder() - .valid(false) - .subject("Hadoop File System") - .explanation(DENY_LFS_EXPLANATION) - .build()); - } + results.addAll(validateFileSystem(conf)); } catch (final IOException e) { results.add(new ValidationResult.Builder() .valid(false) @@ -262,6 +244,36 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { return results; } + protected Collection validateFileSystem(final Configuration configuration) { + final List results = new ArrayList<>(); + + if (isFileSystemAccessDenied(FileSystem.getDefaultUri(configuration))) { + results.add(new ValidationResult.Builder() + .valid(false) + .subject("Hadoop File System") + .explanation(DENY_LFS_EXPLANATION) + .build()); + } + + return results; + } + + protected Configuration getHadoopConfigurationForValidation(final List locations) throws IOException { + ValidationResources resources = validationResourceHolder.get(); + + // if no resources in the holder, or if the holder has different resources loaded, + // then load the Configuration and set the new resources in the holder + if (resources == null || !locations.equals(resources.getConfigLocations())) { + getLogger().debug("Reloading validation resources"); + final Configuration config = new ExtendedConfiguration(getLogger()); + config.setClassLoader(Thread.currentThread().getContextClassLoader()); + resources = new ValidationResources(locations, getConfigurationFromResources(config, locations)); + validationResourceHolder.set(resources); + } + + return resources.getConfiguration(); + } + /** * If your subclass also has an @OnScheduled annotated method and you need hdfsResources in that method, then be sure to call super.abstractOnScheduled(context) */ @@ -272,17 +284,22 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { // properties this processor sets. 
TODO: re-work ListHDFS to utilize Kerberos HdfsResources resources = hdfsResources.get(); if (resources.getConfiguration() == null) { - final ResourceReferences configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().asResources(); - resources = resetHDFSResources(configResources, context); + resources = resetHDFSResources(getConfigLocations(context), context); hdfsResources.set(resources); } } catch (Exception ex) { - getLogger().error("HDFS Configuration error - {}", new Object[] { ex }); + getLogger().error("HDFS Configuration error - {}", new Object[]{ex}); hdfsResources.set(EMPTY_HDFS_RESOURCES); throw ex; } } + protected List getConfigLocations(PropertyContext context) { + final ResourceReferences configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().asResources(); + final List locations = configResources.asLocations(); + return locations; + } + @OnStopped public final void abstractOnStopped() { final HdfsResources resources = hdfsResources.get(); @@ -345,10 +362,10 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { } } - private static Configuration getConfigurationFromResources(final Configuration config, final ResourceReferences resourceReferences) throws IOException { - boolean foundResources = resourceReferences.getCount() > 0; + private static Configuration getConfigurationFromResources(final Configuration config, final List locations) throws IOException { + boolean foundResources = !locations.isEmpty(); + if (foundResources) { - final List locations = resourceReferences.asLocations(); for (String resource : locations) { config.addResource(new Path(resource.trim())); } @@ -372,11 +389,11 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { /* * Reset Hadoop Configuration and FileSystem based on the supplied configuration resources. 
*/ - HdfsResources resetHDFSResources(final ResourceReferences resourceReferences, ProcessContext context) throws IOException { + HdfsResources resetHDFSResources(final List resourceLocations, ProcessContext context) throws IOException { Configuration config = new ExtendedConfiguration(getLogger()); config.setClassLoader(Thread.currentThread().getContextClassLoader()); - getConfigurationFromResources(config, resourceReferences); + getConfigurationFromResources(config, resourceLocations); // give sub-classes a chance to process configuration preProcessConfiguration(config, context); @@ -559,7 +576,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { kerberosUser.checkTGTAndRelogin(); } catch (LoginException e) { throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e); - } + } } else { getLogger().debug("kerberosUser was null, will not refresh TGT with KerberosUser"); } @@ -577,7 +594,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { return Boolean.parseBoolean(System.getenv(DENY_LFS_ACCESS)); } - private boolean isFileSystemAccessDenied(final URI fileSystemUri) { + protected boolean isFileSystemAccessDenied(final URI fileSystemUri) { boolean accessDenied; if (isLocalFileSystemAccessDenied()) { @@ -590,16 +607,16 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { } static protected class ValidationResources { - private final ResourceReferences configResources; + private final List configLocations; private final Configuration configuration; - public ValidationResources(final ResourceReferences configResources, Configuration configuration) { - this.configResources = configResources; + public ValidationResources(final List configLocations, final Configuration configuration) { + this.configLocations = configLocations; this.configuration = configuration; } - public ResourceReferences getConfigResources() { - return configResources; + public List getConfigLocations() { + return configLocations; } public Configuration getConfiguration() { @@ -611,7 +628,25 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { return getNormalizedPath(context, property, null); } - protected Path getNormalizedPath(ProcessContext context, PropertyDescriptor property, FlowFile flowFile) { + protected Path getNormalizedPath(final String rawPath) { + final Path path = new Path(rawPath); + final URI uri = path.toUri(); + + final URI fileSystemUri = getFileSystem().getUri(); + + if (uri.getScheme() != null) { + if (!uri.getScheme().equals(fileSystemUri.getScheme()) || !uri.getAuthority().equals(fileSystemUri.getAuthority())) { + getLogger().warn("The filesystem component of the URI configured ({}) does not match the filesystem URI from the Hadoop configuration file ({}) " + + "and will be ignored.", uri, fileSystemUri); + } + + return new Path(uri.getPath()); + } else { + return path; + } + } + + protected Path getNormalizedPath(final ProcessContext context, final PropertyDescriptor property, final FlowFile flowFile) { final String propertyValue = context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue(); final Path path = new Path(propertyValue); final URI uri = path.toUri(); @@ -629,4 +664,4 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { return path; } } -} +} \ No newline at end of file diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFS.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFS.java deleted file mode 100644 index b86b7dcb1a..0000000000 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFS.java +++ /dev/null @@ -1,343 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.hadoop; - -import com.google.common.base.Throwables; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CreateFlag; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsCreateModes; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.nifi.components.AllowableValue; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.InputStreamCallback; -import org.apache.nifi.stream.io.StreamUtils; -import org.apache.nifi.util.StopWatch; -import org.ietf.jgss.GSSException; - -import java.io.BufferedInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.security.PrivilegedAction; -import java.util.EnumSet; -import java.util.Optional; -import java.util.concurrent.TimeUnit; -import java.util.function.Predicate; -import java.util.stream.Stream; - -public abstract class AbstractPutHDFS extends AbstractHadoopProcessor { - protected static final String BUFFER_SIZE_KEY = "io.file.buffer.size"; - protected static final int BUFFER_SIZE_DEFAULT = 4096; - - protected static final String REPLACE_RESOLUTION = "replace"; - protected static final String IGNORE_RESOLUTION = "ignore"; - protected static final String FAIL_RESOLUTION = "fail"; - protected static final String APPEND_RESOLUTION = "append"; - - protected static final AllowableValue REPLACE_RESOLUTION_AV = new AllowableValue(REPLACE_RESOLUTION, - REPLACE_RESOLUTION, "Replaces the existing file if any."); - protected static final AllowableValue IGNORE_RESOLUTION_AV = new AllowableValue(IGNORE_RESOLUTION, IGNORE_RESOLUTION, - "Ignores the flow file and routes it to success."); - protected static final 
AllowableValue FAIL_RESOLUTION_AV = new AllowableValue(FAIL_RESOLUTION, FAIL_RESOLUTION, - "Penalizes the flow file and routes it to failure."); - protected static final AllowableValue APPEND_RESOLUTION_AV = new AllowableValue(APPEND_RESOLUTION, APPEND_RESOLUTION, - "Appends to the existing file if any, creates a new file otherwise."); - - protected static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder() - .name("Conflict Resolution Strategy") - .description("Indicates what should happen when a file with the same name already exists in the output directory") - .required(true) - .defaultValue(FAIL_RESOLUTION_AV.getValue()) - .allowableValues(REPLACE_RESOLUTION_AV, IGNORE_RESOLUTION_AV, FAIL_RESOLUTION_AV, APPEND_RESOLUTION_AV) - .build(); - - @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - final FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } - - final FileSystem hdfs = getFileSystem(); - final Configuration configuration = getConfiguration(); - final UserGroupInformation ugi = getUserGroupInformation(); - - if (configuration == null || hdfs == null || ugi == null) { - getLogger().error("HDFS not configured properly"); - session.transfer(flowFile, getFailureRelationship()); - context.yield(); - return; - } - - ugi.doAs(new PrivilegedAction() { - @Override - public Object run() { - Path tempDotCopyFile = null; - FlowFile putFlowFile = flowFile; - try { - final Path dirPath = getNormalizedPath(context, DIRECTORY, putFlowFile); - - final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue(); - final long blockSize = getBlockSize(context, session, putFlowFile, dirPath); - final int bufferSize = getBufferSize(context, session, putFlowFile); - final short replication = getReplication(context, session, putFlowFile, dirPath); - - final CompressionCodec codec = getCompressionCodec(context, configuration); - - final String filename = codec != null - ? putFlowFile.getAttribute(CoreAttributes.FILENAME.key()) + codec.getDefaultExtension() - : putFlowFile.getAttribute(CoreAttributes.FILENAME.key()); - - final Path tempCopyFile = new Path(dirPath, "." 
+ filename); - final Path copyFile = new Path(dirPath, filename); - - // Create destination directory if it does not exist - try { - if (!hdfs.getFileStatus(dirPath).isDirectory()) { - throw new IOException(dirPath.toString() + " already exists and is not a directory"); - } - } catch (FileNotFoundException fe) { - if (!hdfs.mkdirs(dirPath)) { - throw new IOException(dirPath.toString() + " could not be created"); - } - changeOwner(context, hdfs, dirPath, flowFile); - } - - final boolean destinationExists = hdfs.exists(copyFile); - - // If destination file already exists, resolve that based on processor configuration - if (destinationExists) { - switch (conflictResponse) { - case REPLACE_RESOLUTION: - if (hdfs.delete(copyFile, false)) { - getLogger().info("deleted {} in order to replace with the contents of {}", - new Object[]{copyFile, putFlowFile}); - } - break; - case IGNORE_RESOLUTION: - session.transfer(putFlowFile, getSuccessRelationship()); - getLogger().info("transferring {} to success because file with same name already exists", - new Object[]{putFlowFile}); - return null; - case FAIL_RESOLUTION: - session.transfer(session.penalize(putFlowFile), getFailureRelationship()); - getLogger().warn("penalizing {} and routing to failure because file with same name already exists", - new Object[]{putFlowFile}); - return null; - default: - break; - } - } - - // Write FlowFile to temp file on HDFS - final StopWatch stopWatch = new StopWatch(true); - session.read(putFlowFile, new InputStreamCallback() { - - @Override - public void process(InputStream in) throws IOException { - OutputStream fos = null; - Path createdFile = null; - try { - if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) { - fos = hdfs.append(copyFile, bufferSize); - } else { - final EnumSet cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE); - - if (shouldIgnoreLocality(context, session)) { - cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY); - } - - fos = hdfs.create(tempCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(), - FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize, - null, null); - } - - if (codec != null) { - fos = codec.createOutputStream(fos); - } - createdFile = tempCopyFile; - BufferedInputStream bis = new BufferedInputStream(in); - StreamUtils.copy(bis, fos); - bis = null; - fos.flush(); - } finally { - try { - if (fos != null) { - fos.close(); - } - } catch (Throwable t) { - // when talking to remote HDFS clusters, we don't notice problems until fos.close() - if (createdFile != null) { - try { - hdfs.delete(createdFile, false); - } catch (Throwable ignore) { - } - } - throw t; - } - fos = null; - } - } - - }); - stopWatch.stop(); - final String dataRate = stopWatch.calculateDataRate(putFlowFile.getSize()); - final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS); - tempDotCopyFile = tempCopyFile; - - if (!conflictResponse.equals(APPEND_RESOLUTION) - || (conflictResponse.equals(APPEND_RESOLUTION) && !destinationExists)) { - boolean renamed = false; - for (int i = 0; i < 10; i++) { // try to rename multiple times. 
- if (hdfs.rename(tempCopyFile, copyFile)) { - renamed = true; - break;// rename was successful - } - Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve - } - if (!renamed) { - hdfs.delete(tempCopyFile, false); - throw new ProcessException("Copied file to HDFS but could not rename dot file " + tempCopyFile - + " to its final filename"); - } - - changeOwner(context, hdfs, copyFile, flowFile); - } - - getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}", - new Object[]{putFlowFile, copyFile, millis, dataRate}); - - final String newFilename = copyFile.getName(); - final String hdfsPath = copyFile.getParent().toString(); - putFlowFile = session.putAttribute(putFlowFile, CoreAttributes.FILENAME.key(), newFilename); - putFlowFile = session.putAttribute(putFlowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath); - final Path qualifiedPath = copyFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory()); - session.getProvenanceReporter().send(putFlowFile, qualifiedPath.toString()); - - session.transfer(putFlowFile, getSuccessRelationship()); - - } catch (final IOException e) { - Optional causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor()); - if (causeOptional.isPresent()) { - getLogger().warn("An error occurred while connecting to HDFS. " - + "Rolling back session, and penalizing flow file {}", - new Object[] {putFlowFile.getAttribute(CoreAttributes.UUID.key()), causeOptional.get()}); - session.rollback(true); - } else { - getLogger().error("Failed to access HDFS due to {}", new Object[]{e}); - session.transfer(putFlowFile, getFailureRelationship()); - } - } catch (final Throwable t) { - if (tempDotCopyFile != null) { - try { - hdfs.delete(tempDotCopyFile, false); - } catch (Exception e) { - getLogger().error("Unable to remove temporary file {} due to {}", new Object[]{tempDotCopyFile, e}); - } - } - getLogger().error("Failed to write to HDFS due to {}", new Object[]{t}); - session.transfer(session.penalize(putFlowFile), getFailureRelationship()); - context.yield(); - } - - return null; - } - }); - } - - /** - * Returns with the expected block size. - */ - protected abstract long getBlockSize(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final Path dirPath); - - /** - * Returns with the expected buffer size. - */ - protected abstract int getBufferSize(final ProcessContext context, final ProcessSession session, final FlowFile flowFile); - - /** - * Returns with the expected replication factor. - */ - protected abstract short getReplication(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final Path dirPath); - - /** - * Returns if file system should ignore locality. - */ - protected abstract boolean shouldIgnoreLocality(final ProcessContext context, final ProcessSession session); - - /** - * If returns a non-null value, the uploaded file's owner will be changed to this value after it is written. This only - * works if NiFi is running as a user that has privilege to change owner. - */ - protected abstract String getOwner(final ProcessContext context, final FlowFile flowFile); - - /** - * I returns a non-null value, thee uploaded file's group will be changed to this value after it is written. This only - * works if NiFi is running as a user that has privilege to change group. 
- */ - protected abstract String getGroup(final ProcessContext context, final FlowFile flowFile); - - /** - * @return The relationship the flow file will be transferred in case of successful execution. - */ - protected abstract Relationship getSuccessRelationship(); - - /** - * @return The relationship the flow file will be transferred in case of failed execution. - */ - protected abstract Relationship getFailureRelationship(); - - /** - * Returns an optional with the first throwable in the causal chain that is assignable to the provided cause type, - * and satisfies the provided cause predicate, {@link Optional#empty()} otherwise. - * @param t The throwable to inspect for the cause. - * @return - */ - private Optional findCause(Throwable t, Class expectedCauseType, Predicate causePredicate) { - Stream causalChain = Throwables.getCausalChain(t).stream(); - return causalChain - .filter(expectedCauseType::isInstance) - .map(expectedCauseType::cast) - .filter(causePredicate) - .findFirst(); - } - - protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name, final FlowFile flowFile) { - try { - // Change owner and group of file if configured to do so - final String owner = getOwner(context, flowFile); - final String group = getGroup(context, flowFile); - - if (owner != null || group != null) { - hdfs.setOwner(name, owner, group); - } - } catch (Exception e) { - getLogger().warn("Could not change owner or group of {} on HDFS due to {}", new Object[]{name, e}); - } - } -} diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java index 9296507437..2c9285fab5 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java @@ -144,9 +144,7 @@ public class DeleteHDFS extends AbstractHadoopProcessor { // We need a FlowFile to report provenance correctly. final FlowFile finalFlowFile = originalFlowFile != null ? 
originalFlowFile : session.create(); - - final String fileOrDirectoryName = getNormalizedPath(context, FILE_OR_DIRECTORY, finalFlowFile).toString(); - + final String fileOrDirectoryName = getPath(context, session, finalFlowFile); final FileSystem fileSystem = getFileSystem(); final UserGroupInformation ugi = getUserGroupInformation(); @@ -171,11 +169,11 @@ public class DeleteHDFS extends AbstractHadoopProcessor { if (fileSystem.exists(path)) { try { Map attributes = Maps.newHashMapWithExpectedSize(2); - attributes.put("hdfs.filename", path.getName()); - attributes.put("hdfs.path", path.getParent().toString()); + attributes.put(getAttributePrefix() + ".filename", path.getName()); + attributes.put(getAttributePrefix() + ".path", path.getParent().toString()); flowFile = session.putAllAttributes(flowFile, attributes); - fileSystem.delete(path, context.getProperty(RECURSIVE).asBoolean()); + fileSystem.delete(path, isRecursive(context, session)); getLogger().debug("For flowfile {} Deleted file at path {} with name {}", new Object[]{originalFlowFile, path.getParent().toString(), path.getName()}); final Path qualifiedPath = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory()); session.getProvenanceReporter().invokeRemoteProcess(flowFile, qualifiedPath.toString()); @@ -186,27 +184,47 @@ public class DeleteHDFS extends AbstractHadoopProcessor { Map attributes = Maps.newHashMapWithExpectedSize(1); // The error message is helpful in understanding at a flowfile level what caused the IOException (which ACL is denying the operation, e.g.) - attributes.put("hdfs.error.message", ioe.getMessage()); + attributes.put(getAttributePrefix() + ".error.message", ioe.getMessage()); - session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), REL_FAILURE); + session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), getFailureRelationship()); failedPath++; } } } if (failedPath == 0) { - session.transfer(flowFile, DeleteHDFS.REL_SUCCESS); + session.transfer(flowFile, getSuccessRelationship()); } else { // If any path has been failed to be deleted, remove the FlowFile as it's been cloned and sent to failure. 
session.remove(flowFile); } } catch (IOException e) { getLogger().error("Error processing delete for flowfile {} due to {}", new Object[]{flowFile, e.getMessage()}, e); - session.transfer(flowFile, DeleteHDFS.REL_FAILURE); + session.transfer(flowFile, getFailureRelationship()); } return null; }); } + + protected Relationship getSuccessRelationship() { + return REL_SUCCESS; + } + + protected Relationship getFailureRelationship() { + return REL_FAILURE; + } + + protected boolean isRecursive(final ProcessContext context, final ProcessSession session) { + return context.getProperty(RECURSIVE).asBoolean(); + } + + protected String getPath(final ProcessContext context, final ProcessSession session, final FlowFile finalFlowFile) { + return getNormalizedPath(context, FILE_OR_DIRECTORY, finalFlowFile).toString(); + } + + protected String getAttributePrefix() { + return "hdfs"; + } } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java index b60ee5bba3..e4acaac109 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java @@ -17,7 +17,7 @@ package org.apache.nifi.processors.hadoop; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -121,16 +121,16 @@ public class FetchHDFS extends AbstractHadoopProcessor { final FileSystem hdfs = getFileSystem(); final UserGroupInformation ugi = getUserGroupInformation(); - final String filenameValue = context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue(); + final String filenameValue = getPath(context, flowFile); final Path path; try { - path = getNormalizedPath(context, FILENAME, flowFile); + path = getNormalizedPath(getPath(context, flowFile)); } catch (IllegalArgumentException e) { getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {filenameValue, flowFile, e}); - flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage()); + flowFile = session.putAttribute(flowFile, getAttributePrefix() + ".failure.reason", e.getMessage()); flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); + session.transfer(flowFile, getFailureRelationship()); return; } @@ -144,7 +144,7 @@ public class FetchHDFS extends AbstractHadoopProcessor { CompressionCodec codec = null; Configuration conf = getConfiguration(); final CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(conf); - final CompressionType compressionType = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).toString()); + final CompressionType compressionType = getCompressionType(context); final boolean inferCompressionCodec = compressionType == CompressionType.AUTOMATIC; if(inferCompressionCodec) { @@ -174,16 +174,16 @@ public class FetchHDFS extends AbstractHadoopProcessor { stopWatch.stop(); getLogger().info("Successfully received content from {} for {} in {}", new Object[] {qualifiedPath, flowFile, stopWatch.getDuration()}); 
session.getProvenanceReporter().fetch(flowFile, qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS)); - session.transfer(flowFile, REL_SUCCESS); + session.transfer(flowFile, getSuccessRelationship()); } catch (final FileNotFoundException | AccessControlException e) { getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {qualifiedPath, flowFile, e}); - flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage()); + flowFile = session.putAttribute(flowFile, getAttributePrefix() + ".failure.reason", e.getMessage()); flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); + session.transfer(flowFile, getFailureRelationship()); } catch (final IOException e) { getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", new Object[] {qualifiedPath, flowFile, e}); flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_COMMS_FAILURE); + session.transfer(flowFile, getCommsFailureRelationship()); } finally { IOUtils.closeQuietly(stream); } @@ -191,7 +191,29 @@ public class FetchHDFS extends AbstractHadoopProcessor { return null; } }); - } + protected Relationship getSuccessRelationship() { + return REL_SUCCESS; + } + + protected Relationship getFailureRelationship() { + return REL_FAILURE; + } + + protected Relationship getCommsFailureRelationship() { + return REL_COMMS_FAILURE; + } + + protected String getPath(final ProcessContext context, final FlowFile flowFile) { + return context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue(); + } + + protected String getAttributePrefix() { + return "hdfs"; + } + + protected CompressionType getCompressionType(final ProcessContext context) { + return CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).toString()); + } } diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java index 583b0b8c65..907a1412f4 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java @@ -503,7 +503,7 @@ public class ListHDFS extends AbstractHadoopProcessor { final Map attributes = createAttributes(status); FlowFile flowFile = session.create(); flowFile = session.putAllAttributes(flowFile, attributes); - session.transfer(flowFile, REL_SUCCESS); + session.transfer(flowFile, getSuccessRelationship()); } } @@ -528,7 +528,7 @@ public class ListHDFS extends AbstractHadoopProcessor { attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); flowFile = session.putAllAttributes(flowFile, attributes); - session.transfer(flowFile, REL_SUCCESS); + session.transfer(flowFile, getSuccessRelationship()); } private Record createRecord(final FileStatus fileStatus) { @@ -620,15 +620,15 @@ public class ListHDFS extends AbstractHadoopProcessor { attributes.put(CoreAttributes.FILENAME.key(), status.getPath().getName()); attributes.put(CoreAttributes.PATH.key(), getAbsolutePath(status.getPath().getParent())); - attributes.put("hdfs.owner", status.getOwner()); - attributes.put("hdfs.group", status.getGroup()); - attributes.put("hdfs.lastModified", String.valueOf(status.getModificationTime())); - 
attributes.put("hdfs.length", String.valueOf(status.getLen())); - attributes.put("hdfs.replication", String.valueOf(status.getReplication())); + attributes.put(getAttributePrefix() + ".owner", status.getOwner()); + attributes.put(getAttributePrefix() + ".group", status.getGroup()); + attributes.put(getAttributePrefix() + ".lastModified", String.valueOf(status.getModificationTime())); + attributes.put(getAttributePrefix() + ".length", String.valueOf(status.getLen())); + attributes.put(getAttributePrefix() + ".replication", String.valueOf(status.getReplication())); final FsPermission permission = status.getPermission(); final String perms = getPerms(permission.getUserAction()) + getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction()); - attributes.put("hdfs.permissions", perms); + attributes.put(getAttributePrefix() + ".permissions", perms); return attributes; } @@ -669,4 +669,11 @@ public class ListHDFS extends AbstractHadoopProcessor { }; } + protected Relationship getSuccessRelationship() { + return REL_SUCCESS; + } + + protected String getAttributePrefix() { + return "hdfs"; + } } diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java index 00942e3283..29327ec832 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java @@ -16,9 +16,15 @@ */ package org.apache.nifi.processors.hadoop; +import com.google.common.base.Throwables; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsCreateModes; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.ReadsAttribute; @@ -29,22 +35,40 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.RequiredPermission; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StopWatch; +import org.ietf.jgss.GSSException; +import java.io.BufferedInputStream; +import java.io.FileNotFoundException; +import 
java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.Collections; +import java.util.EnumSet; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Stream; /** * This processor copies FlowFiles to HDFS. @@ -63,7 +87,11 @@ import java.util.Set; requiredPermission = RequiredPermission.WRITE_DISTRIBUTED_FILESYSTEM, explanation = "Provides operator the ability to delete any file that NiFi has access to in HDFS or the local filesystem.") }) -public class PutHDFS extends AbstractPutHDFS { +public class PutHDFS extends AbstractHadoopProcessor { + + protected static final String BUFFER_SIZE_KEY = "io.file.buffer.size"; + protected static final int BUFFER_SIZE_DEFAULT = 4096; + // relationships public static final Relationship REL_SUCCESS = new Relationship.Builder() @@ -78,6 +106,28 @@ public class PutHDFS extends AbstractPutHDFS { // properties + protected static final String REPLACE_RESOLUTION = "replace"; + protected static final String IGNORE_RESOLUTION = "ignore"; + protected static final String FAIL_RESOLUTION = "fail"; + protected static final String APPEND_RESOLUTION = "append"; + + protected static final AllowableValue REPLACE_RESOLUTION_AV = new AllowableValue(REPLACE_RESOLUTION, + REPLACE_RESOLUTION, "Replaces the existing file if any."); + protected static final AllowableValue IGNORE_RESOLUTION_AV = new AllowableValue(IGNORE_RESOLUTION, IGNORE_RESOLUTION, + "Ignores the flow file and routes it to success."); + protected static final AllowableValue FAIL_RESOLUTION_AV = new AllowableValue(FAIL_RESOLUTION, FAIL_RESOLUTION, + "Penalizes the flow file and routes it to failure."); + protected static final AllowableValue APPEND_RESOLUTION_AV = new AllowableValue(APPEND_RESOLUTION, APPEND_RESOLUTION, + "Appends to the existing file if any, creates a new file otherwise."); + + protected static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder() + .name("Conflict Resolution Strategy") + .description("Indicates what should happen when a file with the same name already exists in the output directory") + .required(true) + .defaultValue(FAIL_RESOLUTION_AV.getValue()) + .allowableValues(REPLACE_RESOLUTION_AV, IGNORE_RESOLUTION_AV, FAIL_RESOLUTION_AV, APPEND_RESOLUTION_AV) + .build(); + public static final PropertyDescriptor BLOCK_SIZE = new PropertyDescriptor.Builder() .name("Block Size") .description("Size of each block as written to HDFS. 
This overrides the Hadoop Configuration") @@ -179,48 +229,263 @@ public class PutHDFS extends AbstractPutHDFS { } @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + final FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final FileSystem hdfs = getFileSystem(); + final Configuration configuration = getConfiguration(); + final UserGroupInformation ugi = getUserGroupInformation(); + + if (configuration == null || hdfs == null || ugi == null) { + getLogger().error("HDFS not configured properly"); + session.transfer(flowFile, getFailureRelationship()); + context.yield(); + return; + } + + ugi.doAs(new PrivilegedAction() { + @Override + public Object run() { + Path tempDotCopyFile = null; + FlowFile putFlowFile = flowFile; + try { + final Path dirPath = getNormalizedPath(context, DIRECTORY, putFlowFile); + + final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue(); + final long blockSize = getBlockSize(context, session, putFlowFile, dirPath); + final int bufferSize = getBufferSize(context, session, putFlowFile); + final short replication = getReplication(context, session, putFlowFile, dirPath); + + final CompressionCodec codec = getCompressionCodec(context, configuration); + + final String filename = codec != null + ? putFlowFile.getAttribute(CoreAttributes.FILENAME.key()) + codec.getDefaultExtension() + : putFlowFile.getAttribute(CoreAttributes.FILENAME.key()); + + final Path tempCopyFile = new Path(dirPath, "." + filename); + final Path copyFile = new Path(dirPath, filename); + + // Create destination directory if it does not exist + try { + if (!hdfs.getFileStatus(dirPath).isDirectory()) { + throw new IOException(dirPath.toString() + " already exists and is not a directory"); + } + } catch (FileNotFoundException fe) { + if (!hdfs.mkdirs(dirPath)) { + throw new IOException(dirPath.toString() + " could not be created"); + } + changeOwner(context, hdfs, dirPath, flowFile); + } + + final boolean destinationExists = hdfs.exists(copyFile); + + // If destination file already exists, resolve that based on processor configuration + if (destinationExists) { + switch (conflictResponse) { + case REPLACE_RESOLUTION: + if (hdfs.delete(copyFile, false)) { + getLogger().info("deleted {} in order to replace with the contents of {}", + new Object[]{copyFile, putFlowFile}); + } + break; + case IGNORE_RESOLUTION: + session.transfer(putFlowFile, getSuccessRelationship()); + getLogger().info("transferring {} to success because file with same name already exists", + new Object[]{putFlowFile}); + return null; + case FAIL_RESOLUTION: + session.transfer(session.penalize(putFlowFile), getFailureRelationship()); + getLogger().warn("penalizing {} and routing to failure because file with same name already exists", + new Object[]{putFlowFile}); + return null; + default: + break; + } + } + + // Write FlowFile to temp file on HDFS + final StopWatch stopWatch = new StopWatch(true); + session.read(putFlowFile, new InputStreamCallback() { + + @Override + public void process(InputStream in) throws IOException { + OutputStream fos = null; + Path createdFile = null; + try { + if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) { + fos = hdfs.append(copyFile, bufferSize); + } else { + final EnumSet cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE); + + if (shouldIgnoreLocality(context, session)) { + cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY); + } + + fos = hdfs.create(tempCopyFile, 
FsCreateModes.applyUMask(FsPermission.getFileDefault(), + FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize, + null, null); + } + + if (codec != null) { + fos = codec.createOutputStream(fos); + } + createdFile = tempCopyFile; + BufferedInputStream bis = new BufferedInputStream(in); + StreamUtils.copy(bis, fos); + bis = null; + fos.flush(); + } finally { + try { + if (fos != null) { + fos.close(); + } + } catch (Throwable t) { + // when talking to remote HDFS clusters, we don't notice problems until fos.close() + if (createdFile != null) { + try { + hdfs.delete(createdFile, false); + } catch (Throwable ignore) { + } + } + throw t; + } + fos = null; + } + } + + }); + stopWatch.stop(); + final String dataRate = stopWatch.calculateDataRate(putFlowFile.getSize()); + final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS); + tempDotCopyFile = tempCopyFile; + + if (!conflictResponse.equals(APPEND_RESOLUTION) + || (conflictResponse.equals(APPEND_RESOLUTION) && !destinationExists)) { + boolean renamed = false; + for (int i = 0; i < 10; i++) { // try to rename multiple times. + if (hdfs.rename(tempCopyFile, copyFile)) { + renamed = true; + break;// rename was successful + } + Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve + } + if (!renamed) { + hdfs.delete(tempCopyFile, false); + throw new ProcessException("Copied file to HDFS but could not rename dot file " + tempCopyFile + + " to its final filename"); + } + + changeOwner(context, hdfs, copyFile, flowFile); + } + + getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}", + new Object[]{putFlowFile, copyFile, millis, dataRate}); + + final String newFilename = copyFile.getName(); + final String hdfsPath = copyFile.getParent().toString(); + putFlowFile = session.putAttribute(putFlowFile, CoreAttributes.FILENAME.key(), newFilename); + putFlowFile = session.putAttribute(putFlowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath); + final Path qualifiedPath = copyFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory()); + session.getProvenanceReporter().send(putFlowFile, qualifiedPath.toString()); + + session.transfer(putFlowFile, getSuccessRelationship()); + + } catch (final IOException e) { + Optional causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor()); + if (causeOptional.isPresent()) { + getLogger().warn("An error occurred while connecting to HDFS. 
" + + "Rolling back session, and penalizing flow file {}", + new Object[] {putFlowFile.getAttribute(CoreAttributes.UUID.key()), causeOptional.get()}); + session.rollback(true); + } else { + getLogger().error("Failed to access HDFS due to {}", new Object[]{e}); + session.transfer(putFlowFile, getFailureRelationship()); + } + } catch (final Throwable t) { + if (tempDotCopyFile != null) { + try { + hdfs.delete(tempDotCopyFile, false); + } catch (Exception e) { + getLogger().error("Unable to remove temporary file {} due to {}", new Object[]{tempDotCopyFile, e}); + } + } + getLogger().error("Failed to write to HDFS due to {}", new Object[]{t}); + session.transfer(session.penalize(putFlowFile), getFailureRelationship()); + context.yield(); + } + + return null; + } + }); + } + protected Relationship getSuccessRelationship() { return REL_SUCCESS; } - @Override protected Relationship getFailureRelationship() { return REL_FAILURE; } - @Override protected long getBlockSize(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, Path dirPath) { final Double blockSizeProp = context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B); return blockSizeProp != null ? blockSizeProp.longValue() : getFileSystem().getDefaultBlockSize(dirPath); } - @Override protected int getBufferSize(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) { final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B); return bufferSizeProp != null ? bufferSizeProp.intValue() : getConfiguration().getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT); } - @Override protected short getReplication(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, Path dirPath) { final Integer replicationProp = context.getProperty(REPLICATION_FACTOR).asInteger(); return replicationProp != null ? replicationProp.shortValue() : getFileSystem() .getDefaultReplication(dirPath); } - @Override protected boolean shouldIgnoreLocality(final ProcessContext context, final ProcessSession session) { return context.getProperty(IGNORE_LOCALITY).asBoolean(); } - @Override protected String getOwner(final ProcessContext context, final FlowFile flowFile) { final String owner = context.getProperty(REMOTE_OWNER).evaluateAttributeExpressions(flowFile).getValue(); return owner == null || owner.isEmpty() ? null : owner; } - @Override protected String getGroup(final ProcessContext context, final FlowFile flowFile) { final String group = context.getProperty(REMOTE_GROUP).evaluateAttributeExpressions(flowFile).getValue(); return group == null || group.isEmpty() ? null : group; } + + /** + * Returns an optional with the first throwable in the causal chain that is assignable to the provided cause type, + * and satisfies the provided cause predicate, {@link Optional#empty()} otherwise. + * @param t The throwable to inspect for the cause. 
+ * @return + */ + private Optional findCause(Throwable t, Class expectedCauseType, Predicate causePredicate) { + Stream causalChain = Throwables.getCausalChain(t).stream(); + return causalChain + .filter(expectedCauseType::isInstance) + .map(expectedCauseType::cast) + .filter(causePredicate) + .findFirst(); + } + + protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name, final FlowFile flowFile) { + try { + // Change owner and group of file if configured to do so + final String owner = getOwner(context, flowFile); + final String group = getGroup(context, flowFile); + + if (owner != null || group != null) { + hdfs.setOwner(name, owner, group); + } + } catch (Exception e) { + getLogger().warn("Could not change owner or group of {} on HDFS due to {}", new Object[]{name, e}); + } + } } diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java index 0cd069e1b8..342c71ba85 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java @@ -46,6 +46,7 @@ import java.net.URISyntaxException; import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Optional; import static org.junit.Assert.assertEquals; @@ -124,7 +125,8 @@ public class AbstractHadoopTest { final File brokenCoreSite = new File("src/test/resources/core-site-broken.xml"); final ResourceReference brokenCoreSiteReference = new FileResourceReference(brokenCoreSite); final ResourceReferences references = new StandardResourceReferences(Collections.singletonList(brokenCoreSiteReference)); - processor.resetHDFSResources(references, runner.getProcessContext()); + final List locations = references.asLocations(); + processor.resetHDFSResources(locations, runner.getProcessContext()); Assert.fail("Should have thrown SocketTimeoutException"); } catch (IOException e) { } diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java index ab49d638bf..b7920b8fc8 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java @@ -21,7 +21,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.nifi.components.resource.ResourceReferences; import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessorInitializationContext; @@ -35,6 +34,7 @@ import org.mockito.ArgumentCaptor; import java.io.File; import java.io.IOException; import java.security.PrivilegedExceptionAction; +import java.util.List; import static org.junit.Assert.assertFalse; import static 
org.mockito.Mockito.mock; @@ -96,7 +96,7 @@ public class GetHDFSSequenceFileTest { public class TestableGetHDFSSequenceFile extends GetHDFSSequenceFile { @Override - HdfsResources resetHDFSResources(ResourceReferences configResources, ProcessContext context) throws IOException { + HdfsResources resetHDFSResources(final List<String> resourceLocations, ProcessContext context) throws IOException { return hdfsResources; }
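
Usage sketch (not part of the patch): the refactoring above moves the former AbstractPutHDFS template into PutHDFS itself and exposes the variable pieces of PutHDFS, FetchHDFS, DeleteHDFS and ListHDFS as protected hooks (getSuccessRelationship(), getFailureRelationship(), getPath(), getAttributePrefix(), getConfigLocations(), and so on), so subclasses can re-target relationships, attribute namespaces and path resolution without copying onTrigger(). Below is a minimal, hypothetical subclass of the refactored FetchHDFS showing how two of those hooks could be overridden; the class name FetchObjectStore, the example package, and the "objectstore" prefix/attribute are illustrative only and do not appear in this change.

// Hypothetical example, not part of NIFI-9032.
package org.apache.nifi.processors.hadoop.example;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.hadoop.FetchHDFS;

public class FetchObjectStore extends FetchHDFS {

    // Write failure/provenance attributes under "objectstore.*" instead of "hdfs.*".
    @Override
    protected String getAttributePrefix() {
        return "objectstore";
    }

    // Resolve the file to fetch from a custom flow file attribute when present,
    // otherwise defer to the parent's handling of the Filename property.
    @Override
    protected String getPath(final ProcessContext context, final FlowFile flowFile) {
        final String override = flowFile.getAttribute("objectstore.path");
        return override != null ? override : super.getPath(context, flowFile);
    }
}

DeleteHDFS, ListHDFS and PutHDFS gain analogous hooks in this change (getSuccessRelationship(), getFailureRelationship(), getAttributePrefix(), isRecursive(), getBlockSize(), changeOwner(), ...), which is what allows the logic previously in AbstractPutHDFS to live directly in PutHDFS while remaining reusable from other bundles.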