NIFI-8519 Adding HDFS support for NAR autoload

- Refining classloader management with the help of @markap14

This closes #5059

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Bence Simon 2021-05-05 23:34:13 +02:00 committed by Mark Payne
parent 7c08fbc4d4
commit 51aae5bcf6
22 changed files with 1193 additions and 111 deletions

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.nar;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
/**
 * Represents an external source where the NAR files might be acquired from. Used by the NAR auto loader functionality
 * in order to poll an external source for new NAR files to load.
 */
public interface NarProvider {

    /**
     * Initializes the NAR Provider based on the given set of properties.
     *
     * @param context provides the provider-specific configuration properties.
     */
    void initialize(NarProviderInitializationContext context);

    /**
     * Performs a listing of all NAR's that are available.
     *
     * @return The result is a list of locations, where the format depends on the actual implementation.
     *
     * @throws IOException in case of any issue while communicating with the external source.
     */
    Collection<String> listNars() throws IOException;

    /**
     * Fetches the NAR at the given location. The location should be one of the values returned by <code>listNars()</code>.
     *
     * @param location the location of the NAR, as returned by {@link #listNars()}.
     *
     * @return a stream of the NAR file's contents. NOTE(review): the interface does not state who closes the
     * stream — callers should close it once consumed; confirm with implementations.
     *
     * @throws IOException in case of any issue while fetching the NAR from the external source.
     */
    InputStream fetchNarContents(String location) throws IOException;
}

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.nar;
import java.util.Map;
/**
 * Contains necessary information for extensions of NAR auto loader functionality.
 */
public interface NarProviderInitializationContext {

    /**
     * Returns the configuration properties available to the NAR provider.
     *
     * @return a map of property name to property value. Presumably the keys are already stripped of the
     * framework prefix — confirm against the implementing context.
     */
    Map<String, String> getProperties();
}

View File

@ -1914,6 +1914,39 @@ public abstract class NiFiProperties {
return Paths.get(getProperty(STATUS_REPOSITORY_QUESTDB_PERSIST_LOCATION, DEFAULT_COMPONENT_STATUS_REPOSITORY_PERSIST_LOCATION));
}
/**
 * Collects every property whose key begins with the given prefix.
 *
 * The prefix is matched literally (no separator is appended), so prefix "item" matches properties
 * starting with both "item." and "items". Properties with empty value are included as well.
 *
 * @param prefix The exact string the returned property keys should start with.
 *
 * @return A map of the matching properties.
 */
public Map<String, String> getPropertiesWithPrefix(final String prefix) {
    return getPropertyKeys().stream()
            .filter(propertyKey -> propertyKey.startsWith(prefix))
            .collect(Collectors.toMap(propertyKey -> propertyKey, this::getProperty));
}
/**
 * Returns every possible next "token" after the given prefix, where a token is the alphanumeric
 * string between two dots.
 *
 * For example if the properties "parent.sub1" and "parent.sub2" are set and the prefix is "parent",
 * the result consists of "sub1" and "sub2". Only the directly following token is taken, so a
 * property "parent.sub1.subsub1" contributes "sub1" as well.
 *
 * @param prefix The prefix of the request; a trailing dot is appended when missing.
 *
 * @return A set of direct subsequent tokens.
 */
