NIFI-2435 - Wrapping reader call in doAs if necessary

This closes #780.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Bryan Rosander 2016-08-03 14:02:52 -04:00 committed by Bryan Bende
parent 321a2398ba
commit 0d730c5cc1
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
5 changed files with 164 additions and 8 deletions

View File

@ -413,13 +413,20 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
} }
protected FileSystem getFileSystem() { protected FileSystem getFileSystem() {
// if kerberos is enabled, check if the ticket should be renewed before returning the FS // trigger Relogin if necessary
if (hdfsResources.get().getUserGroupInformation() != null && isTicketOld()) { getUserGroupInformation();
tryKerberosRelogin(hdfsResources.get().getUserGroupInformation());
}
return hdfsResources.get().getFileSystem(); return hdfsResources.get().getFileSystem();
} }
protected UserGroupInformation getUserGroupInformation() {
// if kerberos is enabled, check if the ticket should be renewed before returning
UserGroupInformation userGroupInformation = hdfsResources.get().getUserGroupInformation();
if (userGroupInformation != null && isTicketOld()) {
tryKerberosRelogin(userGroupInformation);
}
return userGroupInformation;
}
protected void tryKerberosRelogin(UserGroupInformation ugi) { protected void tryKerberosRelogin(UserGroupInformation ugi) {
try { try {
getLogger().info("Kerberos ticket age exceeds threshold [{} seconds] " + getLogger().info("Kerberos ticket age exceeds threshold [{} seconds] " +

View File

@ -19,6 +19,7 @@ package org.apache.nifi.processors.hadoop;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.SeeAlso;
@ -32,6 +33,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processors.hadoop.util.SequenceFileReader; import org.apache.nifi.processors.hadoop.util.SequenceFileReader;
import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.StopWatch;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -102,7 +104,7 @@ public class GetHDFSSequenceFile extends GetHDFS {
continue; // If file is no longer here move on. continue; // If file is no longer here move on.
} }
logger.debug("Reading file"); logger.debug("Reading file");
flowFiles = reader.readSequenceFile(file, conf, hdfs); flowFiles = getFlowFiles(conf, hdfs, reader, file);
if (!keepSourceFiles && !hdfs.delete(file, false)) { if (!keepSourceFiles && !hdfs.delete(file, false)) {
logger.warn("Unable to delete path " + file.toString() + " from HDFS. Will likely be picked up over and over..."); logger.warn("Unable to delete path " + file.toString() + " from HDFS. Will likely be picked up over and over...");
} }
@ -127,7 +129,20 @@ public class GetHDFSSequenceFile extends GetHDFS {
} }
} }
} }
} }
protected Set<FlowFile> getFlowFiles(final Configuration conf, final FileSystem hdfs, final SequenceFileReader<Set<FlowFile>> reader, final Path file) throws Exception {
PrivilegedExceptionAction<Set<FlowFile>> privilegedExceptionAction = new PrivilegedExceptionAction<Set<FlowFile>>() {
@Override
public Set<FlowFile> run() throws Exception {
return reader.readSequenceFile(file, conf, hdfs);
}
};
UserGroupInformation userGroupInformation = getUserGroupInformation();
if (userGroupInformation == null) {
return privilegedExceptionAction.run();
} else {
return userGroupInformation.doAs(privilegedExceptionAction);
}
}
} }

View File

