diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index 9efc0f6ea5..f96aa78f48 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -413,13 +413,20 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
     }
 
     protected FileSystem getFileSystem() {
-        // if kerberos is enabled, check if the ticket should be renewed before returning the FS
-        if (hdfsResources.get().getUserGroupInformation() != null && isTicketOld()) {
-            tryKerberosRelogin(hdfsResources.get().getUserGroupInformation());
-        }
+        // trigger Relogin if necessary
+        getUserGroupInformation();
         return hdfsResources.get().getFileSystem();
     }
 
+    protected UserGroupInformation getUserGroupInformation() {
+        // if kerberos is enabled, check if the ticket should be renewed before returning
+        UserGroupInformation userGroupInformation = hdfsResources.get().getUserGroupInformation();
+        if (userGroupInformation != null && isTicketOld()) {
+            tryKerberosRelogin(userGroupInformation);
+        }
+        return userGroupInformation;
+    }
+
     protected void tryKerberosRelogin(UserGroupInformation ugi) {
         try {
             getLogger().info("Kerberos ticket age exceeds threshold [{} seconds] " +
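The AbstractHadoopProcessor change above hoists the ticket-age check out of getFileSystem() into a reusable getUserGroupInformation() hook, so any subclass that needs to act as the configured Kerberos principal gets the relogin check for free. As a minimal sketch of how a subclass might lean on the hook (the class and its existsAsUser helper are hypothetical, not part of this patch):

import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.processors.hadoop.AbstractHadoopProcessor;

// Hypothetical subclass; existsAsUser is illustrative only.
public abstract class ExampleHdfsProcessor extends AbstractHadoopProcessor {

    protected boolean existsAsUser(final FileSystem hdfs, final Path path) throws Exception {
        // Renews the Kerberos ticket first if it is older than the configured
        // relogin period; returns null when Kerberos is not configured.
        final UserGroupInformation ugi = getUserGroupInformation();
        final PrivilegedExceptionAction<Boolean> action = () -> hdfs.exists(path);
        // Same fallback as getFlowFiles() below: no UGI means no Kerberos, so
        // the action runs directly instead of inside doAs().
        return ugi == null ? action.run() : ugi.doAs(action);
    }
}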
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
index ccd5501b0b..b7376cdd2c 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.hadoop;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
@@ -32,6 +33,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processors.hadoop.util.SequenceFileReader;
 import org.apache.nifi.util.StopWatch;
 
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -102,7 +104,7 @@ public class GetHDFSSequenceFile extends GetHDFS {
                     continue; // If file is no longer here move on.
                 }
                 logger.debug("Reading file");
-                flowFiles = reader.readSequenceFile(file, conf, hdfs);
+                flowFiles = getFlowFiles(conf, hdfs, reader, file);
                 if (!keepSourceFiles && !hdfs.delete(file, false)) {
                     logger.warn("Unable to delete path " + file.toString() + " from HDFS. Will likely be picked up over and over...");
                 }
@@ -127,7 +129,20 @@ public class GetHDFSSequenceFile extends GetHDFS {
             }
         }
     }
-}
+
+    protected Set<FlowFile> getFlowFiles(final Configuration conf, final FileSystem hdfs, final SequenceFileReader<Set<FlowFile>> reader, final Path file) throws Exception {
+        PrivilegedExceptionAction<Set<FlowFile>> privilegedExceptionAction = new PrivilegedExceptionAction<Set<FlowFile>>() {
+            @Override
+            public Set<FlowFile> run() throws Exception {
+                return reader.readSequenceFile(file, conf, hdfs);
+            }
+        };
+        UserGroupInformation userGroupInformation = getUserGroupInformation();
+        if (userGroupInformation == null) {
+            return privilegedExceptionAction.run();
+        } else {
+            return userGroupInformation.doAs(privilegedExceptionAction);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/KeyValueReader.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/KeyValueReader.java
index 2aa77e1b6c..a8c755195b 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/KeyValueReader.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/KeyValueReader.java
@@ -63,7 +63,7 @@ public class KeyValueReader implements SequenceFileReader<Set<FlowFile>> {
         final SequenceFile.Reader reader;
 
         Set<FlowFile> flowFiles = new HashSet<>();
-        reader = new SequenceFile.Reader(fileSystem, file, configuration);
+        reader = new SequenceFile.Reader(configuration, Reader.file(fileSystem.makeQualified(file)));
         final Text key = new Text();
         final KeyValueWriterCallback callback = new KeyValueWriterCallback(reader);
         final String inputfileName = file.getName() + "." + System.nanoTime() + ".";
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ValueReader.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ValueReader.java
index 0ef103d421..7d1e5d0745 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ValueReader.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ValueReader.java
@@ -61,7 +61,7 @@ public class ValueReader implements SequenceFileReader<Set<FlowFile>> {
     public Set<FlowFile> readSequenceFile(final Path file, Configuration configuration, FileSystem fileSystem) throws IOException {
 
         Set<FlowFile> flowFiles = new HashSet<>();
-        final SequenceFile.Reader reader = new SequenceFile.Reader(fileSystem, file, configuration);
+        final SequenceFile.Reader reader = new SequenceFile.Reader(configuration, Reader.file(fileSystem.makeQualified(file)));
         final String inputfileName = file.getName() + "." + System.nanoTime() + ".";
         int counter = 0;
         LOG.debug("Reading from sequence file {}", new Object[]{file});
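The KeyValueReader and ValueReader changes swap the deprecated SequenceFile.Reader(FileSystem, Path, Configuration) constructor for the option-based form, which resolves the path itself instead of reading through the caller's FileSystem instance. Qualifying the path first preserves the original filesystem's scheme and authority. A standalone sketch of the same construction (the wrapper class and open method are illustrative, not part of this patch):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Reader;

// Illustrative wrapper around the option-based reader construction.
public final class SequenceFileOpenExample {

    private SequenceFileOpenExample() {
    }

    public static SequenceFile.Reader open(final Configuration conf, final FileSystem fs, final Path file) throws IOException {
        // makeQualified stamps the path with the source filesystem's scheme and
        // authority, so the option-based reader (which resolves the path itself)
        // opens the file on that filesystem rather than the default one taken
        // from the Configuration.
        return new SequenceFile.Reader(conf, Reader.file(fs.makeQualified(file)));
    }
}

Because getFlowFiles() wraps the read in ugi.doAs(), the reader these classes create now runs with the Kerberos credentials of the doAs context rather than whatever identity created the cached FileSystem.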
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java
new file mode 100644
index 0000000000..2ae99f0f4e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processors.hadoop.util.SequenceFileReader;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+public class GetHDFSSequenceFileTest {
+    private AbstractHadoopProcessor.HdfsResources hdfsResources;
+    private GetHDFSSequenceFile getHDFSSequenceFile;
+    private Configuration configuration;
+    private FileSystem fileSystem;
+    private UserGroupInformation userGroupInformation;
+    private boolean isTicketOld;
+    private boolean reloginTried;
+
+    @Before
+    public void setup() throws IOException {
+        configuration = mock(Configuration.class);
+        fileSystem = mock(FileSystem.class);
+        userGroupInformation = mock(UserGroupInformation.class);
+        hdfsResources = new AbstractHadoopProcessor.HdfsResources(configuration, fileSystem, userGroupInformation);
+        getHDFSSequenceFile = new TestableGetHDFSSequenceFile();
+        getHDFSSequenceFile.kerberosProperties = mock(KerberosProperties.class);
+        isTicketOld = false;
+        reloginTried = false;
+        init();
+    }
+
+    private void init() throws IOException {
+        ProcessContext context = mock(ProcessContext.class);
+        when(context.getProperty(AbstractHadoopProcessor.KERBEROS_RELOGIN_PERIOD)).thenReturn(mock(PropertyValue.class));
+        when(context.getProperty(AbstractHadoopProcessor.HADOOP_CONFIGURATION_RESOURCES)).thenReturn(mock(PropertyValue.class));
+        when(context.getProperty(AbstractHadoopProcessor.DIRECTORY_PROP_NAME)).thenReturn(mock(PropertyValue.class));
+        getHDFSSequenceFile.init(mock(ProcessorInitializationContext.class));
+        getHDFSSequenceFile.onScheduled(context);
+    }
+
+    private void getFlowFilesWithUgi() throws Exception {
+        SequenceFileReader reader = mock(SequenceFileReader.class);
+        Path file = mock(Path.class);
+        getHDFSSequenceFile.getFlowFiles(configuration, fileSystem, reader, file);
+        ArgumentCaptor<PrivilegedExceptionAction> privilegedExceptionActionArgumentCaptor = ArgumentCaptor.forClass(PrivilegedExceptionAction.class);
+        verifyNoMoreInteractions(reader);
+        verify(userGroupInformation).doAs(privilegedExceptionActionArgumentCaptor.capture());
+        privilegedExceptionActionArgumentCaptor.getValue().run();
+        verify(reader).readSequenceFile(file, configuration, fileSystem);
+    }
+
+    @Test
+    public void getFlowFilesWithUgiAndNewTicketShouldCallDoAsAndNotRelogin() throws Exception {
+        getFlowFilesWithUgi();
+        assertFalse(reloginTried);
+    }
+
+    @Test
+    public void getFlowFilesWithUgiAndOldTicketShouldCallDoAsAndRelogin() throws Exception {
+        isTicketOld = true;
+        getFlowFilesWithUgi();
+        assertTrue(reloginTried);
+    }
+
+    @Test
+    public void testGetFlowFilesNoUgiShouldntCallDoAs() throws Exception {
+        hdfsResources = new AbstractHadoopProcessor.HdfsResources(configuration, fileSystem, null);
+        init();
+        SequenceFileReader reader = mock(SequenceFileReader.class);
+        Path file = mock(Path.class);
+        getHDFSSequenceFile.getFlowFiles(configuration, fileSystem, reader, file);
+        verify(reader).readSequenceFile(file, configuration, fileSystem);
+    }
+
+    public class TestableGetHDFSSequenceFile extends GetHDFSSequenceFile {
+        @Override
+        HdfsResources resetHDFSResources(String configResources, String dir, ProcessContext context) throws IOException {
+            return hdfsResources;
+        }
+
+        @Override
+        public void onScheduled(ProcessContext context) throws IOException {
+            abstractOnScheduled(context);
+        }
+
+        @Override
+        protected KerberosProperties getKerberosProperties() {
+            return kerberosProperties;
+        }
+
+        @Override
+        protected boolean isTicketOld() {
+            return isTicketOld;
+        }
+
+        @Override
+        protected void tryKerberosRelogin(UserGroupInformation ugi) {
+            reloginTried = true;
+        }
+    }
+}
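One subtlety in the test: a plain Mockito mock of UserGroupInformation never executes the action handed to doAs, which is why getFlowFilesWithUgi() captures the PrivilegedExceptionAction and calls run() by hand before verifying the reader. An alternative, sketched below with a hypothetical helper class, is to stub doAs so it runs whatever action it receives:

import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.security.UserGroupInformation;

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

// Hypothetical test helper; not part of this patch.
public final class UgiStubbing {

    private UgiStubbing() {
    }

    public static UserGroupInformation executingUgi() throws Exception {
        final UserGroupInformation ugi = mock(UserGroupInformation.class);
        // Make the mocked doAs actually invoke the action it is given, so
        // downstream verifications see the real call chain.
        when(ugi.doAs(any(PrivilegedExceptionAction.class)))
                .thenAnswer(invocation -> ((PrivilegedExceptionAction<?>) invocation.getArguments()[0]).run());
        return ugi;
    }
}

With that stubbing, verify(reader).readSequenceFile(...) would pass without the manual capture-and-run step; the capture-based style used in the test has the advantage of also proving that the read happens inside doAs rather than before it.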