public Set<String> getDirectSubsequentTokens(final String prefix) {
    final String normalizedPrefix = prefix.endsWith(".") ? prefix : prefix + ".";
    return getPropertyKeys().stream()
            .filter(propertyKey -> propertyKey.startsWith(normalizedPrefix))
            .map(propertyKey -> propertyKey.substring(normalizedPrefix.length()))
            .map(remainder -> {
                final int dotIndex = remainder.indexOf('.');
                return dotIndex == -1 ? remainder : remainder.substring(0, dotIndex);
            })
            .collect(Collectors.toSet());
}
/**
* Creates an instance of NiFiProperties. This should likely not be called
* by any classes outside of the NiFi framework but can be useful by the

View File

@ -374,4 +374,100 @@ public class NiFiPropertiesTest {
assertFalse(properties.isTlsConfigurationPresent());
}
@Test
public void testGetPropertiesWithPrefixWithoutDot() {
    final NiFiProperties niFiProperties = loadNiFiProperties("/NiFiProperties/conf/nifi.properties", null);

    final Map<String, String> prefixedProperties = niFiProperties.getPropertiesWithPrefix("nifi.web.http");

    // Without a trailing dot, "nifi.web.http" matches both "nifi.web.http.*" and "nifi.web.https.*" keys.
    Assert.assertEquals(4, prefixedProperties.size());
    Assert.assertTrue(prefixedProperties.containsKey("nifi.web.http.host"));
    Assert.assertTrue(prefixedProperties.containsKey("nifi.web.https.host"));
}
@Test
public void testGetPropertiesWithPrefixWithDot() {
    final NiFiProperties niFiProperties = loadNiFiProperties("/NiFiProperties/conf/nifi.properties", null);

    final Map<String, String> prefixedProperties = niFiProperties.getPropertiesWithPrefix("nifi.web.http.");

    // The trailing dot excludes the "nifi.web.https.*" keys from the result.
    Assert.assertEquals(2, prefixedProperties.size());
    Assert.assertTrue(prefixedProperties.containsKey("nifi.web.http.host"));
    Assert.assertFalse(prefixedProperties.containsKey("nifi.web.https.host"));
}
@Test
public void testGetPropertiesWithPrefixWhenNoResult() {
    final NiFiProperties niFiProperties = loadNiFiProperties("/NiFiProperties/conf/nifi.properties", null);

    final Map<String, String> prefixedProperties = niFiProperties.getPropertiesWithPrefix("invalid.property");

    // An unknown prefix yields an empty map rather than null.
    Assert.assertTrue(prefixedProperties.isEmpty());
}
@Test
public void testGetDirectSubsequentTokensWithoutDot() {
    final NiFiProperties niFiProperties = loadNiFiProperties("/NiFiProperties/conf/nifi.properties", null);

    final Set<String> tokens = niFiProperties.getDirectSubsequentTokens("nifi.web.http");

    // The dot is appended automatically, so only "nifi.web.http.*" keys contribute tokens.
    Assert.assertEquals(2, tokens.size());
    Assert.assertTrue(tokens.contains("host"));
    Assert.assertTrue(tokens.contains("port"));
}
@Test
public void testGetDirectSubsequentTokensWithDot() {
    final NiFiProperties niFiProperties = loadNiFiProperties("/NiFiProperties/conf/nifi.properties", null);

    final Set<String> tokens = niFiProperties.getDirectSubsequentTokens("nifi.web.http.");

    // An explicit trailing dot behaves the same as the auto-appended one.
    Assert.assertEquals(2, tokens.size());
    Assert.assertTrue(tokens.contains("host"));
    Assert.assertTrue(tokens.contains("port"));
}
@Test
public void testGetDirectSubsequentTokensWithNonExistingToken() {
    final NiFiProperties niFiProperties = loadNiFiProperties("/NiFiProperties/conf/nifi.properties", null);

    final Set<String> tokens = niFiProperties.getDirectSubsequentTokens("lorem.ipsum");

    // An unknown prefix yields an empty set rather than null.
    Assert.assertTrue(tokens.isEmpty());
}
@Test
public void testGetDirectSubsequentTokensWhenMoreTokensAfterward() {
    final NiFiProperties niFiProperties = loadNiFiProperties("/NiFiProperties/conf/nifi.properties", null);

    final Set<String> tokens = niFiProperties.getDirectSubsequentTokens("nifi.web");

    // Only the token directly after the prefix is returned, regardless of deeper nesting.
    Assert.assertEquals(4, tokens.size());
    Assert.assertTrue(tokens.contains("http"));
    Assert.assertTrue(tokens.contains("https"));
    Assert.assertTrue(tokens.contains("war"));
    Assert.assertTrue(tokens.contains("jetty"));
}
}

View File

@ -4012,3 +4012,42 @@ Now, we must place our custom processor nar in the configured directory. The con
Ensure that the file has appropriate permissions for the nifi user and group.
Refresh the browser page and the custom processor should now be available when adding a new Processor to your flow.
=== NAR Providers
NiFi supports fetching NAR files for the autoloading feature from external sources. This can be achieved by using NAR Providers. A NAR Provider serves as a connector between an external data store
and NiFi.
When configured, a NAR Provider polls the external source for available NAR files and offers them to the framework. The framework then fetches new NAR files and copies them to
the `nifi.nar.library.autoload.directory` for autoloading.
A NAR Provider can be configured by adding the `nifi.nar.library.provider.<providerName>.implementation` property, whose value is the fully qualified name of the implementation class. Some implementations might need
further properties. These are defined by the implementation and must be prefixed with `nifi.nar.library.provider.<providerName>.`.
The `<providerName>` is arbitrary and serves to correlate multiple properties together for a single provider. Multiple providers may be configured, each with a different `<providerName>`. Currently, NiFi supports an HDFS-based NAR provider.
==== HDFS NAR Provider
This implementation is capable of downloading NAR files from an HDFS file system.
The value of the `nifi.nar.library.provider.<providerName>.implementation` must be `org.apache.nifi.nar.hadoop.HDFSNarProvider`. The following further properties are defined by the provider:
[options="header"]
|===
| Name | Description
| resources | Comma-separated list of HDFS configuration resources (for example, core-site.xml).
| source.directory | The source directory of NAR files within HDFS. Note: the provider does not check for files recursively.
| kerberos.principal | Optional. Kerberos principal to authenticate as.
| kerberos.keytab | Optional. Kerberos keytab associated with the principal.
| kerberos.password | Optional. Kerberos password associated with the principal.
|===
Example configuration:
nifi.nar.library.provider.hdfs1.implementation=org.apache.nifi.nar.hadoop.HDFSNarProvider
nifi.nar.library.provider.hdfs1.resources=/etc/hadoop/core-site.xml
nifi.nar.library.provider.hdfs1.source.directory=/customNars
nifi.nar.library.provider.hdfs2.implementation=org.apache.nifi.nar.hadoop.HDFSNarProvider
nifi.nar.library.provider.hdfs2.resources=/etc/hadoop/core-site.xml
nifi.nar.library.provider.hdfs2.source.directory=/other/dir/for/customNars

View File

@ -37,7 +37,6 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessorInitializationContext;
@ -51,7 +50,6 @@ import javax.net.SocketFactory;
import javax.security.auth.login.LoginException;
import java.io.File;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.Socket;
@ -62,8 +60,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
@ -592,36 +588,6 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
return accessDenied;
}
/**
 * Immutable holder grouping the Hadoop objects (configuration, file system, user group
 * information and Kerberos user) used by this processor to talk to HDFS.
 */
static protected class HdfsResources {
    private final Configuration configuration;
    private final FileSystem fileSystem;
    private final UserGroupInformation userGroupInformation;
    private final KerberosUser kerberosUser;

    public HdfsResources(Configuration configuration, FileSystem fileSystem, UserGroupInformation userGroupInformation, KerberosUser kerberosUser) {
        this.configuration = configuration;
        this.fileSystem = fileSystem;
        this.userGroupInformation = userGroupInformation;
        this.kerberosUser = kerberosUser;
    }

    /** @return the Hadoop configuration these resources were built from. */
    public Configuration getConfiguration() {
        return configuration;
    }

    /** @return the file system handle. */
    public FileSystem getFileSystem() {
        return fileSystem;
    }

    /** @return the user group information; may be null when security is not configured — TODO confirm with callers. */
    public UserGroupInformation getUserGroupInformation() {
        return userGroupInformation;
    }

    /** @return the Kerberos user; may be null when Kerberos is not configured — TODO confirm with callers. */
    public KerberosUser getKerberosUser() {
        return kerberosUser;
    }
}
static protected class ValidationResources {
private final ResourceReferences configResources;
private final Configuration configuration;
@ -640,57 +606,4 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
}
}
/**
 * Extending Hadoop Configuration to prevent it from caching classes that can't be found. Since users may be
 * adding additional JARs to the classpath we don't want them to have to restart the JVM to be able to load
 * something that was previously not found, but might now be available.
 *
 * Reference the original getClassByNameOrNull from Configuration.
 */
static class ExtendedConfiguration extends Configuration {
    private final ComponentLog logger;

    // Per-classloader cache of resolved classes. Weak references at both levels so
    // neither classloaders nor classes are pinned in memory once otherwise unloadable.
    private final Map<ClassLoader, Map<String, WeakReference<Class<?>>>> CACHE_CLASSES = new WeakHashMap<>();

    public ExtendedConfiguration(final ComponentLog logger) {
        this.logger = logger;
    }