@ -63,7 +63,7 @@ public class KeyValueReader implements SequenceFileReader<Set<FlowFile>> {
final SequenceFile.Reader reader; final SequenceFile.Reader reader;
Set<FlowFile> flowFiles = new HashSet<>(); Set<FlowFile> flowFiles = new HashSet<>();
reader = new SequenceFile.Reader(fileSystem, file, configuration); reader = new SequenceFile.Reader(configuration, Reader.file(fileSystem.makeQualified(file)));
final Text key = new Text(); final Text key = new Text();
final KeyValueWriterCallback callback = new KeyValueWriterCallback(reader); final KeyValueWriterCallback callback = new KeyValueWriterCallback(reader);
final String inputfileName = file.getName() + "." + System.nanoTime() + "."; final String inputfileName = file.getName() + "." + System.nanoTime() + ".";

View File

@ -61,7 +61,7 @@ public class ValueReader implements SequenceFileReader<Set<FlowFile>> {
public Set<FlowFile> readSequenceFile(final Path file, Configuration configuration, FileSystem fileSystem) throws IOException { public Set<FlowFile> readSequenceFile(final Path file, Configuration configuration, FileSystem fileSystem) throws IOException {
Set<FlowFile> flowFiles = new HashSet<>(); Set<FlowFile> flowFiles = new HashSet<>();
final SequenceFile.Reader reader = new SequenceFile.Reader(fileSystem, file, configuration); final SequenceFile.Reader reader = new SequenceFile.Reader(configuration, Reader.file(fileSystem.makeQualified(file)));
final String inputfileName = file.getName() + "." + System.nanoTime() + "."; final String inputfileName = file.getName() + "." + System.nanoTime() + ".";
int counter = 0; int counter = 0;
LOG.debug("Reading from sequence file {}", new Object[]{file}); LOG.debug("Reading from sequence file {}", new Object[]{file});

View File

@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processors.hadoop.util.SequenceFileReader;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class GetHDFSSequenceFileTest {
private AbstractHadoopProcessor.HdfsResources hdfsResources;
private GetHDFSSequenceFile getHDFSSequenceFile;
private Configuration configuration;
private FileSystem fileSystem;
private UserGroupInformation userGroupInformation;
private boolean isTicketOld;
private boolean reloginTried;
@Before
public void setup() throws IOException {
configuration = mock(Configuration.class);
fileSystem = mock(FileSystem.class);
userGroupInformation = mock(UserGroupInformation.class);
hdfsResources = new AbstractHadoopProcessor.HdfsResources(configuration, fileSystem, userGroupInformation);
getHDFSSequenceFile = new TestableGetHDFSSequenceFile();
getHDFSSequenceFile.kerberosProperties = mock(KerberosProperties.class);
isTicketOld = false;
reloginTried = false;
init();
}
private void init() throws IOException {
ProcessContext context = mock(ProcessContext.class);
when(context.getProperty(AbstractHadoopProcessor.KERBEROS_RELOGIN_PERIOD)).thenReturn(mock(PropertyValue.class));
when(context.getProperty(AbstractHadoopProcessor.HADOOP_CONFIGURATION_RESOURCES)).thenReturn(mock(PropertyValue.class));
when(context.getProperty(AbstractHadoopProcessor.DIRECTORY_PROP_NAME)).thenReturn(mock(PropertyValue.class));
getHDFSSequenceFile.init(mock(ProcessorInitializationContext.class));
getHDFSSequenceFile.onScheduled(context);
}
private void getFlowFilesWithUgi() throws Exception {
SequenceFileReader reader = mock(SequenceFileReader.class);
Path file = mock(Path.class);
getHDFSSequenceFile.getFlowFiles(configuration, fileSystem, reader, file);
ArgumentCaptor<PrivilegedExceptionAction> privilegedExceptionActionArgumentCaptor = ArgumentCaptor.forClass(PrivilegedExceptionAction.class);
verifyNoMoreInteractions(reader);
verify(userGroupInformation).doAs(privilegedExceptionActionArgumentCaptor.capture());
privilegedExceptionActionArgumentCaptor.getValue().run();
verify(reader).readSequenceFile(file, configuration, fileSystem);
}
@Test
public void getFlowFilesWithUgiAndNewTicketShouldCallDoAsAndNotRelogin() throws Exception {
getFlowFilesWithUgi();
assertFalse(reloginTried);
}
@Test
public void getFlowFilesWithUgiAndOldTicketShouldCallDoAsAndRelogin() throws Exception {
isTicketOld = true;
getFlowFilesWithUgi();
assertTrue(reloginTried);
}
@Test
public void testGetFlowFilesNoUgiShouldntCallDoAs() throws Exception {
hdfsResources = new AbstractHadoopProcessor.HdfsResources(configuration, fileSystem, null);
init();
SequenceFileReader reader = mock(SequenceFileReader.class);
Path file = mock(Path.class);
getHDFSSequenceFile.getFlowFiles(configuration, fileSystem, reader, file);
verify(reader).readSequenceFile(file, configuration, fileSystem);
}
public class TestableGetHDFSSequenceFile extends GetHDFSSequenceFile {
@Override
HdfsResources resetHDFSResources(String configResources, String dir, ProcessContext context) throws IOException {
return hdfsResources;
}
@Override
public void onScheduled(ProcessContext context) throws IOException {
abstractOnScheduled(context);
}
@Override
protected KerberosProperties getKerberosProperties() {
return kerberosProperties;
}
@Override
protected boolean isTicketOld() {
return isTicketOld;
}
@Override
protected void tryKerberosRelogin(UserGroupInformation ugi) {
reloginTried = true;
}
}
}