    @Override
    public Class<?> getClassByNameOrNull(String name) {
        final ClassLoader classLoader = getClassLoader();

        // Obtain (or lazily create) the per-classloader cache under the shared lock.
        Map<String, WeakReference<Class<?>>> map;
        synchronized (CACHE_CLASSES) {
            map = CACHE_CLASSES.get(classLoader);
            if (map == null) {
                map = Collections.synchronizedMap(new WeakHashMap<>());
                CACHE_CLASSES.put(classLoader, map);
            }
        }

        // A cleared weak reference counts as a cache miss and triggers a fresh load.
        Class<?> clazz = null;
        WeakReference<Class<?>> ref = map.get(name);
        if (ref != null) {
            clazz = ref.get();
        }

        if (clazz == null) {
            try {
                clazz = Class.forName(name, true, classLoader);
            } catch (ClassNotFoundException e) {
                // Unlike the base Configuration, a failed lookup is NOT cached, so a class
                // added to the classpath later can still be found without a JVM restart.
                logger.error(e.getMessage(), e);
                return null;
            }
            // two putters can race here, but they'll put the same class
            map.put(name, new WeakReference<>(clazz));
            return clazz;
        } else {
            // cache hit
            return clazz;
        }
    }
}
}

View File

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.nifi.logging.ComponentLog;
import org.slf4j.Logger;
import java.lang.ref.WeakReference;
import java.util.Collections;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.function.BiConsumer;
/**
 * Extending Hadoop Configuration to prevent it from caching classes that can't be found. Since users may be
 * adding additional JARs to the classpath we don't want them to have to restart the JVM to be able to load
 * something that was previously not found, but might now be available.
 *
 * Reference the original getClassByNameOrNull from Configuration.
 */
public class ExtendedConfiguration extends Configuration {
    // Both logger types expose error(String, Throwable); the constructors adapt either into this consumer.
    private final BiConsumer<String, Throwable> loggerMethod;

    // Per-classloader cache of resolved classes (instance state, hence camelCase rather than
    // constant-case). Weak references at both levels so neither classloaders nor classes are pinned.
    private final Map<ClassLoader, Map<String, WeakReference<Class<?>>>> cacheClasses = new WeakHashMap<>();

    public ExtendedConfiguration(final Logger logger) {
        this.loggerMethod = logger::error;
    }

    public ExtendedConfiguration(final ComponentLog logger) {
        this.loggerMethod = logger::error;
    }

    /**
     * Resolves a class by name via the configured classloader, caching successful lookups only.
     *
     * @param name fully qualified class name to resolve.
     * @return the resolved class, or null if it cannot be found (the failure is logged, not cached).
     */
    @Override
    public Class<?> getClassByNameOrNull(final String name) {
        final ClassLoader classLoader = getClassLoader();

        // Obtain (or lazily create) the per-classloader cache under the shared lock.
        final Map<String, WeakReference<Class<?>>> map;
        synchronized (cacheClasses) {
            map = cacheClasses.computeIfAbsent(classLoader, loader -> Collections.synchronizedMap(new WeakHashMap<>()));
        }

        // A cleared weak reference counts as a cache miss and triggers a fresh load.
        final WeakReference<Class<?>> ref = map.get(name);
        Class<?> clazz = ref == null ? null : ref.get();
        if (clazz != null) {
            // cache hit
            return clazz;
        }

        try {
            clazz = Class.forName(name, true, classLoader);
        } catch (ClassNotFoundException e) {
            // Unlike the base Configuration, a failed lookup is NOT cached, so a class added
            // to the classpath later can still be found without a JVM restart.
            loggerMethod.accept(e.getMessage(), e);
            return null;
        }
        // two putters can race here, but they'll put the same class
        map.put(name, new WeakReference<>(clazz));
        return clazz;
    }
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.security.krb.KerberosUser;
/**
 * Immutable holder grouping the Hadoop objects (configuration, file system, user group
 * information and Kerberos user) that belong together for one HDFS interaction.
 */
public class HdfsResources {
    private final Configuration configuration;
    private final FileSystem fileSystem;
    private final UserGroupInformation userGroupInformation;
    private final KerberosUser kerberosUser;

    // Parameters marked final for consistency with the rest of the codebase's style.
    public HdfsResources(final Configuration configuration, final FileSystem fileSystem, final UserGroupInformation userGroupInformation, final KerberosUser kerberosUser) {
        this.configuration = configuration;
        this.fileSystem = fileSystem;
        this.userGroupInformation = userGroupInformation;
        this.kerberosUser = kerberosUser;
    }

    /** @return the Hadoop configuration these resources were built from. */
    public Configuration getConfiguration() {
        return configuration;
    }

    /** @return the file system handle. */
    public FileSystem getFileSystem() {
        return fileSystem;
    }

    /** @return the user group information; may be null when security is not configured — TODO confirm with callers. */
    public UserGroupInformation getUserGroupInformation() {
        return userGroupInformation;
    }

    /** @return the Kerberos user; may be null when Kerberos is not configured — TODO confirm with callers. */
    public KerberosUser getKerberosUser() {
        return kerberosUser;
    }
}

View File

@ -17,6 +17,7 @@
package org.apache.nifi.nar;
import org.apache.nifi.util.FileUtils;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -26,37 +27,44 @@ import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchService;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
/**
* Starts a thread to monitor the auto-load directory for new NARs.
*/
public class NarAutoLoader {
private static final Logger LOGGER = LoggerFactory.getLogger(NarAutoLoader.class);
private static final String NAR_PROVIDER_PREFIX = "nifi.nar.library.provider.";
private static final String IMPLEMENTATION_PROPERTY = "implementation";
private static final long POLL_INTERVAL_MS = 5000;
private final File autoLoadDir;
private final NiFiProperties properties;
private final NarLoader narLoader;
private final ExtensionManager extensionManager;
private volatile Set<NarProviderTask> narProviderTasks;
private volatile NarAutoLoaderTask narAutoLoaderTask;
private volatile boolean started = false;
public NarAutoLoader(final File autoLoadDir, final NarLoader narLoader) {
this.autoLoadDir = Objects.requireNonNull(autoLoadDir);
public NarAutoLoader(final NiFiProperties properties, final NarLoader narLoader, final ExtensionManager extensionManager) {
this.properties = Objects.requireNonNull(properties);
this.narLoader = Objects.requireNonNull(narLoader);
this.extensionManager = Objects.requireNonNull(extensionManager);
}
public synchronized void start() throws IOException {
public synchronized void start() throws IllegalAccessException, InstantiationException, ClassNotFoundException, IOException {
if (started) {
return;
}
final File autoLoadDir = properties.getNarAutoLoadDirectory();
FileUtils.ensureDirectoryExistAndCanRead(autoLoadDir);
final WatchService watcher = FileSystems.getDefault().newWatchService();
final Path autoLoadPath = autoLoadDir.toPath();
autoLoadPath.register(watcher, StandardWatchEventKinds.ENTRY_CREATE);
@ -69,19 +77,43 @@ public class NarAutoLoader {
LOGGER.info("Starting NAR Auto-Loader for directory {} ...", new Object[]{autoLoadPath});
final Thread thread = new Thread(narAutoLoaderTask);
thread.setName("NAR Auto-Loader");
thread.setDaemon(true);
thread.start();
final Thread autoLoaderThread = new Thread(narAutoLoaderTask);
autoLoaderThread.setName("NAR Auto-Loader");
autoLoaderThread.setDaemon(true);
autoLoaderThread.start();
LOGGER.info("NAR Auto-Loader started");
started = true;
narProviderTasks = new HashSet<>();
for (final String externalSourceName : properties.getDirectSubsequentTokens(NAR_PROVIDER_PREFIX)) {
LOGGER.info("NAR Provider {} found in configuration", externalSourceName);
final NarProviderInitializationContext context = new PropertyBasedNarProviderInitializationContext(properties, externalSourceName);
final String implementationClass = properties.getProperty(NAR_PROVIDER_PREFIX + externalSourceName + "." + IMPLEMENTATION_PROPERTY);
final String providerId = UUID.randomUUID().toString();
final NarProvider provider = NarThreadContextClassLoader.createInstance(extensionManager, implementationClass, NarProvider.class, properties, providerId);
provider.initialize(context);
final ClassLoader instanceClassLoader = extensionManager.getInstanceClassLoader(providerId);
final ClassLoader providerClassLoader = instanceClassLoader == null ? provider.getClass().getClassLoader() : instanceClassLoader;
final NarProviderTask task = new NarProviderTask(provider, providerClassLoader, properties.getNarAutoLoadDirectory(), POLL_INTERVAL_MS);
narProviderTasks.add(task);
final Thread providerThread = new Thread(task);
providerThread.setName("NAR Provider Task - " + externalSourceName);
providerThread.setDaemon(true);
providerThread.setContextClassLoader(provider.getClass().getClassLoader());
providerThread.start();
}
}
/**
 * Stops the auto-load watcher task and all NAR provider tasks.
 *
 * Safe to call when the loader was never started or was already stopped: the original code
 * dereferenced narAutoLoaderTask/narProviderTasks unconditionally and threw NullPointerException
 * in those cases.
 */
public synchronized void stop() {
    started = false;

    if (narAutoLoaderTask != null) {
        narAutoLoaderTask.stop();
        narAutoLoaderTask = null;
    }

    if (narProviderTasks != null) {
        narProviderTasks.forEach(NarProviderTask::stop);
        narProviderTasks = null;
    }

    LOGGER.info("NAR Auto-Loader stopped");
}
}

View File

@ -172,7 +172,6 @@ public class NarAutoLoaderTask implements Runnable {
/**
 * Builds a NarAutoLoaderTask from the values accumulated in this builder.
 */
public NarAutoLoaderTask build() {
    return new NarAutoLoaderTask(this);
}
}
}
}

View File

@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.nar;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
/**
 * Periodically polls a {@link NarProvider} for NAR files that are not yet present in the
 * extension directory and downloads the missing ones, so the auto-loader can pick them up.
 */
final class NarProviderTask implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(NarProviderTask.class);
    private static final String NAR_EXTENSION = "nar";

    // A unique id is necessary for temporary files not to collide with temporary files from other instances.
    private final String id = UUID.randomUUID().toString();

    private final NarProvider narProvider;
    private final ClassLoader narProviderClassLoader;
    private final long pollTimeInMs;
    private final File extensionDirectory;

    private volatile boolean stopped = false;

    NarProviderTask(final NarProvider narProvider, final ClassLoader narProviderClassLoader, final File extensionDirectory, final long pollTimeInMs) {
        this.narProvider = narProvider;
        this.narProviderClassLoader = narProviderClassLoader;
        this.pollTimeInMs = pollTimeInMs;
        this.extensionDirectory = extensionDirectory;
    }

    @Override
    public void run() {
        LOGGER.info("Nar provider task is started");

        while (!stopped) {
            try {
                LOGGER.debug("Task starts fetching NARs from provider");
                final Set<String> loadedNars = getLoadedNars();

                // Provider calls run with the provider's NAR classloader on the thread context.
                final Collection<String> availableNars;
                try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(narProviderClassLoader)) {
                    availableNars = narProvider.listNars();
                }

                for (final String availableNar : availableNars) {
                    if (!loadedNars.contains(availableNar)) {
                        acquireNar(availableNar);
                    }
                }

                LOGGER.debug("Task finished fetching NARs from provider");
            } catch (final Throwable e) {
                LOGGER.error("Error during reaching the external source", e);
            }

            try {
                Thread.sleep(pollTimeInMs);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                LOGGER.warn("NAR autoloader external source task is interrupted");
                stopped = true;
            }
        }
    }

    /**
     * Downloads a single NAR through a temporary file so the auto-loader never observes a
     * partially written NAR in the extension directory.
     */
    private void acquireNar(final String availableNar) throws IOException {
        final long startedAt = System.currentTimeMillis();

        final InputStream inputStream;
        try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(narProviderClassLoader)) {
            inputStream = narProvider.fetchNarContents(availableNar);
        }

        final File tempFile = new File(extensionDirectory, ".tmp_" + id + ".nar");
        final File targetFile = new File(extensionDirectory, availableNar);

        // Fix: the fetched stream was previously never closed, leaking a handle per download.
        try (final InputStream narContents = inputStream) {
            Files.copy(narContents, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
        }

        // Fix: the rename result was previously ignored, silently dropping the downloaded NAR.
        if (tempFile.renameTo(targetFile)) {
            LOGGER.info("Downloaded NAR {} in {} ms", availableNar, (System.currentTimeMillis() - startedAt));
        } else {
            LOGGER.error("Could not move temporary file {} to {}", tempFile, targetFile);
        }
    }

    private Set<String> getLoadedNars() {
        final File[] narFiles = extensionDirectory.listFiles(file -> file.isFile() && file.getName().toLowerCase().endsWith("." + NAR_EXTENSION));

        // listFiles() returns null when the directory cannot be read; previously this caused an NPE.
        if (narFiles == null) {
            return Collections.emptySet();
        }

        return Arrays.stream(narFiles)
                .map(File::getName)
                .collect(Collectors.toSet());
    }

    void stop() {
        LOGGER.info("Nar provider task is stopped");
        stopped = true;
    }
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.nar;
import org.apache.nifi.util.NiFiProperties;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
 * A facade at front of {@code NiFiProperties} for auto loader extensions. Also limits the scope of the reachable properties.
 */
public class PropertyBasedNarProviderInitializationContext implements NarProviderInitializationContext {
    // Fix: declared final — this shared lookup set must never be reassigned or treated as mutable state.
    // Property names consumed by the framework itself and hidden from provider implementations.
    private static final Set<String> GUARDED_PROPERTIES = new HashSet<>(Arrays.asList("implementation"));

    static final String BASIC_PREFIX = "nifi.nar.library.provider.";

    private final Map<String, String> properties;
    // Provider name; currently unread after construction — kept for potential diagnostics.
    private final String name;

    public PropertyBasedNarProviderInitializationContext(final NiFiProperties properties, final String name) {
        this.properties = extractProperties(properties, name);
        this.name = name;
    }

    @Override
    public Map<String, String> getProperties() {
        return properties;
    }

    /**
     * Collects the provider-scoped properties: keeps only keys under
     * "nifi.nar.library.provider.&lt;name&gt;.", strips that prefix, and drops guarded and empty keys.
     *
     * @param properties the full NiFi properties to filter.
     * @param name the provider name used to build the prefix.
     *
     * @return the filtered, prefix-stripped properties.
     */
    public Map<String, String> extractProperties(final NiFiProperties properties, final String name) {
        final String prefix = BASIC_PREFIX + name + ".";
        final Map<String, String> candidates = properties.getPropertiesWithPrefix(prefix);
        final Map<String, String> result = new HashMap<>();

        for (final Map.Entry<String, String> entry : candidates.entrySet()) {
            final String parameterKey = entry.getKey().substring(prefix.length());
            if (!parameterKey.isEmpty() && !GUARDED_PROPERTIES.contains(parameterKey)) {
                result.put(parameterKey, entry.getValue());
            }
        }

        return result;
    }
}

View File

@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.nar;
import org.apache.nifi.util.NiFiProperties;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.HashMap;
import java.util.Map;
@RunWith(MockitoJUnitRunner.class)
public class TestPropertyBasedNarProviderInitializationContext {
    private static final String PROVIDER_NAME = "external";
    private static final String PREFIX = PropertyBasedNarProviderInitializationContext.BASIC_PREFIX + PROVIDER_NAME + ".";

    @Mock
    NiFiProperties niFiProperties;

    @Test
    public void testEmptyProperties() {
        final Map<String, String> contextProperties = createTestSubject().getProperties();

        Mockito.verify(niFiProperties, Mockito.times(1)).getPropertiesWithPrefix(PREFIX);
        Assert.assertTrue(contextProperties.isEmpty());
    }

    @Test
    public void testGuardedPropertiesAreNotReturned() {
        stubProperties(PREFIX + "implementation", "value");

        final Map<String, String> contextProperties = createTestSubject().getProperties();

        Mockito.verify(niFiProperties, Mockito.times(1)).getPropertiesWithPrefix(PREFIX);
        Assert.assertTrue(contextProperties.isEmpty());
    }

    @Test
    public void testPropertiesWouldHaveEmptyKeyAreNotReturned() {
        stubProperties(PREFIX, "value");

        final Map<String, String> contextProperties = createTestSubject().getProperties();

        Mockito.verify(niFiProperties, Mockito.times(1)).getPropertiesWithPrefix(PREFIX);
        Assert.assertTrue(contextProperties.isEmpty());
    }

    @Test
    public void testPrefixIsRemoved() {
        stubProperties(PREFIX + "key1", "value1", PREFIX + "key2", "value2");

        final Map<String, String> contextProperties = createTestSubject().getProperties();

        Mockito.verify(niFiProperties, Mockito.times(1)).getPropertiesWithPrefix(PREFIX);
        Assert.assertEquals(2, contextProperties.size());
        Assert.assertTrue(contextProperties.containsKey("key1"));
        Assert.assertTrue(contextProperties.containsKey("key2"));
        Assert.assertEquals("value1", contextProperties.get("key1"));
        Assert.assertEquals("value2", contextProperties.get("key2"));
    }

    // Creates the context under test backed by the mocked properties.
    private PropertyBasedNarProviderInitializationContext createTestSubject() {
        return new PropertyBasedNarProviderInitializationContext(niFiProperties, PROVIDER_NAME);
    }

    // Stubs getPropertiesWithPrefix with the given alternating key/value pairs.
    private void stubProperties(final String... keysAndValues) {
        final Map<String, String> availableProperties = new HashMap<>();
        for (int i = 0; i < keysAndValues.length; i += 2) {
            availableProperties.put(keysAndValues[i], keysAndValues[i + 1]);
        }
        Mockito.when(niFiProperties.getPropertiesWithPrefix(PREFIX)).thenReturn(availableProperties);
    }
}

View File

@ -62,6 +62,13 @@ public interface ExtensionManager {
*/
InstanceClassLoader removeInstanceClassLoader(String instanceIdentifier);
/**
* Registers the given instance class loader so that it can be later retrieved via {@link #getInstanceClassLoader(String)}
* @param instanceIdentifier the instance identifier
* @param instanceClassLoader the class loader
*/
void registerInstanceClassLoader(String instanceIdentifier, InstanceClassLoader instanceClassLoader);
/**
* Closes the given ClassLoader if it is an instance of URLClassLoader.
*

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.nar;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.authentication.LoginIdentityProvider;
import org.apache.nifi.authorization.AccessPolicyProvider;
import org.apache.nifi.authorization.Authorizer;
@ -37,6 +38,7 @@ import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.util.NiFiProperties;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Constructor;
@ -44,8 +46,13 @@ import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
/**
* THREAD SAFE
@ -175,6 +182,7 @@ public class NarThreadContextClassLoader extends URLClassLoader {
}
}
/**
* Constructs an instance of the given type using either default no args
* constructor or a constructor which takes a NiFiProperties object
@ -190,7 +198,27 @@ public class NarThreadContextClassLoader extends URLClassLoader {
* @throws ClassNotFoundException if the class cannot be found
*/
public static <T> T createInstance(final ExtensionManager extensionManager, final String implementationClassName, final Class<T> typeDefinition, final NiFiProperties nifiProperties)
throws InstantiationException, IllegalAccessException, ClassNotFoundException {
throws InstantiationException, IllegalAccessException, ClassNotFoundException {
return createInstance(extensionManager, implementationClassName, typeDefinition, nifiProperties, UUID.randomUUID().toString());
}
/**
* Constructs an instance of the given type using either default no args
* constructor or a constructor which takes a NiFiProperties object
* (preferred).
*
* @param <T> the type to create an instance for
* @param implementationClassName the implementation class name
* @param typeDefinition the type definition
* @param nifiProperties the NiFiProperties instance
* @param instanceId the UUID of the instance
* @return constructed instance
* @throws InstantiationException if there is an error instantiating the class
* @throws IllegalAccessException if there is an error accessing the type
* @throws ClassNotFoundException if the class cannot be found
*/
public static <T> T createInstance(final ExtensionManager extensionManager, final String implementationClassName, final Class<T> typeDefinition, final NiFiProperties nifiProperties,
final String instanceId) throws InstantiationException, IllegalAccessException, ClassNotFoundException {
final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
try {
final List<Bundle> bundles = extensionManager.getBundles(implementationClassName);
@ -202,11 +230,11 @@ public class NarThreadContextClassLoader extends URLClassLoader {
}
final Bundle bundle = bundles.get(0);
final ClassLoader detectedClassLoaderForType = bundle.getClassLoader();
final Class<?> rawClass = Class.forName(implementationClassName, true, detectedClassLoaderForType);
final ClassLoader instanceClassLoader = createClassLoader(implementationClassName, instanceId, bundle, extensionManager);
final Class<?> instanceClass = Class.forName(implementationClassName, true, instanceClassLoader);
Thread.currentThread().setContextClassLoader(detectedClassLoaderForType);
final Class<?> desiredClass = rawClass.asSubclass(typeDefinition);
Thread.currentThread().setContextClassLoader(instanceClassLoader);
final Class<?> desiredClass = instanceClass.asSubclass(typeDefinition);
if(nifiProperties == null){
return typeDefinition.cast(desiredClass.newInstance());
}
@ -235,4 +263,47 @@ public class NarThreadContextClassLoader extends URLClassLoader {
Thread.currentThread().setContextClassLoader(originalClassLoader);
}
}
/**
 * Determines which ClassLoader the implementation class should be loaded with.
 * <p>
 * When the class is not annotated with {@link RequiresInstanceClassLoading}, the bundle's shared
 * class loader is returned unchanged. Otherwise a dedicated {@link InstanceClassLoader} is built
 * for this instance, seeded with the bundle's URLs and native-lib dir and — when
 * {@code cloneAncestorResources} is set — with those of every ancestor NAR class loader up to
 * (but excluding) the first API bundle or the Jetty NAR. The new loader is registered with the
 * ExtensionManager under {@code instanceId} so it can be retrieved and closed later.
 *
 * @param implementationClassName fully qualified name of the class to be instantiated
 * @param instanceId identifier under which an instance class loader is registered
 * @param bundle the bundle the implementation class was discovered in
 * @param extensionManager used to resolve ancestor bundles and to register the new loader
 * @return the class loader to load the implementation class with
 * @throws ClassNotFoundException if the class cannot be loaded from the bundle's class loader
 */
private static ClassLoader createClassLoader(final String implementationClassName, final String instanceId, final Bundle bundle, final ExtensionManager extensionManager)
        throws ClassNotFoundException {
    final ClassLoader bundleClassLoader = bundle.getClassLoader();
    // Load once from the shared loader just to inspect the annotation.
    final Class<?> rawClass = Class.forName(implementationClassName, true, bundleClassLoader);
    final RequiresInstanceClassLoading instanceClassLoadingAnnotation = rawClass.getAnnotation(RequiresInstanceClassLoading.class);
    if (instanceClassLoadingAnnotation == null) {
        // No per-instance isolation requested: the bundle's shared class loader suffices.
        return bundleClassLoader;
    }
    final Set<URL> instanceUrls = new LinkedHashSet<>();
    final Set<File> narNativeLibDirs = new LinkedHashSet<>();
    // NOTE(review): unconditional cast — a @RequiresInstanceClassLoading extension is assumed to
    // always live in a NAR-backed bundle; a non-NAR bundle would fail with ClassCastException here.
    final NarClassLoader narBundleClassLoader = (NarClassLoader) bundleClassLoader;
    narNativeLibDirs.add(narBundleClassLoader.getNARNativeLibDir());
    instanceUrls.addAll(Arrays.asList(narBundleClassLoader.getURLs()));
    ClassLoader ancestorClassLoader = narBundleClassLoader.getParent();
    if (instanceClassLoadingAnnotation.cloneAncestorResources()) {
        // Walk up the NAR class loader chain, copying each ancestor's URLs and native-lib dirs.
        while (ancestorClassLoader instanceof NarClassLoader) {
            final Bundle ancestorNarBundle = extensionManager.getBundle(ancestorClassLoader);
            // stop including ancestor resources when we reach one of the APIs, or when we hit the Jetty NAR
            if (ancestorNarBundle == null || ancestorNarBundle.getBundleDetails().getCoordinate().getId().equals(NarClassLoaders.JETTY_NAR_ID)) {
                break;
            }
            final NarClassLoader ancestorNarClassLoader = (NarClassLoader) ancestorClassLoader;
            narNativeLibDirs.add(ancestorNarClassLoader.getNARNativeLibDir());
            Collections.addAll(instanceUrls, ancestorNarClassLoader.getURLs());
            ancestorClassLoader = ancestorNarClassLoader.getParent();
        }
    }
    // Parent is wherever the ancestor walk stopped, so shared API classes stay shared.
    final InstanceClassLoader instanceClassLoader = new InstanceClassLoader(instanceId, implementationClassName, instanceUrls,
            Collections.emptySet(), narNativeLibDirs, ancestorClassLoader);
    // Register so the ExtensionManager can look the loader up (and close it) by instance id later.
    extensionManager.registerInstanceClassLoader(instanceId, instanceClassLoader);
    return instanceClassLoader;
}
}

View File

@ -101,6 +101,7 @@ public class StandardExtensionDiscoveringManager implements ExtensionDiscovering
definitionMap.put(ContentRepository.class, new HashSet<>());
definitionMap.put(StateProvider.class, new HashSet<>());
definitionMap.put(StatusAnalyticsModel.class, new HashSet<>());
definitionMap.put(NarProvider.class, new HashSet<>());
}
@Override
@ -462,6 +463,11 @@ public class StandardExtensionDiscoveringManager implements ExtensionDiscovering
return classLoader;
}
@Override
public void registerInstanceClassLoader(final String instanceIdentifier, final InstanceClassLoader instanceClassLoader) {
    // putIfAbsent: never replace a loader already registered for this instance — the first
    // registration wins so existing references to the original loader stay valid.
    instanceClassloaderLookup.putIfAbsent(instanceIdentifier, instanceClassLoader);
}
@Override
public void closeURLClassLoader(final String instanceIdentifier, final ClassLoader classLoader) {
if ((classLoader instanceof URLClassLoader)) {

View File

@ -1225,7 +1225,7 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
extensionMapping,
this);
narAutoLoader = new NarAutoLoader(props.getNarAutoLoadDirectory(), narLoader);
narAutoLoader = new NarAutoLoader(props, narLoader, extensionManager);
narAutoLoader.start();
// dump the application url after confirming everything started successfully

View File

@ -0,0 +1,248 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.nar.hadoop;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.nar.NarProvider;
import org.apache.nifi.nar.NarProviderInitializationContext;
import org.apache.nifi.nar.hadoop.util.ExtensionFilter;
import org.apache.nifi.processors.hadoop.ExtendedConfiguration;
import org.apache.nifi.processors.hadoop.HdfsResources;
import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.security.krb.KerberosPasswordUser;
import org.apache.nifi.security.krb.KerberosUser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.SocketFactory;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
/**
 * {@link NarProvider} implementation that lists and fetches NAR files from a directory on HDFS.
 * <p>
 * Configured via the provider's initialization context properties: a comma-separated list of
 * Hadoop configuration {@code resources}, the HDFS {@code source.directory}, and optional
 * Kerberos credentials (principal plus either keytab or password). Instances must be
 * {@link #initialize(NarProviderInitializationContext) initialized} before use.
 */
@RequiresInstanceClassLoading(cloneAncestorResources = true)
public class HDFSNarProvider implements NarProvider {
    private static final Logger LOGGER = LoggerFactory.getLogger(HDFSNarProvider.class);

    // Keys looked up in the initialization context's property map.
    private static final String RESOURCES_PARAMETER = "resources";
    private static final String SOURCE_DIRECTORY_PARAMETER = "source.directory";
    private static final String KERBEROS_PRINCIPAL_PARAMETER = "kerberos.principal";
    private static final String KERBEROS_KEYTAB_PARAMETER = "kerberos.keytab";
    private static final String KERBEROS_PASSWORD_PARAMETER = "kerberos.password";

    private static final String NAR_EXTENSION = "nar";
    private static final String DELIMITER = "/";
    private static final int BUFFER_SIZE_DEFAULT = 4096;

    // Serializes UserGroupInformation access: Hadoop's login machinery is not thread-safe.
    private static final Object RESOURCES_LOCK = new Object();

    private volatile List<String> resources = null;
    private volatile Path sourceDirectory = null;
    private volatile NarProviderInitializationContext context;
    private volatile boolean initialized = false;

    /**
     * Initializes the provider from the context properties.
     *
     * @throws IllegalArgumentException if no configuration resource or no source directory is set
     */
    @Override
    public void initialize(final NarProviderInitializationContext context) {
        resources = Arrays.stream(Objects.requireNonNull(
                context.getProperties().get(RESOURCES_PARAMETER)).split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList());

        if (resources.isEmpty()) {
            throw new IllegalArgumentException("At least one HDFS configuration resource is necessary");
        }

        final String sourceDirectory = context.getProperties().get(SOURCE_DIRECTORY_PARAMETER);

        if (sourceDirectory == null || sourceDirectory.isEmpty()) {
            throw new IllegalArgumentException("Provider needs the source directory to be set");
        }

        this.sourceDirectory = new Path(sourceDirectory);
        this.context = context;
        this.initialized = true;
    }

    /**
     * Lists the names of all files with ".nar" extension found directly in the source directory.
     *
     * @return file names (not full paths) of the available NARs
     * @throws IOException if the listing fails or the calling thread is interrupted
     */
    @Override
    public Collection<String> listNars() throws IOException {
        ensureInitialized();

        final HdfsResources hdfsResources = getHdfsResources();

        try {
            final FileStatus[] fileStatuses = hdfsResources.getUserGroupInformation()
                    .doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfsResources.getFileSystem().listStatus(sourceDirectory, new ExtensionFilter(NAR_EXTENSION)));

            final List<String> result = Arrays.stream(fileStatuses)
                    .filter(fileStatus -> fileStatus.isFile())
                    .map(fileStatus -> fileStatus.getPath().getName())
                    .collect(Collectors.toList());

            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("The following NARs were found: " + String.join(", ", result));
            }

            return result;
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Provider cannot list NARs", e);
        }
    }

    /**
     * Opens a stream to the NAR with the given file name, as returned by {@link #listNars()}.
     *
     * @throws IOException if the file does not exist, cannot be opened, or the thread is interrupted
     */
    @Override
    public InputStream fetchNarContents(final String location) throws IOException {
        ensureInitialized();

        final Path path = getNarLocation(location);
        final HdfsResources hdfsResources = getHdfsResources();

        try {
            if (hdfsResources.getUserGroupInformation().doAs((PrivilegedExceptionAction<Boolean>) () -> !hdfsResources.getFileSystem().exists(path))) {
                throw new IOException("Provider cannot find " + location);
            }

            return hdfsResources.getUserGroupInformation()
                    .doAs((PrivilegedExceptionAction<FSDataInputStream>) () -> hdfsResources.getFileSystem().open(path, BUFFER_SIZE_DEFAULT));
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Error during acquiring file", e);
        }
    }

    // Fails fast with a clear message; previously an uninitialized provider only logged an error
    // and then ran into a NullPointerException on the unset fields.
    private void ensureInitialized() {
        if (!initialized) {
            throw new IllegalStateException("Provider is not initialized");
        }
    }

    // Resolves a NAR file name to its full HDFS path under the source directory.
    private Path getNarLocation(final String location) {
        String result = sourceDirectory.toString();

        if (!result.endsWith(DELIMITER)) {
            result += DELIMITER;
        }

        return new Path(result + location);
    }

    // Builds a fresh, uncached FileSystem (plus UGI / KerberosUser) from the configured resources.
    private HdfsResources getHdfsResources() throws IOException {
        final Configuration config = new ExtendedConfiguration(LOGGER);
        config.setClassLoader(Thread.currentThread().getContextClassLoader());

        for (final String resource : resources) {
            config.addResource(new Path(resource));
        }

        // first check for timeout on HDFS connection, because FileSystem has a hard coded 15 minute timeout
        checkHdfsUriForTimeout(config);

        // disable caching of Configuration and FileSystem objects, else we cannot reconfigure the processor without a complete restart
        final String disableCacheName = String.format("fs.%s.impl.disable.cache", FileSystem.getDefaultUri(config).getScheme());
        config.set(disableCacheName, "true");

        // If kerberos is enabled, create the file system as the kerberos principal
        // -- use RESOURCES_LOCK to guarantee UserGroupInformation is accessed by only a single thread at a time
        FileSystem fs;
        UserGroupInformation ugi;
        KerberosUser kerberosUser;

        synchronized (RESOURCES_LOCK) {
            if (SecurityUtil.isSecurityEnabled(config)) {
                final String principal = context.getProperties().get(KERBEROS_PRINCIPAL_PARAMETER);
                final String keyTab = context.getProperties().get(KERBEROS_KEYTAB_PARAMETER);
                final String password = context.getProperties().get(KERBEROS_PASSWORD_PARAMETER);

                // Keytab takes precedence over password when both are supplied.
                if (keyTab != null) {
                    kerberosUser = new KerberosKeytabUser(principal, keyTab);
                } else if (password != null) {
                    kerberosUser = new KerberosPasswordUser(principal, password);
                } else {
                    throw new IOException("Unable to authenticate with Kerberos, no keytab or password was provided");
                }

                ugi = SecurityUtil.getUgiForKerberosUser(config, kerberosUser);
            } else {
                config.set("ipc.client.fallback-to-simple-auth-allowed", "true");
                config.set("hadoop.security.authentication", "simple");
                ugi = SecurityUtil.loginSimple(config);
                kerberosUser = null;
            }

            fs = getFileSystemAsUser(config, ugi);
        }

        LOGGER.debug("resetHDFSResources UGI [{}], KerberosUser [{}]", ugi, kerberosUser);

        final Path workingDir = fs.getWorkingDirectory();
        LOGGER.debug("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
                workingDir, fs.getDefaultBlockSize(workingDir), fs.getDefaultReplication(workingDir), config.toString());

        if (!fs.exists(sourceDirectory)) {
            throw new IllegalArgumentException("Source directory is not existing");
        }

        return new HdfsResources(config, fs, ugi, kerberosUser);
    }

    // Probes the NameNode with a short (1 s) connect timeout to avoid FileSystem's 15 minute default.
    private void checkHdfsUriForTimeout(final Configuration config) throws IOException {
        final URI hdfsUri = FileSystem.getDefaultUri(config);
        final String address = hdfsUri.getAuthority();
        final int port = hdfsUri.getPort();

        if (address == null || address.isEmpty() || port < 0) {
            return;
        }

        final InetSocketAddress namenode = NetUtils.createSocketAddr(address, port);
        final SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(config);
        Socket socket = null;

        try {
            socket = socketFactory.createSocket();
            NetUtils.connect(socket, namenode, 1000); // 1 second timeout
        } finally {
            IOUtils.closeQuietly(socket);
        }
    }

    // Obtains the FileSystem under the given user's security context.
    private FileSystem getFileSystemAsUser(final Configuration config, final UserGroupInformation ugi) throws IOException {
        try {
            return ugi.doAs((PrivilegedExceptionAction<FileSystem>) () -> FileSystem.get(config));
        } catch (final InterruptedException e) {
            // Restore the interrupt status so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw new IOException("Unable to create file system: " + e.getMessage(), e);
        }
    }
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.nar.hadoop.util;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
/**
 * HDFS listing filter which selects files based on extension.
 */
public class ExtensionFilter implements PathFilter {
    private final String extension;

    /**
     * @param extension the file extension to accept, without the leading dot (e.g. {@code "nar"})
     */
    public ExtensionFilter(final String extension) {
        this.extension = extension;
    }

    /**
     * Accepts paths whose final name component ends with {@code "." + extension},
     * compared case-insensitively.
     */
    @Override
    public boolean accept(final Path path) {
        final String fileName = path.getName();
        final String suffix = "." + extension;
        // Case-insensitive, locale-independent suffix match. Avoids String.toLowerCase(),
        // whose default-locale behavior breaks e.g. under Turkish locales (dotless 'i'),
        // and also matches correctly when the configured extension contains uppercase letters.
        return fileName.length() >= suffix.length()
                && fileName.regionMatches(true, fileName.length() - suffix.length(), suffix, 0, suffix.length());
    }
}

View File

@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.nar.hadoop.HDFSNarProvider

View File

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.nar.hadoop.util;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Test;
public class ExtensionFilterTest {

    @Test
    public void testValid() {
        assertAcceptance(true, "txt", "test.txt");
    }

    @Test
    public void testValidWhenUppercase() {
        // Extension comparison is case-insensitive on the file name side.
        assertAcceptance(true, "txt", "test.TXT");
    }

    @Test
    public void testInvalidWhenDifferentExtension() {
        assertAcceptance(false, "txt", "test.json");
    }

    @Test
    public void testInvalidWhenMistypedExtension() {
        assertAcceptance(false, "txt", "test.ttxt");
    }

    @Test
    public void testInvalidWhenMultipleExtension() {
        // Only the final extension counts.
        assertAcceptance(false, "txt", "test.txt.json");
    }

    @Test
    public void testFolder() {
        // A name without a "." separator must not be accepted.
        assertAcceptance(false, "ttxt", "testtxt");
    }

    // Runs the filter against the given file name and checks the expected verdict.
    private static void assertAcceptance(final boolean expected, final String extension, final String fileName) {
        final ExtensionFilter filter = new ExtensionFilter(extension);
        final boolean accepted = filter.accept(new Path(fileName));
        Assert.assertEquals(expected, accepted);
    }
}

View File

@ -43,7 +43,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class GetHDFSSequenceFileTest {
private AbstractHadoopProcessor.HdfsResources hdfsResources;
private HdfsResources hdfsResources;
private GetHDFSSequenceFile getHDFSSequenceFile;
private Configuration configuration;
private FileSystem fileSystem;
@ -55,7 +55,7 @@ public class GetHDFSSequenceFileTest {
configuration = mock(Configuration.class);
fileSystem = mock(FileSystem.class);
userGroupInformation = mock(UserGroupInformation.class);
hdfsResources = new AbstractHadoopProcessor.HdfsResources(configuration, fileSystem, userGroupInformation, null);
hdfsResources = new HdfsResources(configuration, fileSystem, userGroupInformation, null);
getHDFSSequenceFile = new TestableGetHDFSSequenceFile();
getHDFSSequenceFile.kerberosProperties = mock(KerberosProperties.class);
reloginTried = false;
@ -86,7 +86,7 @@ public class GetHDFSSequenceFileTest {
@Test
public void testGetFlowFilesNoUgiShouldntCallDoAs() throws Exception {
hdfsResources = new AbstractHadoopProcessor.HdfsResources(configuration, fileSystem, null, null);
hdfsResources = new HdfsResources(configuration, fileSystem, null, null);
init();
SequenceFileReader reader = mock(SequenceFileReader.class);
Path file = mock(Path.class);