NIFI-10166 Improved MiNiFi bootstrap test coverage

This closes #6160

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Ferenc Erdei 2022-06-24 16:30:02 +02:00 committed by exceptionfactory
parent ea0e1b446e
commit d78c667c19
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
42 changed files with 1750 additions and 360 deletions

View File

@ -42,6 +42,8 @@ import org.apache.nifi.minifi.bootstrap.service.MiNiFiStatusProvider;
import org.apache.nifi.minifi.bootstrap.service.MiNiFiStdLogHandler;
import org.apache.nifi.minifi.bootstrap.service.PeriodicStatusReporterManager;
import org.apache.nifi.minifi.bootstrap.service.ReloadService;
import org.apache.nifi.minifi.bootstrap.util.ProcessUtils;
import org.apache.nifi.minifi.bootstrap.util.UnixProcessUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -96,20 +98,22 @@ public class RunMiNiFi implements ConfigurationFileHolder {
Optional.ofNullable(properties.getProperty(STATUS_FILE_PID_KEY)).map(Integer::parseInt).orElse(UNINITIALIZED),
properties.getProperty(STATUS_FILE_SECRET_KEY)
);
ProcessUtils processUtils = new UnixProcessUtils();
MiNiFiCommandSender miNiFiCommandSender = new MiNiFiCommandSender(miNiFiParameters, getObjectMapper());
MiNiFiStatusProvider miNiFiStatusProvider = new MiNiFiStatusProvider(miNiFiCommandSender);
MiNiFiStatusProvider miNiFiStatusProvider = new MiNiFiStatusProvider(miNiFiCommandSender, processUtils);
periodicStatusReporterManager =
new PeriodicStatusReporterManager(bootstrapFileProvider.getBootstrapProperties(), miNiFiStatusProvider, miNiFiCommandSender, miNiFiParameters);
configurationChangeCoordinator = new ConfigurationChangeCoordinator(bootstrapFileProvider.getBootstrapProperties(), this,
singleton(new MiNiFiConfigurationChangeListener(this, DEFAULT_LOGGER, bootstrapFileProvider)));
CurrentPortProvider currentPortProvider = new CurrentPortProvider(miNiFiCommandSender, miNiFiParameters);
CurrentPortProvider currentPortProvider = new CurrentPortProvider(miNiFiCommandSender, miNiFiParameters, processUtils);
GracefulShutdownParameterProvider gracefulShutdownParameterProvider = new GracefulShutdownParameterProvider(bootstrapFileProvider);
reloadService = new ReloadService(bootstrapFileProvider, miNiFiParameters, miNiFiCommandSender, currentPortProvider, gracefulShutdownParameterProvider, this);
reloadService = new ReloadService(bootstrapFileProvider, miNiFiParameters, miNiFiCommandSender, currentPortProvider, gracefulShutdownParameterProvider, this, processUtils);
commandRunnerFactory = new CommandRunnerFactory(miNiFiCommandSender, currentPortProvider, miNiFiParameters, miNiFiStatusProvider, periodicStatusReporterManager,
bootstrapFileProvider, new MiNiFiStdLogHandler(), bootstrapConfigFile, this, gracefulShutdownParameterProvider,
new MiNiFiExecCommandProvider(bootstrapFileProvider));
new MiNiFiExecCommandProvider(bootstrapFileProvider), processUtils);
}
public int run(BootstrapCommand command, String... args) {

View File

@ -31,6 +31,7 @@ import org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider;
import org.apache.nifi.minifi.bootstrap.service.MiNiFiStatusProvider;
import org.apache.nifi.minifi.bootstrap.service.MiNiFiStdLogHandler;
import org.apache.nifi.minifi.bootstrap.service.PeriodicStatusReporterManager;
import org.apache.nifi.minifi.bootstrap.util.ProcessUtils;
public class CommandRunnerFactory {
@ -45,11 +46,12 @@ public class CommandRunnerFactory {
private final RunMiNiFi runMiNiFi;
private final GracefulShutdownParameterProvider gracefulShutdownParameterProvider;
private final MiNiFiExecCommandProvider miNiFiExecCommandProvider;
private final ProcessUtils processUtils;
public CommandRunnerFactory(MiNiFiCommandSender miNiFiCommandSender, CurrentPortProvider currentPortProvider, MiNiFiParameters miNiFiParameters,
MiNiFiStatusProvider miNiFiStatusProvider, PeriodicStatusReporterManager periodicStatusReporterManager,
BootstrapFileProvider bootstrapFileProvider, MiNiFiStdLogHandler miNiFiStdLogHandler, File bootstrapConfigFile, RunMiNiFi runMiNiFi,
GracefulShutdownParameterProvider gracefulShutdownParameterProvider, MiNiFiExecCommandProvider miNiFiExecCommandProvider) {
GracefulShutdownParameterProvider gracefulShutdownParameterProvider, MiNiFiExecCommandProvider miNiFiExecCommandProvider, ProcessUtils processUtils) {
this.miNiFiCommandSender = miNiFiCommandSender;
this.currentPortProvider = currentPortProvider;
this.miNiFiParameters = miNiFiParameters;
@ -61,6 +63,7 @@ public class CommandRunnerFactory {
this.runMiNiFi = runMiNiFi;
this.gracefulShutdownParameterProvider = gracefulShutdownParameterProvider;
this.miNiFiExecCommandProvider = miNiFiExecCommandProvider;
this.processUtils = processUtils;
}
/**
@ -74,10 +77,10 @@ public class CommandRunnerFactory {
case START:
case RUN:
commandRunner = new StartRunner(currentPortProvider, bootstrapFileProvider, periodicStatusReporterManager, miNiFiStdLogHandler, miNiFiParameters,
bootstrapConfigFile, runMiNiFi, miNiFiExecCommandProvider);
bootstrapConfigFile, runMiNiFi, miNiFiExecCommandProvider, processUtils);
break;
case STOP:
commandRunner = new StopRunner(bootstrapFileProvider, miNiFiParameters, miNiFiCommandSender, currentPortProvider, gracefulShutdownParameterProvider);
commandRunner = new StopRunner(bootstrapFileProvider, miNiFiParameters, miNiFiCommandSender, currentPortProvider, gracefulShutdownParameterProvider, processUtils);
break;
case STATUS:
commandRunner = new StatusRunner(miNiFiParameters, miNiFiStatusProvider);
@ -102,9 +105,9 @@ public class CommandRunnerFactory {
private List<CommandRunner> getRestartServices() {
List<CommandRunner> compositeList = new LinkedList<>();
compositeList.add(new StopRunner(bootstrapFileProvider, miNiFiParameters, miNiFiCommandSender, currentPortProvider, gracefulShutdownParameterProvider));
compositeList.add(new StopRunner(bootstrapFileProvider, miNiFiParameters, miNiFiCommandSender, currentPortProvider, gracefulShutdownParameterProvider, processUtils));
compositeList.add(new StartRunner(currentPortProvider, bootstrapFileProvider, periodicStatusReporterManager, miNiFiStdLogHandler, miNiFiParameters,
bootstrapConfigFile, runMiNiFi, miNiFiExecCommandProvider));
bootstrapConfigFile, runMiNiFi, miNiFiExecCommandProvider, processUtils));
return compositeList;
}
}

View File

@ -32,7 +32,7 @@ import org.apache.nifi.minifi.bootstrap.service.CurrentPortProvider;
import org.apache.nifi.minifi.bootstrap.service.MiNiFiCommandSender;
public class DumpRunner implements CommandRunner {
private static final String DUMP_CMD = "DUMP";
protected static final String DUMP_CMD = "DUMP";
private final MiNiFiCommandSender miNiFiCommandSender;
private final CurrentPortProvider currentPortProvider;

View File

@ -28,7 +28,7 @@ import org.apache.nifi.minifi.bootstrap.service.CurrentPortProvider;
import org.apache.nifi.minifi.bootstrap.service.MiNiFiCommandSender;
public class EnvRunner implements CommandRunner {
private static final String ENV_CMD = "ENV";
protected static final String ENV_CMD = "ENV";
private final MiNiFiCommandSender miNiFiCommandSender;
private final CurrentPortProvider currentPortProvider;

View File

@ -42,7 +42,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.nifi.bootstrap.util.OSUtils;
import org.apache.nifi.minifi.bootstrap.MiNiFiParameters;
import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
@ -55,7 +54,7 @@ import org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider;
import org.apache.nifi.minifi.bootstrap.service.MiNiFiListener;
import org.apache.nifi.minifi.bootstrap.service.MiNiFiStdLogHandler;
import org.apache.nifi.minifi.bootstrap.service.PeriodicStatusReporterManager;
import org.apache.nifi.minifi.bootstrap.util.UnixProcessUtils;
import org.apache.nifi.minifi.bootstrap.util.ProcessUtils;
import org.apache.nifi.util.Tuple;
public class StartRunner implements CommandRunner {
@ -72,10 +71,11 @@ public class StartRunner implements CommandRunner {
private final RunMiNiFi runMiNiFi;
private volatile ShutdownHook shutdownHook;
private final MiNiFiExecCommandProvider miNiFiExecCommandProvider;
private final ProcessUtils processUtils;
public StartRunner(CurrentPortProvider currentPortProvider, BootstrapFileProvider bootstrapFileProvider,
PeriodicStatusReporterManager periodicStatusReporterManager, MiNiFiStdLogHandler miNiFiStdLogHandler, MiNiFiParameters miNiFiParameters, File bootstrapConfigFile,
RunMiNiFi runMiNiFi, MiNiFiExecCommandProvider miNiFiExecCommandProvider) {
RunMiNiFi runMiNiFi, MiNiFiExecCommandProvider miNiFiExecCommandProvider, ProcessUtils processUtils) {
this.currentPortProvider = currentPortProvider;
this.bootstrapFileProvider = bootstrapFileProvider;
this.periodicStatusReporterManager = periodicStatusReporterManager;
@ -84,6 +84,7 @@ public class StartRunner implements CommandRunner {
this.bootstrapConfigFile = bootstrapConfigFile;
this.runMiNiFi = runMiNiFi;
this.miNiFiExecCommandProvider = miNiFiExecCommandProvider;
this.processUtils = processUtils;
}
/**
@ -124,7 +125,7 @@ public class StartRunner implements CommandRunner {
try {
while (true) {
if (UnixProcessUtils.isAlive(process)) {
if (process.isAlive()) {
handleReload();
} else {
Runtime runtime = Runtime.getRuntime();
@ -142,7 +143,7 @@ public class StartRunner implements CommandRunner {
continue;
}
process = restartNifi(bootstrapProperties, confDir, builder, runtime);
process = restartNifi(bootstrapProperties, confDir, builder);
// failed to start process
if (process == null) {
return;
@ -159,7 +160,7 @@ public class StartRunner implements CommandRunner {
}
}
private Process restartNifi(Properties bootstrapProperties, String confDir, ProcessBuilder builder, Runtime runtime) throws IOException {
private Process restartNifi(Properties bootstrapProperties, String confDir, ProcessBuilder builder) throws IOException {
Process process;
boolean previouslyStarted = runMiNiFi.isNiFiStarted();
if (!previouslyStarted) {
@ -238,6 +239,7 @@ public class StartRunner implements CommandRunner {
runMiNiFi.setReloading(false);
}
} catch (InterruptedException ie) {
DEFAULT_LOGGER.warn("Thread interrupted while handling reload");
}
}
@ -270,7 +272,7 @@ public class StartRunner implements CommandRunner {
CMD_LOGGER.info("Starting Apache MiNiFi...");
CMD_LOGGER.info("Working Directory: {}", workingDir.getAbsolutePath());
CMD_LOGGER.info("Command: {}", cmd.stream().collect(Collectors.joining(" ")));
CMD_LOGGER.info("Command: {}", String.join(" ", cmd));
return new Tuple<>(builder, startMiNiFiProcess(builder));
}
@ -296,10 +298,9 @@ public class StartRunner implements CommandRunner {
File bootstrapConfigAbsoluteFile = bootstrapConfigFile.getAbsoluteFile();
File binDir = bootstrapConfigAbsoluteFile.getParentFile();
File workingDir = Optional.ofNullable(props.getProperty("working.dir"))
return Optional.ofNullable(props.getProperty("working.dir"))
.map(File::new)
.orElse(binDir.getParentFile());
return workingDir;
}
private boolean waitForStart() {

View File

@ -32,24 +32,26 @@ import org.apache.nifi.minifi.bootstrap.service.BootstrapFileProvider;
import org.apache.nifi.minifi.bootstrap.service.CurrentPortProvider;
import org.apache.nifi.minifi.bootstrap.service.GracefulShutdownParameterProvider;
import org.apache.nifi.minifi.bootstrap.service.MiNiFiCommandSender;
import org.apache.nifi.minifi.bootstrap.util.UnixProcessUtils;
import org.apache.nifi.minifi.bootstrap.util.ProcessUtils;
public class StopRunner implements CommandRunner {
private static final String SHUTDOWN_CMD = "SHUTDOWN";
protected static final String SHUTDOWN_CMD = "SHUTDOWN";
private final BootstrapFileProvider bootstrapFileProvider;
private final MiNiFiParameters miNiFiParameters;
private final MiNiFiCommandSender miNiFiCommandSender;
private final CurrentPortProvider currentPortProvider;
private final GracefulShutdownParameterProvider gracefulShutdownParameterProvider;
private final ProcessUtils processUtils;
public StopRunner(BootstrapFileProvider bootstrapFileProvider, MiNiFiParameters miNiFiParameters, MiNiFiCommandSender miNiFiCommandSender,
CurrentPortProvider currentPortProvider, GracefulShutdownParameterProvider gracefulShutdownParameterProvider) {
CurrentPortProvider currentPortProvider, GracefulShutdownParameterProvider gracefulShutdownParameterProvider, ProcessUtils processUtils) {
this.bootstrapFileProvider = bootstrapFileProvider;
this.miNiFiParameters = miNiFiParameters;
this.miNiFiCommandSender = miNiFiCommandSender;
this.currentPortProvider = currentPortProvider;
this.gracefulShutdownParameterProvider = gracefulShutdownParameterProvider;
this.processUtils = processUtils;
}
/**
@ -81,40 +83,18 @@ public class StopRunner implements CommandRunner {
lockFile.createNewFile();
}
File statusFile = bootstrapFileProvider.getStatusFile();
File pidFile = bootstrapFileProvider.getPidFile();
long minifiPid = miNiFiParameters.getMinifiPid();
try {
Optional<String> commandResponse = miNiFiCommandSender.sendCommand(SHUTDOWN_CMD, currentPort);
if (commandResponse.filter(SHUTDOWN_CMD::equals).isPresent()) {
CMD_LOGGER.info("Apache MiNiFi has accepted the Shutdown Command and is shutting down now");
if (minifiPid != UNINITIALIZED) {
UnixProcessUtils.gracefulShutDownMiNiFiProcess(minifiPid, "MiNiFi has not finished shutting down after {} seconds. Killing process.",
gracefulShutdownParameterProvider.getGracefulShutdownSeconds());
if (statusFile.exists() && !statusFile.delete()) {
CMD_LOGGER.error("Failed to delete status file {}; this file should be cleaned up manually", statusFile);
}
if (pidFile.exists() && !pidFile.delete()) {
CMD_LOGGER.error("Failed to delete pid file {}; this file should be cleaned up manually", pidFile);
}
CMD_LOGGER.info("MiNiFi has finished shutting down.");
}
gracefulShutDownMiNiFiProcess(minifiPid);
} else {
CMD_LOGGER.error("When sending SHUTDOWN command to MiNiFi, got unexpected response {}", commandResponse.orElse(null));
status = ERROR.getStatusCode();
}
} catch (IOException e) {
if (minifiPid == UNINITIALIZED) {
DEFAULT_LOGGER.error("No PID found for the MiNiFi process, so unable to kill process; The process should be killed manually.");
} else {
DEFAULT_LOGGER.error("Will kill the MiNiFi Process with PID {}", minifiPid);
UnixProcessUtils.killProcessTree(minifiPid);
}
killProcessTree(minifiPid);
} finally {
if (lockFile.exists() && !lockFile.delete()) {
CMD_LOGGER.error("Failed to delete lock file {}; this file should be cleaned up manually", lockFile);
@ -123,4 +103,34 @@ public class StopRunner implements CommandRunner {
return status;
}
private void gracefulShutDownMiNiFiProcess(long minifiPid) throws IOException {
CMD_LOGGER.info("Apache MiNiFi has accepted the Shutdown Command and is shutting down now");
File statusFile = bootstrapFileProvider.getStatusFile();
File pidFile = bootstrapFileProvider.getPidFile();
if (minifiPid != UNINITIALIZED) {
processUtils.shutdownProcess(minifiPid, "MiNiFi has not finished shutting down after {} seconds. Killing process.",
gracefulShutdownParameterProvider.getGracefulShutdownSeconds());
if (statusFile.exists() && !statusFile.delete()) {
CMD_LOGGER.error("Failed to delete status file {}; this file should be cleaned up manually", statusFile);
}
if (pidFile.exists() && !pidFile.delete()) {
CMD_LOGGER.error("Failed to delete pid file {}; this file should be cleaned up manually", pidFile);
}
CMD_LOGGER.info("MiNiFi has finished shutting down.");
}
}
private void killProcessTree(long minifiPid) throws IOException {
if (minifiPid == UNINITIALIZED) {
DEFAULT_LOGGER.error("No PID found for the MiNiFi process, so unable to kill process; The process should be killed manually.");
} else {
DEFAULT_LOGGER.error("Will kill the MiNiFi Process with PID {}", minifiPid);
processUtils.killProcessTree(minifiPid);
}
}
}

View File

@ -17,17 +17,16 @@
package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
import org.apache.nifi.minifi.bootstrap.configuration.ingestors.interfaces.ChangeIngestor;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
import org.apache.nifi.minifi.bootstrap.configuration.ingestors.interfaces.ChangeIngestor;
import org.slf4j.Logger;
public abstract class AbstractPullChangeIngestor implements Runnable, ChangeIngestor {
@ -38,12 +37,14 @@ public abstract class AbstractPullChangeIngestor implements Runnable, ChangeInge
protected final AtomicInteger pollingPeriodMS = new AtomicInteger();
private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
protected volatile ConfigurationChangeNotifier configurationChangeNotifier;
protected volatile ConfigurationFileHolder configurationFileHolder;
protected final AtomicReference<Properties> properties = new AtomicReference<>();
@Override
public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder, ConfigurationChangeNotifier configurationChangeNotifier) {
this.configurationChangeNotifier = configurationChangeNotifier;
this.properties.set(properties);
this.configurationFileHolder = configurationFileHolder;
}
@Override

View File

@ -23,7 +23,6 @@ import static org.apache.nifi.minifi.bootstrap.configuration.differentiators.Who
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.file.FileSystems;
import java.nio.file.Path;
@ -40,13 +39,13 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.commons.io.input.TeeInputStream;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
import org.apache.nifi.minifi.bootstrap.configuration.differentiators.Differentiator;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
import org.apache.nifi.minifi.bootstrap.configuration.differentiators.Differentiator;
import org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator;
import org.apache.nifi.minifi.bootstrap.configuration.ingestors.interfaces.ChangeIngestor;
import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -57,11 +56,11 @@ import org.slf4j.LoggerFactory;
*/
public class FileChangeIngestor implements Runnable, ChangeIngestor {
private static final Map<String, Supplier<Differentiator<InputStream>>> DIFFERENTIATOR_CONSTRUCTOR_MAP;
private static final Map<String, Supplier<Differentiator<ByteBuffer>>> DIFFERENTIATOR_CONSTRUCTOR_MAP;
static {
HashMap<String, Supplier<Differentiator<InputStream>>> tempMap = new HashMap<>();
tempMap.put(WHOLE_CONFIG_KEY, WholeConfigDifferentiator::getInputStreamDifferentiator);
HashMap<String, Supplier<Differentiator<ByteBuffer>>> tempMap = new HashMap<>();
tempMap.put(WHOLE_CONFIG_KEY, WholeConfigDifferentiator::getByteBufferDifferentiator);
DIFFERENTIATOR_CONSTRUCTOR_MAP = Collections.unmodifiableMap(tempMap);
}
@ -80,9 +79,10 @@ public class FileChangeIngestor implements Runnable, ChangeIngestor {
private Path configFilePath;
private WatchService watchService;
private long pollingSeconds;
private volatile Differentiator<InputStream> differentiator;
private volatile Differentiator<ByteBuffer> differentiator;
private volatile ConfigurationChangeNotifier configurationChangeNotifier;
private volatile ConfigurationFileHolder configurationFileHolder;
private volatile Properties properties;
private ScheduledExecutorService executorService;
protected static WatchService initializeWatcher(Path filePath) {
@ -124,20 +124,13 @@ public class FileChangeIngestor implements Runnable, ChangeIngestor {
logger.debug("Checking for a change");
if (targetChanged()) {
logger.debug("Target changed, checking if it's different than current flow.");
try (FileInputStream configFile = new FileInputStream(configFilePath.toFile());
ByteArrayOutputStream pipedOutputStream = new ByteArrayOutputStream();
TeeInputStream teeInputStream = new TeeInputStream(configFile, pipedOutputStream)) {
try (FileInputStream configFile = new FileInputStream(configFilePath.toFile())) {
ByteBuffer readOnlyNewConfig =
ConfigTransformer.overrideNonFlowSectionsFromOriginalSchema(
IOUtils.toByteArray(configFile), configurationFileHolder.getConfigFileReference().get().duplicate(), properties);
if (differentiator.isNew(teeInputStream)) {
if (differentiator.isNew(readOnlyNewConfig)) {
logger.debug("New change, notifying listener");
// Fill the byteArrayOutputStream with the rest of the request data
while (teeInputStream.available() != 0) {
teeInputStream.read();
}
ByteBuffer newConfig = ByteBuffer.wrap(pipedOutputStream.toByteArray());
ByteBuffer readOnlyNewConfig = newConfig.asReadOnlyBuffer();
configurationChangeNotifier.notifyListeners(readOnlyNewConfig);
logger.debug("Listeners notified");
}
@ -149,6 +142,8 @@ public class FileChangeIngestor implements Runnable, ChangeIngestor {
@Override
public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder, ConfigurationChangeNotifier configurationChangeNotifier) {
this.properties = properties;
this.configurationFileHolder = configurationFileHolder;
final String rawPath = properties.getProperty(CONFIG_FILE_PATH_KEY);
final String rawPollingDuration = properties.getProperty(POLLING_PERIOD_INTERVAL_KEY, Long.toString(DEFAULT_POLLING_PERIOD_INTERVAL));
@ -169,14 +164,14 @@ public class FileChangeIngestor implements Runnable, ChangeIngestor {
final String differentiatorName = properties.getProperty(DIFFERENTIATOR_KEY);
if (differentiatorName != null && !differentiatorName.isEmpty()) {
Supplier<Differentiator<InputStream>> differentiatorSupplier = DIFFERENTIATOR_CONSTRUCTOR_MAP.get(differentiatorName);
Supplier<Differentiator<ByteBuffer>> differentiatorSupplier = DIFFERENTIATOR_CONSTRUCTOR_MAP.get(differentiatorName);
if (differentiatorSupplier == null) {
throw new IllegalArgumentException("Property, " + DIFFERENTIATOR_KEY + ", has value " + differentiatorName + " which does not " +
"correspond to any in the PullHttpChangeIngestor Map:" + DIFFERENTIATOR_CONSTRUCTOR_MAP.keySet());
}
differentiator = differentiatorSupplier.get();
} else {
differentiator = WholeConfigDifferentiator.getInputStreamDifferentiator();
differentiator = WholeConfigDifferentiator.getByteBufferDifferentiator();
}
differentiator.initialize(properties, configurationFileHolder);
}
@ -193,7 +188,7 @@ public class FileChangeIngestor implements Runnable, ChangeIngestor {
this.configurationChangeNotifier = configurationChangeNotifier;
}
protected void setDifferentiator(Differentiator<InputStream> differentiator) {
protected void setDifferentiator(Differentiator<ByteBuffer> differentiator) {
this.differentiator = differentiator;
}

View File

@ -17,33 +17,9 @@
package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
import okhttp3.Credentials;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
import org.apache.nifi.minifi.bootstrap.configuration.differentiators.Differentiator;
import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
import org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator;
import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream;
import org.apache.nifi.minifi.commons.schema.ConfigSchema;
import org.apache.nifi.minifi.commons.schema.SecurityPropertiesSchema;
import org.apache.nifi.minifi.commons.schema.common.ConvertableSchema;
import org.apache.nifi.minifi.commons.schema.common.StringUtil;
import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
import static org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeCoordinator.NOTIFIER_INGESTORS_KEY;
import static org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator.WHOLE_CONFIG_KEY;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -58,9 +34,25 @@ import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import static org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeCoordinator.NOTIFIER_INGESTORS_KEY;
import static org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator.WHOLE_CONFIG_KEY;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;
import okhttp3.Credentials;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
import org.apache.nifi.minifi.bootstrap.configuration.differentiators.Differentiator;
import org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator;
import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer;
import org.apache.nifi.minifi.commons.schema.common.StringUtil;
import org.slf4j.LoggerFactory;
public class PullHttpChangeIngestor extends AbstractPullChangeIngestor {
@ -109,7 +101,6 @@ public class PullHttpChangeIngestor extends AbstractPullChangeIngestor {
private volatile String connectionScheme;
private volatile String lastEtag = "";
private volatile boolean useEtag = false;
private volatile boolean overrideSecurity = false;
public PullHttpChangeIngestor() {
logger = LoggerFactory.getLogger(PullHttpChangeIngestor.class);
@ -153,14 +144,6 @@ public class PullHttpChangeIngestor extends AbstractPullChangeIngestor {
"the default value of \"false\". It is set to \"" + useEtagString + "\".");
}
final String overrideSecurityProperties = (String) properties.getOrDefault(OVERRIDE_SECURITY, "false");
if ("true".equalsIgnoreCase(overrideSecurityProperties) || "false".equalsIgnoreCase(overrideSecurityProperties)) {
overrideSecurity = Boolean.parseBoolean(overrideSecurityProperties);
} else {
throw new IllegalArgumentException("Property, " + OVERRIDE_SECURITY + ", to specify whether to override security properties must either be a value boolean value (\"true\" or \"false\")" +
" or left to the default value of \"false\". It is set to \"" + overrideSecurityProperties + "\".");
}
httpClientReference.set(null);
final OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder();
@ -234,8 +217,8 @@ public class PullHttpChangeIngestor extends AbstractPullChangeIngestor {
.build();
final Request.Builder requestBuilder = new Request.Builder()
.get()
.url(url);
.get()
.url(url);
if (useEtag) {
requestBuilder.addHeader("If-None-Match", lastEtag);
@ -264,29 +247,9 @@ public class PullHttpChangeIngestor extends AbstractPullChangeIngestor {
return;
}
final ByteBuffer bodyByteBuffer = ByteBuffer.wrap(body.bytes());
ByteBuffer readOnlyNewConfig = null;
// checking if some parts of the configuration must be preserved
if (overrideSecurity) {
readOnlyNewConfig = bodyByteBuffer.asReadOnlyBuffer();
} else {
logger.debug("Preserving previous security properties...");
// get the current security properties from the current configuration file
final File configFile = new File(properties.get().getProperty(RunMiNiFi.MINIFI_CONFIG_FILE_KEY));
ConvertableSchema<ConfigSchema> configSchema = SchemaLoader.loadConvertableSchemaFromYaml(new FileInputStream(configFile));
ConfigSchema currentSchema = configSchema.convert();
SecurityPropertiesSchema secProps = currentSchema.getSecurityProperties();
// override the security properties in the pulled configuration with the previous properties
configSchema = SchemaLoader.loadConvertableSchemaFromYaml(new ByteBufferInputStream(bodyByteBuffer.duplicate()));
ConfigSchema newSchema = configSchema.convert();
newSchema.setSecurityProperties(secProps);
// return the updated configuration preserving the previous security configuration
readOnlyNewConfig = ByteBuffer.wrap(new Yaml().dump(newSchema.toMap()).getBytes()).asReadOnlyBuffer();
}
ByteBuffer readOnlyNewConfig =
ConfigTransformer.overrideNonFlowSectionsFromOriginalSchema(body.bytes(), configurationFileHolder.getConfigFileReference().get().duplicate(), properties.get());
if (differentiator.isNew(readOnlyNewConfig)) {
logger.debug("New change received, notifying listener");

View File

@ -17,14 +17,29 @@
package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
import org.apache.commons.io.input.TeeInputStream;
import org.apache.commons.io.output.ByteArrayOutputStream;
import static org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeCoordinator.NOTIFIER_INGESTORS_KEY;
import static org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator.WHOLE_CONFIG_KEY;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.function.Supplier;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
import org.apache.nifi.minifi.bootstrap.configuration.differentiators.Differentiator;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
import org.apache.nifi.minifi.bootstrap.configuration.ListenerHandleResult;
import org.apache.nifi.minifi.bootstrap.configuration.differentiators.Differentiator;
import org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator;
import org.apache.nifi.minifi.bootstrap.configuration.ingestors.interfaces.ChangeIngestor;
import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
@ -35,32 +50,14 @@ import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.function.Supplier;
import static org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeCoordinator.NOTIFIER_INGESTORS_KEY;
import static org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator.WHOLE_CONFIG_KEY;
public class RestChangeIngestor implements ChangeIngestor {
private static final Map<String, Supplier<Differentiator<InputStream>>> DIFFERENTIATOR_CONSTRUCTOR_MAP;
private static final Map<String, Supplier<Differentiator<ByteBuffer>>> DIFFERENTIATOR_CONSTRUCTOR_MAP;
static {
HashMap<String, Supplier<Differentiator<InputStream>>> tempMap = new HashMap<>();
tempMap.put(WHOLE_CONFIG_KEY, WholeConfigDifferentiator::getInputStreamDifferentiator);
HashMap<String, Supplier<Differentiator<ByteBuffer>>> tempMap = new HashMap<>();
tempMap.put(WHOLE_CONFIG_KEY, WholeConfigDifferentiator::getByteBufferDifferentiator);
DIFFERENTIATOR_CONSTRUCTOR_MAP = Collections.unmodifiableMap(tempMap);
}
@ -86,8 +83,10 @@ public class RestChangeIngestor implements ChangeIngestor {
public static final String DIFFERENTIATOR_KEY = RECEIVE_HTTP_BASE_KEY + ".differentiator";
private final Server jetty;
private volatile Differentiator<InputStream> differentiator;
private volatile Differentiator<ByteBuffer> differentiator;
private volatile ConfigurationChangeNotifier configurationChangeNotifier;
private volatile ConfigurationFileHolder configurationFileHolder;
private volatile Properties properties;
public RestChangeIngestor() {
QueuedThreadPool queuedThreadPool = new QueuedThreadPool();
@ -97,19 +96,20 @@ public class RestChangeIngestor implements ChangeIngestor {
@Override
public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder, ConfigurationChangeNotifier configurationChangeNotifier) {
this.configurationFileHolder = configurationFileHolder;
this.properties = properties;
logger.info("Initializing");
final String differentiatorName = properties.getProperty(DIFFERENTIATOR_KEY);
if (differentiatorName != null && !differentiatorName.isEmpty()) {
Supplier<Differentiator<InputStream>> differentiatorSupplier = DIFFERENTIATOR_CONSTRUCTOR_MAP.get(differentiatorName);
Supplier<Differentiator<ByteBuffer>> differentiatorSupplier = DIFFERENTIATOR_CONSTRUCTOR_MAP.get(differentiatorName);
if (differentiatorSupplier == null) {
throw new IllegalArgumentException("Property, " + DIFFERENTIATOR_KEY + ", has value " + differentiatorName + " which does not " +
"correspond to any in the PullHttpChangeIngestor Map:" + DIFFERENTIATOR_CONSTRUCTOR_MAP.keySet());
}
differentiator = differentiatorSupplier.get();
} else {
differentiator = WholeConfigDifferentiator.getInputStreamDifferentiator();
differentiator = WholeConfigDifferentiator.getByteBufferDifferentiator();
}
differentiator.initialize(properties, configurationFileHolder);
@ -207,7 +207,7 @@ public class RestChangeIngestor implements ChangeIngestor {
logger.info("Added an https connector on the host '{}' and port '{}'", new Object[]{https.getHost(), https.getPort()});
}
protected void setDifferentiator(Differentiator<InputStream> differentiator) {
protected void setDifferentiator(Differentiator<ByteBuffer> differentiator) {
this.differentiator = differentiator;
}
@ -215,7 +215,7 @@ public class RestChangeIngestor implements ChangeIngestor {
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException {
throws IOException {
logRequest(request);
@ -224,17 +224,12 @@ public class RestChangeIngestor implements ChangeIngestor {
if (POST.equals(request.getMethod())) {
int statusCode;
String responseText;
try (ByteArrayOutputStream pipedOutputStream = new ByteArrayOutputStream();
TeeInputStream teeInputStream = new TeeInputStream(request.getInputStream(), pipedOutputStream)) {
try {
ByteBuffer readOnlyNewConfig =
ConfigTransformer.overrideNonFlowSectionsFromOriginalSchema(
IOUtils.toByteArray(request.getInputStream()), configurationFileHolder.getConfigFileReference().get().duplicate(), properties);
if (differentiator.isNew(teeInputStream)) {
// Fill the pipedOutputStream with the rest of the request data
while (teeInputStream.available() != 0) {
teeInputStream.read();
}
ByteBuffer newConfig = ByteBuffer.wrap(pipedOutputStream.toByteArray());
ByteBuffer readOnlyNewConfig = newConfig.asReadOnlyBuffer();
if (differentiator.isNew(readOnlyNewConfig)) {
Collection<ListenerHandleResult> listenerHandleResults = configurationChangeNotifier.notifyListeners(readOnlyNewConfig);
@ -250,9 +245,13 @@ public class RestChangeIngestor implements ChangeIngestor {
statusCode = 409;
responseText = "Request received but instance is already running this config.";
}
writeOutput(response, responseText, statusCode);
} catch (Exception e) {
logger.error("Failed to override config file", e);
statusCode = 500;
responseText = "Failed to override config file";
}
writeOutput(response, responseText, statusCode);
} else if (GET.equals(request.getMethod())) {
writeOutput(response, GET_TEXT, 200);
} else {

View File

@ -25,6 +25,7 @@ import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.Arrays;
import java.util.Optional;
import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
import org.apache.nifi.minifi.bootstrap.exception.InvalidCommandException;
import org.slf4j.Logger;
@ -48,7 +49,7 @@ public class BootstrapCodec {
public void communicate() throws IOException {
String line = reader.readLine();
String[] splits = line.split(" ");
String[] splits = Optional.ofNullable(line).map(l -> l.split(" ")).orElse(new String[0]);
if (splits.length == 0) {
throw new IOException("Received invalid command from MiNiFi: " + line);
}

View File

@ -21,15 +21,17 @@ import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.DEFAULT_LOGGER;
import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.UNINITIALIZED;
import org.apache.nifi.minifi.bootstrap.MiNiFiParameters;
import org.apache.nifi.minifi.bootstrap.util.UnixProcessUtils;
import org.apache.nifi.minifi.bootstrap.util.ProcessUtils;
public class CurrentPortProvider {
private final MiNiFiCommandSender miNiFiCommandSender;
private final MiNiFiParameters miNiFiParameters;
private final ProcessUtils processUtils;
public CurrentPortProvider(MiNiFiCommandSender miNiFiCommandSender, MiNiFiParameters miNiFiParameters) {
public CurrentPortProvider(MiNiFiCommandSender miNiFiCommandSender, MiNiFiParameters miNiFiParameters, ProcessUtils processUtils) {
this.miNiFiCommandSender = miNiFiCommandSender;
this.miNiFiParameters = miNiFiParameters;
this.processUtils = processUtils;
}
public Integer getCurrentPort() {
@ -50,7 +52,7 @@ public class CurrentPortProvider {
long minifiPid = miNiFiParameters.getMinifiPid();
DEFAULT_LOGGER.debug("Current PID {}", minifiPid);
boolean procRunning = UnixProcessUtils.isProcessRunning(minifiPid);
boolean procRunning = processUtils.isProcessRunning(minifiPid);
if (procRunning) {
return miNiFiPort;
} else {

View File

@ -24,10 +24,10 @@ import org.slf4j.LoggerFactory;
public class GracefulShutdownParameterProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(GracefulShutdownParameterProvider.class);
private static final String GRACEFUL_SHUTDOWN_PROP = "graceful.shutdown.seconds";
private static final String DEFAULT_GRACEFUL_SHUTDOWN_VALUE = "20";
private static final String INVALID_GRACEFUL_SHUTDOWN_SECONDS_MESSAGE =
"The {} property in Bootstrap Config File has an invalid value. Must be a non-negative integer, Falling back to the default {} value";
protected static final String GRACEFUL_SHUTDOWN_PROP = "graceful.shutdown.seconds";
protected static final String DEFAULT_GRACEFUL_SHUTDOWN_VALUE = "20";
private final BootstrapFileProvider bootstrapFileProvider;

View File

@ -19,15 +19,10 @@ package org.apache.nifi.minifi.bootstrap.service;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.CONF_DIR_KEY;
import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.MINIFI_CONFIG_FILE_KEY;
import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.OVERRIDE_SECURITY;
import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PULL_HTTP_BASE_KEY;
import static org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.generateConfigFiles;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
@ -38,11 +33,6 @@ import org.apache.commons.io.IOUtils;
import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream;
import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer;
import org.apache.nifi.minifi.commons.schema.ConfigSchema;
import org.apache.nifi.minifi.commons.schema.common.ConvertableSchema;
import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader;
import org.slf4j.Logger;
public class MiNiFiConfigurationChangeListener implements ConfigurationChangeListener {
@ -67,13 +57,10 @@ public class MiNiFiConfigurationChangeListener implements ConfigurationChangeLis
throw new ConfigurationChangeException("Instance is already handling another change");
}
// Store the incoming stream as a byte array to be shared among components that need it
try(ByteArrayOutputStream bufferedConfigOs = new ByteArrayOutputStream()) {
try {
Properties bootstrapProperties = bootstrapFileProvider.getBootstrapProperties();
File configFile = new File(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY));
IOUtils.copy(configInputStream, bufferedConfigOs);
File swapConfigFile = bootstrapFileProvider.getSwapFile();
logger.info("Persisting old configuration to {}", swapConfigFile.getAbsolutePath());
@ -81,11 +68,11 @@ public class MiNiFiConfigurationChangeListener implements ConfigurationChangeLis
Files.copy(configFileInputStream, swapConfigFile.toPath(), REPLACE_EXISTING);
}
persistBackNonFlowSectionsFromOriginalSchema(bufferedConfigOs.toByteArray(), bootstrapProperties, configFile);
// write out new config to file
Files.copy(configInputStream, configFile.toPath(), REPLACE_EXISTING);
// Create an input stream to feed to the config transformer
try (FileInputStream newConfigIs = new FileInputStream(configFile)) {
try {
String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY);
transformConfigurationFiles(confDir, newConfigIs, configFile, swapConfigFile);
@ -146,61 +133,4 @@ public class MiNiFiConfigurationChangeListener implements ConfigurationChangeLis
throw new IOException("Unable to successfully restart MiNiFi instance after configuration change.", e);
}
}
private void persistBackNonFlowSectionsFromOriginalSchema(byte[] newSchema, Properties bootstrapProperties, File configFile) {
try {
ConvertableSchema<ConfigSchema> schemaNew = ConfigTransformer
.throwIfInvalid(SchemaLoader.loadConvertableSchemaFromYaml(new ByteArrayInputStream(newSchema)));
ConfigSchema configSchemaNew = ConfigTransformer.throwIfInvalid(schemaNew.convert());
ConvertableSchema<ConfigSchema> schemaOld = ConfigTransformer
.throwIfInvalid(SchemaLoader.loadConvertableSchemaFromYaml(new ByteBufferInputStream(runner.getConfigFileReference().get().duplicate())));
ConfigSchema configSchemaOld = ConfigTransformer.throwIfInvalid(schemaOld.convert());
configSchemaNew.setNifiPropertiesOverrides(configSchemaOld.getNifiPropertiesOverrides());
if (!overrideCoreProperties(bootstrapProperties)) {
logger.debug("Preserving previous core properties...");
configSchemaNew.setCoreProperties(configSchemaOld.getCoreProperties());
}
if (!overrideSecurityProperties(bootstrapProperties)) {
logger.debug("Preserving previous security properties...");
configSchemaNew.setSecurityProperties(configSchemaOld.getSecurityProperties());
}
logger.debug("Persisting changes to {}", configFile.getAbsolutePath());
SchemaLoader.toYaml(configSchemaNew, new FileWriter(configFile));
} catch (Exception e) {
logger.error("Loading the old and the new schema for merging was not successful", e);
}
}
private static boolean overrideSecurityProperties(Properties properties) {
String overrideSecurityProperties = (String) properties.getOrDefault(OVERRIDE_SECURITY, "false");
boolean overrideSecurity;
if ("true".equalsIgnoreCase(overrideSecurityProperties) || "false".equalsIgnoreCase(overrideSecurityProperties)) {
overrideSecurity = Boolean.parseBoolean(overrideSecurityProperties);
} else {
throw new IllegalArgumentException(
"Property, " + OVERRIDE_SECURITY + ", to specify whether to override security properties must either be a value boolean value (\"true\" or \"false\")" +
" or left to the default value of \"false\". It is set to \"" + overrideSecurityProperties + "\".");
}
return overrideSecurity;
}
private static boolean overrideCoreProperties(Properties properties) {
String overrideCorePropertiesKey = PULL_HTTP_BASE_KEY + ".override.core";
String overrideCoreProps = (String) properties.getOrDefault(overrideCorePropertiesKey, "false");
boolean overrideCoreProperties;
if ("true".equalsIgnoreCase(overrideCoreProps) || "false".equalsIgnoreCase(overrideCoreProps)) {
overrideCoreProperties = Boolean.parseBoolean(overrideCoreProps);
} else {
throw new IllegalArgumentException(
"Property, " + overrideCorePropertiesKey + ", to specify whether to override core properties must either be a value boolean value (\"true\" or \"false\")" +
" or left to the default value of \"false\". It is set to \"" + overrideCoreProps + "\".");
}
return overrideCoreProperties;
}
}

View File

@ -17,16 +17,18 @@
package org.apache.nifi.minifi.bootstrap.service;
import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.UNINITIALIZED;
import static org.apache.nifi.minifi.bootstrap.util.UnixProcessUtils.isProcessRunning;
import org.apache.nifi.minifi.bootstrap.MiNiFiStatus;
import org.apache.nifi.minifi.bootstrap.util.ProcessUtils;
public class MiNiFiStatusProvider {
private final MiNiFiCommandSender miNiFiCommandSender;
private final ProcessUtils processUtils;
public MiNiFiStatusProvider(MiNiFiCommandSender miNiFiCommandSender) {
public MiNiFiStatusProvider(MiNiFiCommandSender miNiFiCommandSender, ProcessUtils processUtils) {
this.miNiFiCommandSender = miNiFiCommandSender;
this.processUtils = processUtils;
}
public MiNiFiStatus getStatus(int port, long pid) {
@ -43,6 +45,6 @@ public class MiNiFiStatusProvider {
return new MiNiFiStatus(port, pid, true, true);
}
return new MiNiFiStatus(port, pid, false, isProcessRunning(pid));
return new MiNiFiStatus(port, pid, false, processUtils.isProcessRunning(pid));
}
}

View File

@ -25,7 +25,7 @@ import java.io.IOException;
import java.util.Optional;
import org.apache.nifi.minifi.bootstrap.MiNiFiParameters;
import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
import org.apache.nifi.minifi.bootstrap.util.UnixProcessUtils;
import org.apache.nifi.minifi.bootstrap.util.ProcessUtils;
public class ReloadService {
private final BootstrapFileProvider bootstrapFileProvider;
@ -35,16 +35,18 @@ public class ReloadService {
private final CurrentPortProvider currentPortProvider;
private final GracefulShutdownParameterProvider gracefulShutdownParameterProvider;
private final RunMiNiFi runMiNiFi;
private final ProcessUtils processUtils;
public ReloadService(BootstrapFileProvider bootstrapFileProvider, MiNiFiParameters miNiFiParameters,
MiNiFiCommandSender miNiFiCommandSender, CurrentPortProvider currentPortProvider,
GracefulShutdownParameterProvider gracefulShutdownParameterProvider, RunMiNiFi runMiNiFi) {
GracefulShutdownParameterProvider gracefulShutdownParameterProvider, RunMiNiFi runMiNiFi, ProcessUtils processUtils) {
this.bootstrapFileProvider = bootstrapFileProvider;
this.miNiFiParameters = miNiFiParameters;
this.miNiFiCommandSender = miNiFiCommandSender;
this.currentPortProvider = currentPortProvider;
this.gracefulShutdownParameterProvider = gracefulShutdownParameterProvider;
this.runMiNiFi = runMiNiFi;
this.processUtils = processUtils;
}
public void reload() throws IOException {
@ -60,7 +62,7 @@ public class ReloadService {
if (commandResponse.filter(RELOAD_CMD::equals).isPresent()) {
DEFAULT_LOGGER.info("Apache MiNiFi has accepted the Reload Command and is reloading");
if (minifiPid != UNINITIALIZED) {
UnixProcessUtils.gracefulShutDownMiNiFiProcess(minifiPid, "MiNiFi has not finished shutting down after {} seconds as part of configuration reload. Killing process.",
processUtils.shutdownProcess(minifiPid, "MiNiFi has not finished shutting down after {} seconds as part of configuration reload. Killing process.",
gracefulShutdownParameterProvider.getGracefulShutdownSeconds());
runMiNiFi.setReloading(true);
DEFAULT_LOGGER.info("MiNiFi has finished shutting down and will be reloaded.");
@ -73,7 +75,7 @@ public class ReloadService {
DEFAULT_LOGGER.error("No PID found for the MiNiFi process, so unable to kill process; The process should be killed manually.");
} else {
DEFAULT_LOGGER.error("Will kill the MiNiFi Process with PID {}", minifiPid);
UnixProcessUtils.killProcessTree(minifiPid);
processUtils.killProcessTree(minifiPid);
}
} finally {
if (reloadLockFile.exists() && !reloadLockFile.delete()) {

View File

@ -17,10 +17,15 @@
package org.apache.nifi.minifi.bootstrap.util;
import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.OVERRIDE_SECURITY;
import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PULL_HTTP_BASE_KEY;
import java.io.ByteArrayInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
@ -79,13 +84,13 @@ import org.w3c.dom.Document;
import org.w3c.dom.Element;
public final class ConfigTransformer {
// Underlying version of NIFI will be using
public static final String ROOT_GROUP = "Root-Group";
private static final String OVERRIDE_CORE_PROPERTIES_KEY = PULL_HTTP_BASE_KEY + ".override.core";
private static final Base64.Encoder KEY_ENCODER = Base64.getEncoder().withoutPadding();
private static final int SENSITIVE_PROPERTIES_KEY_LENGTH = 24;
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigTransformer.class);
public static final Logger logger = LoggerFactory.getLogger(ConfigTransformer.class);
// Underlying version of NIFI will be using
public static final String ROOT_GROUP = "Root-Group";
// Final util classes should have private constructor
private ConfigTransformer() {
@ -119,19 +124,19 @@ public final class ConfigTransformer {
// See if we are providing defined properties from the filesystem configurations and use those as the definitive values
if (securityProperties != null) {
configSchemaNew.setSecurityProperties(securityProperties);
logger.info("Bootstrap flow override: Replaced security properties");
LOGGER.info("Bootstrap flow override: Replaced security properties");
}
if (provenanceReportingProperties != null) {
configSchemaNew.setProvenanceReportingProperties(provenanceReportingProperties);
logger.info("Bootstrap flow override: Replaced provenance reporting properties");
LOGGER.info("Bootstrap flow override: Replaced provenance reporting properties");
}
// Replace all processor SSL controller services with MiNiFi parent, if bootstrap boolean is set to true
if (BootstrapTransformer.processorSSLOverride(bootstrapProperties)) {
for (ProcessorSchema processorConfig : configSchemaNew.getProcessGroupSchema().getProcessors()) {
processorConfig.getProperties().replace("SSL Context Service", processorConfig.getProperties().get("SSL Context Service"), "SSL-Context-Service");
logger.info("Bootstrap flow override: Replaced {} SSL Context Service with parent MiNiFi SSL", processorConfig.getName());
LOGGER.info("Bootstrap flow override: Replaced {} SSL Context Service with parent MiNiFi SSL", processorConfig.getName());
}
}
@ -286,7 +291,7 @@ public final class ConfigTransformer {
final String notnullSensitivePropertiesKey;
// Auto-generate the sensitive properties key if not provided, NiFi security libraries require it
if (StringUtil.isNullOrEmpty(sensitivePropertiesKey)) {
logger.warn("Generating Random Sensitive Properties Key [{}]", NiFiProperties.SENSITIVE_PROPS_KEY);
LOGGER.warn("Generating Random Sensitive Properties Key [{}]", NiFiProperties.SENSITIVE_PROPS_KEY);
final SecureRandom secureRandom = new SecureRandom();
final byte[] sensitivePropertiesKeyBinary = new byte[SENSITIVE_PROPERTIES_KEY_LENGTH];
secureRandom.nextBytes(sensitivePropertiesKeyBinary);
@ -750,6 +755,52 @@ public final class ConfigTransformer {
element.appendChild(toAdd);
}
public static ByteBuffer overrideNonFlowSectionsFromOriginalSchema(byte[] newSchema, ByteBuffer currentConfigScheme, Properties bootstrapProperties)
throws InvalidConfigurationException {
try {
boolean overrideCoreProperties = ConfigTransformer.overrideCoreProperties(bootstrapProperties);
boolean overrideSecurityProperties = ConfigTransformer.overrideSecurityProperties(bootstrapProperties);
if (overrideCoreProperties && overrideSecurityProperties) {
return ByteBuffer.wrap(newSchema);
} else {
ConvertableSchema<ConfigSchema> schemaNew = ConfigTransformer
.throwIfInvalid(SchemaLoader.loadConvertableSchemaFromYaml(new ByteArrayInputStream(newSchema)));
ConfigSchema configSchemaNew = ConfigTransformer.throwIfInvalid(schemaNew.convert());
ConvertableSchema<ConfigSchema> schemaOld = ConfigTransformer
.throwIfInvalid(SchemaLoader.loadConvertableSchemaFromYaml(new ByteBufferInputStream(currentConfigScheme)));
ConfigSchema configSchemaOld = ConfigTransformer.throwIfInvalid(schemaOld.convert());
configSchemaNew.setNifiPropertiesOverrides(configSchemaOld.getNifiPropertiesOverrides());
if (!overrideCoreProperties) {
LOGGER.debug("Preserving previous core properties...");
configSchemaNew.setCoreProperties(configSchemaOld.getCoreProperties());
}
if (!overrideSecurityProperties) {
LOGGER.debug("Preserving previous security properties...");
configSchemaNew.setSecurityProperties(configSchemaOld.getSecurityProperties());
}
StringWriter writer = new StringWriter();
SchemaLoader.toYaml(configSchemaNew, writer);
return ByteBuffer.wrap(writer.toString().getBytes()).asReadOnlyBuffer();
}
} catch (Exception e) {
throw new InvalidConfigurationException("Loading the old and the new schema for merging was not successful", e);
}
}
private static boolean overrideSecurityProperties(Properties properties) {
String overrideSecurityProperties = (String) properties.getOrDefault(OVERRIDE_SECURITY, "false");
return Boolean.parseBoolean(overrideSecurityProperties);
}
private static boolean overrideCoreProperties(Properties properties) {
String overrideCoreProps = (String) properties.getOrDefault(OVERRIDE_CORE_PROPERTIES_KEY, "false");
return Boolean.parseBoolean(overrideCoreProps);
}
public static final String PROPERTIES_FILE_APACHE_2_0_LICENSE =
" Licensed to the Apache Software Foundation (ASF) under one or more\n" +
"# contributor license agreements. See the NOTICE file distributed with\n" +

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.bootstrap.util;
import java.io.IOException;
public interface ProcessUtils {
boolean isProcessRunning(Long pid);
void shutdownProcess(Long pid, String s, int gracefulShutdownSeconds);
void killProcessTree(Long pid) throws IOException;
}

View File

@ -17,6 +17,8 @@
package org.apache.nifi.minifi.bootstrap.util;
import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.DEFAULT_LOGGER;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
@ -32,10 +34,11 @@ import org.slf4j.LoggerFactory;
* Utility class for providing information about the running MiNiFi process.
* The methods which are using the PID are working only on unix systems, and should be used only as a fallback in case the PING command fails.
* */
public class UnixProcessUtils {
public class UnixProcessUtils implements ProcessUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(UnixProcessUtils.class);
public static boolean isProcessRunning(Long pid) {
@Override
public boolean isProcessRunning(Long pid) {
if (pid == null) {
LOGGER.error("Unable to get process status due to missing process id");
return false;
@ -72,17 +75,18 @@ public class UnixProcessUtils {
}
}
public static void gracefulShutDownMiNiFiProcess(Long pid, String s, int gracefulShutdownSeconds) {
@Override
public void shutdownProcess(Long pid, String s, int gracefulShutdownSeconds) {
long startWait = System.nanoTime();
while (UnixProcessUtils.isProcessRunning(pid)) {
while (isProcessRunning(pid)) {
LOGGER.info("Waiting for Apache MiNiFi to finish shutting down...");
long waitNanos = System.nanoTime() - startWait;
long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos);
if (waitSeconds >= gracefulShutdownSeconds || gracefulShutdownSeconds == 0) {
if (UnixProcessUtils.isProcessRunning(pid)) {
if (isProcessRunning(pid)) {
LOGGER.warn(s, gracefulShutdownSeconds);
try {
UnixProcessUtils.killProcessTree(pid);
killProcessTree(pid);
} catch (IOException ioe) {
LOGGER.error("Failed to kill Process with PID {}", pid);
}
@ -92,12 +96,14 @@ public class UnixProcessUtils {
try {
Thread.sleep(2000L);
} catch (InterruptedException ie) {
DEFAULT_LOGGER.warn("Thread interrupted while shutting down MiNiFi");
}
}
}
}
public static void killProcessTree(Long pid) throws IOException {
@Override
public void killProcessTree(Long pid) throws IOException {
LOGGER.debug("Killing Process Tree for PID {}", pid);
List<Long> children = getChildProcesses(pid);
@ -110,22 +116,7 @@ public class UnixProcessUtils {
Runtime.getRuntime().exec(new String[]{"kill", "-9", String.valueOf(pid)});
}
/**
* Checks the status of the given process.
*
* @param process the process object what we want to check
* @return true if the process is Alive
*/
public static boolean isAlive(Process process) {
try {
process.exitValue();
return false;
} catch (IllegalStateException | IllegalThreadStateException itse) {
return true;
}
}
private static List<Long> getChildProcesses(Long ppid) throws IOException {
private List<Long> getChildProcesses(Long ppid) throws IOException {
Process proc = Runtime.getRuntime().exec(new String[]{"ps", "-o", "pid", "--no-headers", "--ppid", String.valueOf(ppid)});
List<Long> childPids = new ArrayList<>();
try (InputStream in = proc.getInputStream();

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.bootstrap;
import static org.apache.nifi.minifi.bootstrap.BootstrapCommand.STOP;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.apache.nifi.minifi.bootstrap.service.MiNiFiStdLogHandler;
import org.apache.nifi.minifi.bootstrap.service.PeriodicStatusReporterManager;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class ShutdownHookTest {
@Mock
private RunMiNiFi runner;
@Mock
private MiNiFiStdLogHandler miNiFiStdLogHandler;
@Mock
private PeriodicStatusReporterManager periodicStatusReporterManager;
@InjectMocks
private ShutdownHook shutdownHook;
@Test
void testRunShouldShutdownSchedulersAndProcesses() {
when(runner.getPeriodicStatusReporterManager()).thenReturn(periodicStatusReporterManager);
shutdownHook.run();
verify(miNiFiStdLogHandler).shutdown();
verify(runner).shutdownChangeNotifier();
verify(runner).getPeriodicStatusReporterManager();
verify(periodicStatusReporterManager).shutdownPeriodicStatusReporters();
verify(runner).setAutoRestartNiFi(false);
verify(runner).run(STOP);
}
}

View File

@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.bootstrap.command;
import static org.apache.nifi.minifi.bootstrap.BootstrapCommand.DUMP;
import static org.apache.nifi.minifi.bootstrap.BootstrapCommand.ENV;
import static org.apache.nifi.minifi.bootstrap.BootstrapCommand.FLOWSTATUS;
import static org.apache.nifi.minifi.bootstrap.BootstrapCommand.RESTART;
import static org.apache.nifi.minifi.bootstrap.BootstrapCommand.RUN;
import static org.apache.nifi.minifi.bootstrap.BootstrapCommand.START;
import static org.apache.nifi.minifi.bootstrap.BootstrapCommand.STATUS;
import static org.apache.nifi.minifi.bootstrap.BootstrapCommand.STOP;
import static org.apache.nifi.minifi.bootstrap.BootstrapCommand.UNKNOWN;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.verifyNoInteractions;
import java.io.File;
import org.apache.nifi.minifi.bootstrap.MiNiFiParameters;
import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
import org.apache.nifi.minifi.bootstrap.service.BootstrapFileProvider;
import org.apache.nifi.minifi.bootstrap.service.CurrentPortProvider;
import org.apache.nifi.minifi.bootstrap.service.GracefulShutdownParameterProvider;
import org.apache.nifi.minifi.bootstrap.service.MiNiFiCommandSender;
import org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider;
import org.apache.nifi.minifi.bootstrap.service.MiNiFiStatusProvider;
import org.apache.nifi.minifi.bootstrap.service.MiNiFiStdLogHandler;
import org.apache.nifi.minifi.bootstrap.service.PeriodicStatusReporterManager;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class CommandRunnerFactoryTest {
@Mock
private MiNiFiCommandSender miNiFiCommandSender;
@Mock
private CurrentPortProvider currentPortProvider;
@Mock
private MiNiFiParameters miNiFiParameters;
@Mock
private MiNiFiStatusProvider miNiFiStatusProvider;
@Mock
private PeriodicStatusReporterManager periodicStatusReporterManager;
@Mock
private BootstrapFileProvider bootstrapFileProvider;
@Mock
private MiNiFiStdLogHandler miNiFiStdLogHandler;
@Mock
private File bootstrapConfigFile;
@Mock
private RunMiNiFi runMiNiFi;
@Mock
private GracefulShutdownParameterProvider gracefulShutdownParameterProvider;
@Mock
private MiNiFiExecCommandProvider miNiFiExecCommandProvider;
@InjectMocks
private CommandRunnerFactory commandRunnerFactory;
@Test
void testRunCommandShouldStartCommandReturnStartRunner() {
CommandRunner runner = commandRunnerFactory.getRunner(START);
assertInstanceOf(StartRunner.class, runner);
verifyNoInteractions(miNiFiCommandSender, currentPortProvider, miNiFiParameters, miNiFiStatusProvider, periodicStatusReporterManager, bootstrapFileProvider,
miNiFiStdLogHandler, bootstrapConfigFile, runMiNiFi, gracefulShutdownParameterProvider, miNiFiExecCommandProvider);
}
@Test
void testRunCommandShouldRunCommandReturnStartRunner() {
CommandRunner runner = commandRunnerFactory.getRunner(RUN);
assertInstanceOf(StartRunner.class, runner);
verifyNoInteractions(miNiFiCommandSender, currentPortProvider, miNiFiParameters, miNiFiStatusProvider, periodicStatusReporterManager, bootstrapFileProvider,
miNiFiStdLogHandler, bootstrapConfigFile, runMiNiFi, gracefulShutdownParameterProvider, miNiFiExecCommandProvider);
}
@Test
void testRunCommandShouldStopCommandReturnStopRunner() {
CommandRunner runner = commandRunnerFactory.getRunner(STOP);
assertInstanceOf(StopRunner.class, runner);
verifyNoInteractions(miNiFiCommandSender, currentPortProvider, miNiFiParameters, miNiFiStatusProvider, periodicStatusReporterManager, bootstrapFileProvider,
miNiFiStdLogHandler, bootstrapConfigFile, runMiNiFi, gracefulShutdownParameterProvider, miNiFiExecCommandProvider);
}
@Test
void testRunCommandShouldEnvCommandReturnEnvRunner() {
CommandRunner runner = commandRunnerFactory.getRunner(ENV);
assertInstanceOf(EnvRunner.class, runner);
verifyNoInteractions(miNiFiCommandSender, currentPortProvider, miNiFiParameters, miNiFiStatusProvider, periodicStatusReporterManager, bootstrapFileProvider,
miNiFiStdLogHandler, bootstrapConfigFile, runMiNiFi, gracefulShutdownParameterProvider, miNiFiExecCommandProvider);
}
@Test
void testRunCommandShouldDumpCommandReturnDumpRunner() {
CommandRunner runner = commandRunnerFactory.getRunner(DUMP);
assertInstanceOf(DumpRunner.class, runner);
verifyNoInteractions(miNiFiCommandSender, currentPortProvider, miNiFiParameters, miNiFiStatusProvider, periodicStatusReporterManager, bootstrapFileProvider,
miNiFiStdLogHandler, bootstrapConfigFile, runMiNiFi, gracefulShutdownParameterProvider, miNiFiExecCommandProvider);
}
@Test
void testRunCommandShouldFlowStatusCommandReturnFlowStatusRunner() {
CommandRunner runner = commandRunnerFactory.getRunner(FLOWSTATUS);
assertInstanceOf(FlowStatusRunner.class, runner);
verifyNoInteractions(miNiFiCommandSender, currentPortProvider, miNiFiParameters, miNiFiStatusProvider, periodicStatusReporterManager, bootstrapFileProvider,
miNiFiStdLogHandler, bootstrapConfigFile, runMiNiFi, gracefulShutdownParameterProvider, miNiFiExecCommandProvider);
}
@Test
void testRunCommandShouldStatusCommandReturnStatusRunner() {
CommandRunner runner = commandRunnerFactory.getRunner(STATUS);
assertInstanceOf(StatusRunner.class, runner);
verifyNoInteractions(miNiFiCommandSender, currentPortProvider, miNiFiParameters, miNiFiStatusProvider, periodicStatusReporterManager, bootstrapFileProvider,
miNiFiStdLogHandler, bootstrapConfigFile, runMiNiFi, gracefulShutdownParameterProvider, miNiFiExecCommandProvider);
}
@Test
void testRunCommandShouldRestartCommandReturnCompositeRunner() {
CommandRunner runner = commandRunnerFactory.getRunner(RESTART);
assertInstanceOf(CompositeCommandRunner.class, runner);
verifyNoInteractions(miNiFiCommandSender, currentPortProvider, miNiFiParameters, miNiFiStatusProvider, periodicStatusReporterManager, bootstrapFileProvider,
miNiFiStdLogHandler, bootstrapConfigFile, runMiNiFi, gracefulShutdownParameterProvider, miNiFiExecCommandProvider);
}
@Test
void testRunCommandShouldThrowIllegalArgumentExceptionInCaseOfUnknownCommand() {
assertThrows(IllegalArgumentException.class, () -> commandRunnerFactory.getRunner(UNKNOWN));
}
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.bootstrap.command;
import static org.apache.nifi.minifi.bootstrap.Status.ERROR;
import static org.apache.nifi.minifi.bootstrap.Status.OK;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import java.util.Arrays;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class CompositeCommandRunnerTest {
@Mock
private CommandRunner startRunner;
@Mock
private CommandRunner stopRunner;
private CompositeCommandRunner compositeCommandRunner;
@BeforeEach
void setup() {
compositeCommandRunner = new CompositeCommandRunner(Arrays.asList(startRunner, stopRunner));
}
@Test
void testRunCommandShouldExecuteCommandsTillFirstNonSuccessStatusCode() {
when(startRunner.runCommand(any())).thenReturn(ERROR.getStatusCode());
int statusCode = compositeCommandRunner.runCommand(new String[0]);
assertEquals(ERROR.getStatusCode(), statusCode);
verify(startRunner).runCommand(any());
verifyNoInteractions(stopRunner);
}
@Test
void testRunCommandShouldExecuteCommandsAndReturnOKWhenThereWasNoError() {
when(startRunner.runCommand(any())).thenReturn(OK.getStatusCode());
when(stopRunner.runCommand(any())).thenReturn(OK.getStatusCode());
int statusCode = compositeCommandRunner.runCommand(new String[0]);
assertEquals(OK.getStatusCode(), statusCode);
verify(startRunner).runCommand(any());
verify(stopRunner).runCommand(any());
verifyNoMoreInteractions(startRunner, stopRunner);
}
}

View File

@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.bootstrap.command;
import static org.apache.nifi.minifi.bootstrap.Status.ERROR;
import static org.apache.nifi.minifi.bootstrap.Status.MINIFI_NOT_RUNNING;
import static org.apache.nifi.minifi.bootstrap.Status.OK;
import static org.apache.nifi.minifi.bootstrap.command.DumpRunner.DUMP_CMD;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Optional;
import org.apache.nifi.minifi.bootstrap.service.CurrentPortProvider;
import org.apache.nifi.minifi.bootstrap.service.MiNiFiCommandSender;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class DumpRunnerTest {
private static final int MINIFI_PORT = 1337;
private static final String DUMP_CONTENT = "dump_content";
@Mock
private MiNiFiCommandSender miNiFiCommandSender;
@Mock
private CurrentPortProvider currentPortProvider;
@InjectMocks
private DumpRunner dumpRunner;
@Test
void testRunCommandShouldDumpToConsoleIfNoFileDefined() throws IOException {
when(currentPortProvider.getCurrentPort()).thenReturn(MINIFI_PORT);
when(miNiFiCommandSender.sendCommand(DUMP_CMD, MINIFI_PORT)).thenReturn(Optional.of(DUMP_CONTENT));
int statusCode = dumpRunner.runCommand(new String[0]);
assertEquals(OK.getStatusCode(), statusCode);
verifyNoMoreInteractions(currentPortProvider, miNiFiCommandSender);
}
@Test
void testRunCommandShouldDumpToFileIfItIsDefined() throws IOException {
when(currentPortProvider.getCurrentPort()).thenReturn(MINIFI_PORT);
when(miNiFiCommandSender.sendCommand(DUMP_CMD, MINIFI_PORT)).thenReturn(Optional.of(DUMP_CONTENT));
File file = Files.createTempFile(null, null).toFile();
file.deleteOnExit();
String tmpFilePath = file.getAbsolutePath();
int statusCode = dumpRunner.runCommand(new String[] {DUMP_CMD, tmpFilePath});
assertEquals(OK.getStatusCode(), statusCode);
assertEquals(DUMP_CONTENT, getDumpContent(file));
verifyNoMoreInteractions(currentPortProvider, miNiFiCommandSender);
}
@Test
void testRunCommandShouldReturnNotRunningStatusCodeIfPortReturnsNull() {
when(currentPortProvider.getCurrentPort()).thenReturn(null);
int statusCode = dumpRunner.runCommand(new String[]{});
assertEquals(MINIFI_NOT_RUNNING.getStatusCode(), statusCode);
verifyNoMoreInteractions(currentPortProvider);
verifyNoInteractions(miNiFiCommandSender);
}
@Test
void testRunCommandShouldReturnErrorStatusCodeIfSendCommandThrowsException() throws IOException {
when(currentPortProvider.getCurrentPort()).thenReturn(MINIFI_PORT);
when(miNiFiCommandSender.sendCommand(DUMP_CMD, MINIFI_PORT)).thenThrow(new IOException());
int statusCode = dumpRunner.runCommand(new String[]{});
assertEquals(ERROR.getStatusCode(), statusCode);
verifyNoMoreInteractions(currentPortProvider, miNiFiCommandSender);
}
@Test
void testRunCommandShouldReturnErrorStatusCodeIfFileWriteFailureHappens() throws IOException {
when(currentPortProvider.getCurrentPort()).thenReturn(MINIFI_PORT);
when(miNiFiCommandSender.sendCommand(DUMP_CMD, MINIFI_PORT)).thenReturn(Optional.ofNullable(DUMP_CONTENT));
File file = Files.createTempFile(null, null).toFile();
file.deleteOnExit();
file.setReadOnly();
String tmpFilePath = file.getAbsolutePath();
int statusCode = dumpRunner.runCommand(new String[] {DUMP_CMD, tmpFilePath});
assertEquals(ERROR.getStatusCode(), statusCode);
verifyNoMoreInteractions(currentPortProvider, miNiFiCommandSender);
}
private String getDumpContent(File dumpFile) {
String fileLines = null;
if (dumpFile.exists()) {
try {
fileLines = new String(Files.readAllBytes(dumpFile.toPath()));
} catch (IOException e) {
fileLines = null;
}
}
return fileLines;
}
}

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.bootstrap.command;
import static org.apache.nifi.minifi.bootstrap.Status.ERROR;
import static org.apache.nifi.minifi.bootstrap.Status.MINIFI_NOT_RUNNING;
import static org.apache.nifi.minifi.bootstrap.Status.OK;
import static org.apache.nifi.minifi.bootstrap.command.EnvRunner.ENV_CMD;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Optional;
import org.apache.nifi.minifi.bootstrap.service.CurrentPortProvider;
import org.apache.nifi.minifi.bootstrap.service.MiNiFiCommandSender;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class EnvRunnerTest {
private static final int MINIFI_PORT = 1337;
private static final String ENV_DATA = "ENV_DATA";
@Mock
private MiNiFiCommandSender miNiFiCommandSender;
@Mock
private CurrentPortProvider currentPortProvider;
@InjectMocks
private EnvRunner envRunner;
@Test
void testRunCommandShouldReturnNotRunningStatusCodeIfPortReturnsNull() {
when(currentPortProvider.getCurrentPort()).thenReturn(null);
int statusCode = envRunner.runCommand(new String[]{});
assertEquals(MINIFI_NOT_RUNNING.getStatusCode(), statusCode);
verifyNoMoreInteractions(currentPortProvider);
verifyNoInteractions(miNiFiCommandSender);
}
@Test
void testRunCommandShouldReturnErrorStatusCodeIfSendCommandThrowsException() throws IOException {
when(currentPortProvider.getCurrentPort()).thenReturn(MINIFI_PORT);
when(miNiFiCommandSender.sendCommand(ENV_CMD, MINIFI_PORT)).thenThrow(new IOException());
int statusCode = envRunner.runCommand(new String[]{});
assertEquals(ERROR.getStatusCode(), statusCode);
verifyNoMoreInteractions(currentPortProvider, miNiFiCommandSender);
}
@Test
void testRunCommandShouldReturnOkStatusCode() throws IOException {
when(currentPortProvider.getCurrentPort()).thenReturn(MINIFI_PORT);
when(miNiFiCommandSender.sendCommand(ENV_CMD, MINIFI_PORT)).thenReturn(Optional.of(ENV_DATA));
int statusCode = envRunner.runCommand(new String[]{});
assertEquals(OK.getStatusCode(), statusCode);
verifyNoMoreInteractions(currentPortProvider, miNiFiCommandSender);
}
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.bootstrap.command;
import static org.apache.nifi.minifi.bootstrap.Status.ERROR;
import static org.apache.nifi.minifi.bootstrap.Status.OK;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import java.util.Collections;
import org.apache.nifi.minifi.bootstrap.service.PeriodicStatusReporterManager;
import org.apache.nifi.minifi.commons.status.FlowStatusReport;
import org.apache.nifi.minifi.commons.status.instance.InstanceHealth;
import org.apache.nifi.minifi.commons.status.instance.InstanceStatus;
import org.apache.nifi.minifi.commons.status.processor.ProcessorStatusBean;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class FlowStatusRunnerTest {
protected static final String STATUS_REQUEST = "processor:TailFile:health,stats,bulletins";
@Mock
private PeriodicStatusReporterManager periodicStatusReporterManager;
@InjectMocks
private FlowStatusRunner flowStatusRunner;
@Test
void testRunCommandShouldReturnErrorCodeWhenArgsLengthIsNotTwo() {
int statusCode = flowStatusRunner.runCommand(new String[0]);
assertEquals(ERROR.getStatusCode(), statusCode);
verifyNoInteractions(periodicStatusReporterManager);
}
@Test
void testRunCommandShouldReturnOkCodeWhenArgsLengthIsTwo() {
FlowStatusReport flowStatusReport = aFlowStatusReport();
when(periodicStatusReporterManager.statusReport(STATUS_REQUEST)).thenReturn(flowStatusReport);
int statusCode = flowStatusRunner.runCommand(new String[] {"flowStatus", STATUS_REQUEST});
assertEquals(OK.getStatusCode(), statusCode);
}
private FlowStatusReport aFlowStatusReport() {
FlowStatusReport flowStatusReport = new FlowStatusReport();
InstanceStatus instanceStatus = new InstanceStatus();
InstanceHealth instanceHealth = new InstanceHealth();
instanceHealth.setQueuedCount(2);
instanceHealth.setActiveThreads(3);
instanceStatus.setInstanceHealth(instanceHealth);
flowStatusReport.setInstanceStatus(instanceStatus);
ProcessorStatusBean processorStatusBean = new ProcessorStatusBean();
processorStatusBean.setId("processorId");
flowStatusReport.setProcessorStatusList(Collections.singletonList(processorStatusBean));
return flowStatusReport;
}
}

View File

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.bootstrap.command;
import static org.apache.nifi.minifi.bootstrap.Status.MINIFI_NOT_RESPONDING;
import static org.apache.nifi.minifi.bootstrap.Status.MINIFI_NOT_RUNNING;
import static org.apache.nifi.minifi.bootstrap.Status.OK;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;
import org.apache.nifi.minifi.bootstrap.MiNiFiParameters;
import org.apache.nifi.minifi.bootstrap.MiNiFiStatus;
import org.apache.nifi.minifi.bootstrap.service.MiNiFiStatusProvider;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class StatusRunnerTest {
private static final long MINIFI_PID = 1L;
private static final int MINIFI_PORT = 1337;
@Mock
private MiNiFiParameters miNiFiParameters;
@Mock
private MiNiFiStatusProvider miNiFiStatusProvider;
@InjectMocks
private StatusRunner statusRunner;
@BeforeEach
void setup() {
when(miNiFiParameters.getMinifiPid()).thenReturn(MINIFI_PID);
when(miNiFiParameters.getMiNiFiPort()).thenReturn(MINIFI_PORT);
}
@Test
void testRunCommandShouldReturnOkStatusIfMiNiFiIsRespondingToPing() {
when(miNiFiStatusProvider.getStatus(MINIFI_PORT, MINIFI_PID)).thenReturn(new MiNiFiStatus(MINIFI_PORT, MINIFI_PID, true, false));
int status = statusRunner.runCommand(new String[0]);
assertEquals(OK.getStatusCode(), status);
}
@Test
void testRunCommandShouldReturnMiNiFiNotRespondingStatusIfMiNiFiIsNotRespondingToPing() {
when(miNiFiStatusProvider.getStatus(MINIFI_PORT, MINIFI_PID)).thenReturn(new MiNiFiStatus(MINIFI_PORT, MINIFI_PID, false, true));
int status = statusRunner.runCommand(new String[0]);
assertEquals(MINIFI_NOT_RESPONDING.getStatusCode(), status);
}
@Test
void testRunCommandShouldReturnMiNiFiNotRunningStatusIfMiNiFiIsNotRespondingToPingAndProcessIsNotRunning() {
when(miNiFiStatusProvider.getStatus(MINIFI_PORT, MINIFI_PID)).thenReturn(new MiNiFiStatus(null, MINIFI_PID, false, false));
int status = statusRunner.runCommand(new String[0]);
assertEquals(MINIFI_NOT_RUNNING.getStatusCode(), status);
}
@Test
void testRunCommandShouldReturnMiNiFiNotRunningStatusIfMiNiFiIsNotRespondingToPingIfPortIsGivenButPidIsMissingAndNotRespondingToPing() {
when(miNiFiStatusProvider.getStatus(MINIFI_PORT, MINIFI_PID)).thenReturn(new MiNiFiStatus(MINIFI_PORT, null, false, false));
int status = statusRunner.runCommand(new String[0]);
assertEquals(MINIFI_NOT_RUNNING.getStatusCode(), status);
}
@Test
void testRunCommandShouldReturnMiNiFiNotRunningStatusIfPortAndPidIsGivenButNotRespondingToPingAndProcessIsNotRunning() {
when(miNiFiStatusProvider.getStatus(MINIFI_PORT, MINIFI_PID)).thenReturn(new MiNiFiStatus(MINIFI_PORT, MINIFI_PID, false, false));
int status = statusRunner.runCommand(new String[0]);
assertEquals(MINIFI_NOT_RUNNING.getStatusCode(), status);
}
}

View File

@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.bootstrap.command;
import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.UNINITIALIZED;
import static org.apache.nifi.minifi.bootstrap.Status.ERROR;
import static org.apache.nifi.minifi.bootstrap.Status.MINIFI_NOT_RUNNING;
import static org.apache.nifi.minifi.bootstrap.Status.OK;
import static org.apache.nifi.minifi.bootstrap.command.StopRunner.SHUTDOWN_CMD;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import java.util.Optional;
import org.apache.nifi.minifi.bootstrap.MiNiFiParameters;
import org.apache.nifi.minifi.bootstrap.service.BootstrapFileProvider;
import org.apache.nifi.minifi.bootstrap.service.CurrentPortProvider;
import org.apache.nifi.minifi.bootstrap.service.GracefulShutdownParameterProvider;
import org.apache.nifi.minifi.bootstrap.service.MiNiFiCommandSender;
import org.apache.nifi.minifi.bootstrap.util.ProcessUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class StopRunnerTest {
private static final int MINIFI_PORT = 1337;
private static final long MINIFI_PID = 1;
@Mock
private BootstrapFileProvider bootstrapFileProvider;
@Mock
private MiNiFiParameters miNiFiParameters;
@Mock
private MiNiFiCommandSender miNiFiCommandSender;
@Mock
private CurrentPortProvider currentPortProvider;
@Mock
private GracefulShutdownParameterProvider gracefulShutdownParameterProvider;
@Mock
private ProcessUtils processUtils;
@InjectMocks
private StopRunner stopRunner;
@Test
void testRunCommandShouldReturnErrorStatusCodeInCaseOfException() {
when(currentPortProvider.getCurrentPort()).thenThrow(new RuntimeException());
int statusCode = stopRunner.runCommand(new String[0]);
assertEquals(ERROR.getStatusCode(), statusCode);
verifyNoInteractions(bootstrapFileProvider, miNiFiParameters, miNiFiCommandSender, gracefulShutdownParameterProvider, processUtils);
}
@Test
void testRunCommandShouldReturnMiNiFiNotRunningStatusCodeInCaseMiNiFiPortIsNull() {
when(currentPortProvider.getCurrentPort()).thenReturn(null);
int statusCode = stopRunner.runCommand(new String[0]);
assertEquals(MINIFI_NOT_RUNNING.getStatusCode(), statusCode);
verifyNoInteractions(bootstrapFileProvider, miNiFiParameters, miNiFiCommandSender, gracefulShutdownParameterProvider, processUtils);
}
@Test
void testRunCommandShouldCreateAndCleanupLockFileAfterExecution() throws IOException {
File lockFile = mock(File.class);
when(currentPortProvider.getCurrentPort()).thenReturn(MINIFI_PORT);
when(bootstrapFileProvider.getLockFile()).thenReturn(lockFile);
when(lockFile.exists()).thenReturn(false, true);
when(lockFile.delete()).thenReturn(true);
when(miNiFiParameters.getMinifiPid()).thenReturn((long) UNINITIALIZED);
when(miNiFiCommandSender.sendCommand(SHUTDOWN_CMD, MINIFI_PORT)).thenReturn(Optional.of(SHUTDOWN_CMD));
int statusCode = stopRunner.runCommand(new String[0]);
assertEquals(OK.getStatusCode(), statusCode);
verify(lockFile).createNewFile();
verifyNoInteractions(gracefulShutdownParameterProvider, processUtils);
}
@Test
void testRunCommandShouldMessageBeLockedInCaseOfLockFileFailureIssue() throws IOException {
File lockFile = mock(File.class);
when(currentPortProvider.getCurrentPort()).thenReturn(MINIFI_PORT);
when(bootstrapFileProvider.getLockFile()).thenReturn(lockFile);
when(lockFile.exists()).thenReturn(false, true);
when(lockFile.delete()).thenReturn(false);
when(miNiFiParameters.getMinifiPid()).thenReturn((long) UNINITIALIZED);
when(miNiFiCommandSender.sendCommand(SHUTDOWN_CMD, MINIFI_PORT)).thenReturn(Optional.of(SHUTDOWN_CMD));
int statusCode = stopRunner.runCommand(new String[0]);
assertEquals(OK.getStatusCode(), statusCode);
verify(lockFile).createNewFile();
verifyNoInteractions(gracefulShutdownParameterProvider, processUtils);
}
@Test
void testRunCommandShouldReturnErrorStatusCodeIfMiNiFiResponseIsNotShutdown() throws IOException {
File lockFile = mock(File.class);
when(currentPortProvider.getCurrentPort()).thenReturn(MINIFI_PORT);
when(bootstrapFileProvider.getLockFile()).thenReturn(lockFile);
when(lockFile.exists()).thenReturn(true, false);
when(miNiFiParameters.getMinifiPid()).thenReturn((long) UNINITIALIZED);
String unknown = "unknown";
when(miNiFiCommandSender.sendCommand(SHUTDOWN_CMD, MINIFI_PORT)).thenReturn(Optional.of(unknown));
int statusCode = stopRunner.runCommand(new String[0]);
assertEquals(ERROR.getStatusCode(), statusCode);
verifyNoInteractions(gracefulShutdownParameterProvider, processUtils);
}
@Test
void testRunCommandShouldHandleExceptionalCaseIfProcessIdIsUnknown() throws IOException {
File lockFile = mock(File.class);
when(currentPortProvider.getCurrentPort()).thenReturn(MINIFI_PORT);
when(bootstrapFileProvider.getLockFile()).thenReturn(lockFile);
when(lockFile.exists()).thenReturn(true, false);
when(miNiFiParameters.getMinifiPid()).thenReturn((long) UNINITIALIZED);
when(miNiFiCommandSender.sendCommand(SHUTDOWN_CMD, MINIFI_PORT)).thenThrow(new IOException());
int statusCode = stopRunner.runCommand(new String[0]);
assertEquals(OK.getStatusCode(), statusCode);
verifyNoInteractions(gracefulShutdownParameterProvider, processUtils);
}
@Test
void testRunCommandShouldHandleExceptionalCaseIfProcessIdIsGiven() throws IOException {
File lockFile = mock(File.class);
when(currentPortProvider.getCurrentPort()).thenReturn(MINIFI_PORT);
when(bootstrapFileProvider.getLockFile()).thenReturn(lockFile);
when(lockFile.exists()).thenReturn(true, false);
when(miNiFiParameters.getMinifiPid()).thenReturn(MINIFI_PID);
when(miNiFiCommandSender.sendCommand(SHUTDOWN_CMD, MINIFI_PORT)).thenThrow(new IOException());
int statusCode = stopRunner.runCommand(new String[0]);
assertEquals(OK.getStatusCode(), statusCode);
verify(processUtils).killProcessTree(MINIFI_PID);
verifyNoInteractions(gracefulShutdownParameterProvider);
}
@Test
void testRunCommandShouldShutDownMiNiFiProcessGracefully() throws IOException {
File lockFile = mock(File.class);
File statusFile = mock(File.class);
File pidFile = mock(File.class);
int gracefulShutdownSeconds = 10;
when(currentPortProvider.getCurrentPort()).thenReturn(MINIFI_PORT);
when(bootstrapFileProvider.getLockFile()).thenReturn(lockFile);
when(bootstrapFileProvider.getStatusFile()).thenReturn(statusFile);
when(bootstrapFileProvider.getPidFile()).thenReturn(pidFile);
when(lockFile.exists()).thenReturn(true, false);
when(statusFile.exists()).thenReturn(true);
when(pidFile.exists()).thenReturn(true);
when(statusFile.delete()).thenReturn(true);
when(pidFile.delete()).thenReturn(true);
when(miNiFiParameters.getMinifiPid()).thenReturn(MINIFI_PID);
when(miNiFiCommandSender.sendCommand(SHUTDOWN_CMD, MINIFI_PORT)).thenReturn(Optional.of(SHUTDOWN_CMD));
when(gracefulShutdownParameterProvider.getGracefulShutdownSeconds()).thenReturn(gracefulShutdownSeconds);
int statusCode = stopRunner.runCommand(new String[0]);
assertEquals(OK.getStatusCode(), statusCode);
verify(statusFile).delete();
verify(pidFile).delete();
verify(processUtils).shutdownProcess(eq(MINIFI_PID), anyString(), eq(gracefulShutdownSeconds));
}
}

View File

@ -17,11 +17,11 @@
package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PULL_HTTP_BASE_KEY;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.nio.file.Paths;
@ -30,9 +30,10 @@ import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
import org.apache.nifi.minifi.bootstrap.configuration.differentiators.Differentiator;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
import org.apache.nifi.minifi.bootstrap.configuration.differentiators.Differentiator;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -46,7 +47,7 @@ public class FileChangeIngestorTest {
private FileChangeIngestor notifierSpy;
private WatchService mockWatchService;
private Properties testProperties;
private Differentiator<InputStream> mockDifferentiator;
private Differentiator<ByteBuffer> mockDifferentiator;
private ConfigurationChangeNotifier testNotifier;
@BeforeEach
@ -56,13 +57,12 @@ public class FileChangeIngestorTest {
mockDifferentiator = Mockito.mock(Differentiator.class);
testNotifier = Mockito.mock(ConfigurationChangeNotifier.class);
notifierSpy.setConfigFilePath(Paths.get(TEST_CONFIG_PATH));
notifierSpy.setWatchService(mockWatchService);
notifierSpy.setDifferentiator(mockDifferentiator);
notifierSpy.setConfigurationChangeNotifier(testNotifier);
setMocks();
testProperties = new Properties();
testProperties.put(FileChangeIngestor.CONFIG_FILE_PATH_KEY, TEST_CONFIG_PATH);
testProperties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");
testProperties.put(PULL_HTTP_BASE_KEY + ".override.core", "true");
testProperties.put(FileChangeIngestor.POLLING_PERIOD_INTERVAL_KEY, FileChangeIngestor.DEFAULT_POLLING_PERIOD_INTERVAL);
}
@ -97,7 +97,7 @@ public class FileChangeIngestorTest {
/* Verify handleChange events */
@Test
public void testTargetChangedNoModification() throws Exception {
when(mockDifferentiator.isNew(Mockito.any(InputStream.class))).thenReturn(false);
when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(false);
final ConfigurationChangeNotifier testNotifier = Mockito.mock(ConfigurationChangeNotifier.class);
// In this case the WatchKey is null because there were no events found
@ -108,7 +108,7 @@ public class FileChangeIngestorTest {
@Test
public void testTargetChangedWithModificationEventNonConfigFile() throws Exception {
when(mockDifferentiator.isNew(Mockito.any(InputStream.class))).thenReturn(false);
when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(false);
final ConfigurationChangeNotifier testNotifier = Mockito.mock(ConfigurationChangeNotifier.class);
// In this case, we receive a trigger event for the directory monitored, but it was another file not being monitored
@ -123,12 +123,17 @@ public class FileChangeIngestorTest {
@Test
public void testTargetChangedWithModificationEvent() throws Exception {
when(mockDifferentiator.isNew(Mockito.any(InputStream.class))).thenReturn(true);
when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(true);
final WatchKey mockWatchKey = createMockWatchKeyForPath(CONFIG_FILENAME);
// Provided as a spy to allow injection of mock objects for some tests when dealing with the finalized FileSystems class
establishMockEnvironmentForChangeTests(mockWatchKey);
ConfigurationFileHolder configurationFileHolder = Mockito.mock(ConfigurationFileHolder.class);
when(configurationFileHolder.getConfigFileReference()).thenReturn(new AtomicReference<>(ByteBuffer.wrap(new byte[0])));
notifierSpy.initialize(testProperties, configurationFileHolder, testNotifier);
setMocks();
// Invoke the method of interest
notifierSpy.run();
@ -160,4 +165,11 @@ public class FileChangeIngestorTest {
when(mockWatchService.poll()).thenReturn(watchKey);
}
private void setMocks() {
notifierSpy.setConfigFilePath(Paths.get(TEST_CONFIG_PATH));
notifierSpy.setWatchService(mockWatchService);
notifierSpy.setDifferentiator(mockDifferentiator);
notifierSpy.setConfigurationChangeNotifier(testNotifier);
}
}

View File

@ -17,15 +17,24 @@
package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PULL_HTTP_BASE_KEY;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
import org.apache.nifi.minifi.bootstrap.configuration.ingestors.common.PullHttpChangeIngestorCommonTest;
import org.apache.nifi.minifi.commons.schema.ConfigSchema;
import org.apache.nifi.minifi.commons.schema.exception.SchemaLoaderException;
import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.jupiter.api.BeforeAll;
import org.mockito.Mockito;
import java.util.Properties;
public class PullHttpChangeIngestorSSLTest extends PullHttpChangeIngestorCommonTest {
@BeforeAll
@ -65,7 +74,7 @@ public class PullHttpChangeIngestorSSLTest extends PullHttpChangeIngestorCommonT
}
@Override
public void pullHttpChangeIngestorInit(Properties properties) {
public void pullHttpChangeIngestorInit(Properties properties) throws IOException, SchemaLoaderException {
properties.setProperty(PullHttpChangeIngestor.TRUSTSTORE_LOCATION_KEY, "./src/test/resources/localhost-ts.jks");
properties.setProperty(PullHttpChangeIngestor.TRUSTSTORE_PASSWORD_KEY, "localtest");
properties.setProperty(PullHttpChangeIngestor.TRUSTSTORE_TYPE_KEY, "JKS");
@ -76,10 +85,18 @@ public class PullHttpChangeIngestorSSLTest extends PullHttpChangeIngestorCommonT
properties.put(PullHttpChangeIngestor.PORT_KEY, String.valueOf(port));
properties.put(PullHttpChangeIngestor.HOST_KEY, "localhost");
properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");
properties.put(PULL_HTTP_BASE_KEY + ".override.core", "true");
ConfigurationFileHolder configurationFileHolder = Mockito.mock(ConfigurationFileHolder.class);
ConfigSchema configSchema =
SchemaLoader.loadConfigSchemaFromYaml(PullHttpChangeIngestorSSLTest.class.getClassLoader().getResourceAsStream("config.yml"));
StringWriter writer = new StringWriter();
SchemaLoader.toYaml(configSchema, writer);
when(configurationFileHolder.getConfigFileReference()).thenReturn(new AtomicReference<>(ByteBuffer.wrap(writer.toString().getBytes())));
pullHttpChangeIngestor = new PullHttpChangeIngestor();
pullHttpChangeIngestor.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), testNotifier);
pullHttpChangeIngestor.initialize(properties, configurationFileHolder, testNotifier);
pullHttpChangeIngestor.setDifferentiator(mockDifferentiator);
}
}

View File

@ -17,14 +17,23 @@
package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PULL_HTTP_BASE_KEY;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
import org.apache.nifi.minifi.bootstrap.configuration.ingestors.common.PullHttpChangeIngestorCommonTest;
import org.apache.nifi.minifi.commons.schema.ConfigSchema;
import org.apache.nifi.minifi.commons.schema.exception.SchemaLoaderException;
import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader;
import org.eclipse.jetty.server.ServerConnector;
import org.junit.jupiter.api.BeforeAll;
import org.mockito.Mockito;
import java.util.Properties;
public class PullHttpChangeIngestorTest extends PullHttpChangeIngestorCommonTest {
@BeforeAll
@ -50,14 +59,22 @@ public class PullHttpChangeIngestorTest extends PullHttpChangeIngestorCommonTest
@Override
public void pullHttpChangeIngestorInit(Properties properties) {
public void pullHttpChangeIngestorInit(Properties properties) throws IOException, SchemaLoaderException {
port = ((ServerConnector) jetty.getConnectors()[0]).getLocalPort();
properties.put(PullHttpChangeIngestor.PORT_KEY, String.valueOf(port));
properties.put(PullHttpChangeIngestor.HOST_KEY, "localhost");
properties.put(PullHttpChangeIngestor.PULL_HTTP_POLLING_PERIOD_KEY, "30000");
properties.put(PULL_HTTP_BASE_KEY + ".override.core", "true");
ConfigurationFileHolder configurationFileHolder = Mockito.mock(ConfigurationFileHolder.class);
ConfigSchema configSchema =
SchemaLoader.loadConfigSchemaFromYaml(PullHttpChangeIngestorTest.class.getClassLoader().getResourceAsStream("config.yml"));
StringWriter writer = new StringWriter();
SchemaLoader.toYaml(configSchema, writer);
when(configurationFileHolder.getConfigFileReference()).thenReturn(new AtomicReference<>(ByteBuffer.wrap(writer.toString().getBytes())));
pullHttpChangeIngestor = new PullHttpChangeIngestor();
pullHttpChangeIngestor.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), testNotifier);
pullHttpChangeIngestor.initialize(properties, configurationFileHolder, testNotifier);
pullHttpChangeIngestor.setDifferentiator(mockDifferentiator);
}
}

View File

@ -17,6 +17,8 @@
package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReference;
import okhttp3.OkHttpClient;
import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
@ -44,6 +46,7 @@ import java.security.cert.CertificateException;
import java.util.Collections;
import java.util.Properties;
import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PULL_HTTP_BASE_KEY;
import static org.mockito.Mockito.when;
public class RestChangeIngestorSSLTest extends RestChangeIngestorCommonTest {
@ -58,6 +61,8 @@ public class RestChangeIngestorSSLTest extends RestChangeIngestorCommonTest {
properties.setProperty(RestChangeIngestor.KEYSTORE_PASSWORD_KEY, "localtest");
properties.setProperty(RestChangeIngestor.KEYSTORE_TYPE_KEY, "JKS");
properties.setProperty(RestChangeIngestor.NEED_CLIENT_AUTH_KEY, "false");
properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");
properties.put(PULL_HTTP_BASE_KEY + ".override.core", "true");
restChangeIngestor = new RestChangeIngestor();
@ -66,7 +71,10 @@ public class RestChangeIngestorSSLTest extends RestChangeIngestorCommonTest {
when(testListener.getDescriptor()).thenReturn("MockChangeListener");
when(testNotifier.notifyListeners(Mockito.any())).thenReturn(Collections.singleton(new ListenerHandleResult(testListener)));
restChangeIngestor.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), testNotifier);
ConfigurationFileHolder configurationFileHolder = Mockito.mock(ConfigurationFileHolder.class);
when(configurationFileHolder.getConfigFileReference()).thenReturn(new AtomicReference<>(ByteBuffer.wrap(new byte[0])));
restChangeIngestor.initialize(properties, configurationFileHolder, testNotifier);
restChangeIngestor.setDifferentiator(mockDifferentiator);
restChangeIngestor.start();

View File

@ -17,6 +17,11 @@
package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PULL_HTTP_BASE_KEY;
import static org.mockito.Mockito.when;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReference;
import okhttp3.OkHttpClient;
import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
@ -33,11 +38,16 @@ public class RestChangeIngestorTest extends RestChangeIngestorCommonTest {
@BeforeAll
public static void setUp() throws InterruptedException, MalformedURLException {
Properties properties = new Properties();
properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");
properties.put(PULL_HTTP_BASE_KEY + ".override.core", "true");
restChangeIngestor = new RestChangeIngestor();
testNotifier = Mockito.mock(ConfigurationChangeNotifier.class);
restChangeIngestor.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), testNotifier);
ConfigurationFileHolder configurationFileHolder = Mockito.mock(ConfigurationFileHolder.class);
when(configurationFileHolder.getConfigFileReference()).thenReturn(new AtomicReference<>(ByteBuffer.wrap(new byte[0])));
restChangeIngestor.initialize(properties, configurationFileHolder, testNotifier);
restChangeIngestor.setDifferentiator(mockDifferentiator);
restChangeIngestor.start();

View File

@ -17,11 +17,26 @@
package org.apache.nifi.minifi.bootstrap.configuration.ingestors.common;
import org.apache.nifi.minifi.bootstrap.configuration.differentiators.Differentiator;
import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PATH_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Properties;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
import org.apache.nifi.minifi.bootstrap.configuration.ListenerHandleResult;
import org.apache.nifi.minifi.bootstrap.configuration.differentiators.Differentiator;
import org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor;
import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream;
import org.apache.nifi.minifi.commons.schema.ConfigSchema;
@ -40,23 +55,6 @@ import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Properties;
import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PATH_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public abstract class PullHttpChangeIngestorCommonTest {
public static volatile Server jetty;
@ -81,7 +79,7 @@ public abstract class PullHttpChangeIngestorCommonTest {
jetty.setHandler(handlerCollection);
}
public abstract void pullHttpChangeIngestorInit(Properties properties);
public abstract void pullHttpChangeIngestorInit(Properties properties) throws IOException, SchemaLoaderException;
@BeforeEach
public void setListeners() {
@ -97,7 +95,7 @@ public abstract class PullHttpChangeIngestorCommonTest {
}
@Test
public void testNewUpdate() throws IOException {
public void testNewUpdate() throws IOException, SchemaLoaderException {
Properties properties = new Properties();
properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");
pullHttpChangeIngestorInit(properties);
@ -131,7 +129,7 @@ public abstract class PullHttpChangeIngestorCommonTest {
}
@Test
public void testNoUpdate() throws IOException {
public void testNoUpdate() throws IOException, SchemaLoaderException {
Properties properties = new Properties();
properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");
pullHttpChangeIngestorInit(properties);
@ -144,7 +142,7 @@ public abstract class PullHttpChangeIngestorCommonTest {
}
@Test
public void testUseEtag() throws IOException {
public void testUseEtag() throws IOException, SchemaLoaderException {
Properties properties = new Properties();
properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");
pullHttpChangeIngestorInit(properties);
@ -165,7 +163,7 @@ public abstract class PullHttpChangeIngestorCommonTest {
}
@Test
public void testNewUpdateWithPath() throws IOException {
public void testNewUpdateWithPath() throws IOException, SchemaLoaderException {
Properties properties = new Properties();
properties.put(PATH_KEY, "/config.yml");
properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");
@ -179,7 +177,7 @@ public abstract class PullHttpChangeIngestorCommonTest {
}
@Test
public void testNoUpdateWithPath() throws IOException {
public void testNoUpdateWithPath() throws IOException, SchemaLoaderException {
Properties properties = new Properties();
properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");
properties.put(PATH_KEY, "/config.yml");
@ -193,7 +191,7 @@ public abstract class PullHttpChangeIngestorCommonTest {
}
@Test
public void testUseEtagWithPath() throws IOException {
public void testUseEtagWithPath() throws IOException, SchemaLoaderException {
Properties properties = new Properties();
properties.put(PullHttpChangeIngestor.OVERRIDE_SECURITY, "true");
properties.put(PATH_KEY, "/config.yml");

View File

@ -17,6 +17,13 @@
package org.apache.nifi.minifi.bootstrap.configuration.ingestors.common;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import okhttp3.Headers;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
@ -32,15 +39,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Collections;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public abstract class RestChangeIngestorCommonTest {
private static final String testString = "This is a test string.";
@ -50,7 +48,7 @@ public abstract class RestChangeIngestorCommonTest {
public static final MediaType MEDIA_TYPE_MARKDOWN = MediaType.parse("text/x-markdown; charset=utf-8");
public static String url;
public static ConfigurationChangeNotifier testNotifier = Mockito.mock(ConfigurationChangeNotifier.class);
public static Differentiator<InputStream> mockDifferentiator = Mockito.mock(Differentiator.class);
public static Differentiator<ByteBuffer> mockDifferentiator = Mockito.mock(Differentiator.class);
@BeforeEach
public void setListener() {
@ -81,7 +79,7 @@ public abstract class RestChangeIngestorCommonTest {
@Test
public void testFileUploadNewConfig() throws Exception {
when(mockDifferentiator.isNew(Mockito.any(InputStream.class))).thenReturn(true);
when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(true);
Request request = new Request.Builder()
.url(url)
@ -105,7 +103,7 @@ public abstract class RestChangeIngestorCommonTest {
@Test
public void testFileUploadSameConfig() throws Exception {
when(mockDifferentiator.isNew(Mockito.any(InputStream.class))).thenReturn(false);
when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(false);
Request request = new Request.Builder()
.url(url)

View File

@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.bootstrap.service;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.stream.Stream;
import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeCoordinator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
class BootstrapCodecTest {
private static final int VALID_PORT = 1;
private static final String SECRET = "secret";
private static final String OK = "OK";
private static final String EMPTY_STRING = "";
private RunMiNiFi runner;
@BeforeEach
void setup() {
runner = mock(RunMiNiFi.class);
}
@Test
void testCommunicateShouldThrowIOExceptionIfThereIsNoCommand() {
InputStream inputStream = new ByteArrayInputStream(new byte[0]);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
assertThrows(IOException.class, bootstrapCodec::communicate);
assertEquals(EMPTY_STRING, outputStream.toString().trim());
verifyNoInteractions(runner);
}
@Test
void testCommunicateShouldInvalidCommandThrowIoException() {
String unknown = "unknown";
InputStream inputStream = new ByteArrayInputStream(unknown.getBytes(StandardCharsets.UTF_8));
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
assertThrows(IOException.class, bootstrapCodec::communicate);
assertEquals(EMPTY_STRING, outputStream.toString().trim());
verifyNoInteractions(runner);
}
@Test
void testCommunicateShouldSetMiNiFiParametersAndWriteOk() throws IOException {
String command = "PORT " + VALID_PORT + " " + SECRET;
InputStream inputStream = new ByteArrayInputStream(command.getBytes(StandardCharsets.UTF_8));
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
bootstrapCodec.communicate();
verify(runner).setMiNiFiParameters(VALID_PORT, SECRET);
assertEquals(OK, outputStream.toString().trim());
}
@ParameterizedTest(name = "{index} => command={0}, expectedExceptionMessage={1}")
@MethodSource("portCommandValidationInputs")
void testCommunicateShouldFailWhenReceivesPortCommand(String command) {
InputStream inputStream = new ByteArrayInputStream(command.getBytes(StandardCharsets.UTF_8));
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
assertThrows(IOException.class, bootstrapCodec::communicate);
assertEquals(EMPTY_STRING, outputStream.toString().trim());
verifyNoInteractions(runner);
}
private static Stream<Arguments> portCommandValidationInputs() {
return Stream.of(
Arguments.of("PORT"),
Arguments.of("PORT invalid secretKey"),
Arguments.of("PORT 0 secretKey")
);
}
@Test
void testCommunicateShouldFailIfStartedCommandHasOtherThanOneArg() {
InputStream inputStream = new ByteArrayInputStream("STARTED".getBytes(StandardCharsets.UTF_8));
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
assertThrows(IOException.class, bootstrapCodec::communicate);
assertEquals(EMPTY_STRING, outputStream.toString().trim());
verifyNoInteractions(runner);
}
@Test
void testCommunicateShouldFailIfStartedCommandFirstArgIsNotBoolean() {
InputStream inputStream = new ByteArrayInputStream("STARTED yes".getBytes(StandardCharsets.UTF_8));
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
assertThrows(IOException.class, bootstrapCodec::communicate);
assertEquals(EMPTY_STRING, outputStream.toString().trim());
verifyNoInteractions(runner);
}
@Test
void testCommunicateShouldHandleStartedCommand() throws IOException {
InputStream inputStream = new ByteArrayInputStream("STARTED true".getBytes(StandardCharsets.UTF_8));
PeriodicStatusReporterManager periodicStatusReporterManager = mock(PeriodicStatusReporterManager.class);
ConfigurationChangeCoordinator configurationChangeCoordinator = mock(ConfigurationChangeCoordinator.class);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
when(runner.getPeriodicStatusReporterManager()).thenReturn(periodicStatusReporterManager);
when(runner.getConfigurationChangeCoordinator()).thenReturn(configurationChangeCoordinator);
bootstrapCodec.communicate();
assertEquals(OK, outputStream.toString().trim());
verify(runner, times(2)).getPeriodicStatusReporterManager();
verify(periodicStatusReporterManager).shutdownPeriodicStatusReporters();
verify(periodicStatusReporterManager).startPeriodicNotifiers();
verify(runner).getConfigurationChangeCoordinator();
verify(configurationChangeCoordinator).start();
verify(runner).setNiFiStarted(true);
}
@Test
void testCommunicateShouldHandleShutdownCommand() throws IOException {
InputStream inputStream = new ByteArrayInputStream("SHUTDOWN".getBytes(StandardCharsets.UTF_8));
PeriodicStatusReporterManager periodicStatusReporterManager = mock(PeriodicStatusReporterManager.class);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
when(runner.getPeriodicStatusReporterManager()).thenReturn(periodicStatusReporterManager);
bootstrapCodec.communicate();
assertEquals(OK, outputStream.toString().trim());
verify(runner).getPeriodicStatusReporterManager();
verify(runner).shutdownChangeNotifier();
verify(periodicStatusReporterManager).shutdownPeriodicStatusReporters();
}
@Test
void testCommunicateShouldHandleReloadCommand() throws IOException {
InputStream inputStream = new ByteArrayInputStream("RELOAD".getBytes(StandardCharsets.UTF_8));
PeriodicStatusReporterManager periodicStatusReporterManager = mock(PeriodicStatusReporterManager.class);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
when(runner.getPeriodicStatusReporterManager()).thenReturn(periodicStatusReporterManager);
bootstrapCodec.communicate();
assertEquals(OK, outputStream.toString().trim());
}
}

View File

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.bootstrap.service;
import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.UNINITIALIZED;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import org.apache.nifi.minifi.bootstrap.MiNiFiParameters;
import org.apache.nifi.minifi.bootstrap.util.ProcessUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class CurrentPortProviderTest {
private static final int PORT = 1;
private static final long PID = 1L;
@Mock
private MiNiFiCommandSender miNiFiCommandSender;
@Mock
private MiNiFiParameters miNiFiParameters;
@Mock
private ProcessUtils processUtils;
@InjectMocks
private CurrentPortProvider currentPortProvider;
@Test
void testGetCurrentPortShouldReturnNullIfMiNiFiPortIsNotSet() {
when(miNiFiParameters.getMiNiFiPort()).thenReturn(UNINITIALIZED);
assertNull(currentPortProvider.getCurrentPort());
verifyNoInteractions(miNiFiCommandSender, processUtils);
}
@Test
void testGetCurrentPortShouldReturnNullIfPingIsNotSuccessfulAndProcessIsNotRunning() {
when(miNiFiParameters.getMiNiFiPort()).thenReturn(PORT);
when(miNiFiCommandSender.isPingSuccessful(PORT)).thenReturn(false);
when(miNiFiParameters.getMinifiPid()).thenReturn(PID);
when(processUtils.isProcessRunning(PID)).thenReturn(false);
assertNull(currentPortProvider.getCurrentPort());
}
@Test
void testGetCurrentPortShouldReturnPortIfPingIsSuccessful() {
when(miNiFiParameters.getMiNiFiPort()).thenReturn(PORT);
when(miNiFiCommandSender.isPingSuccessful(PORT)).thenReturn(true);
Integer currentPort = currentPortProvider.getCurrentPort();
assertEquals(PORT, currentPort);
verifyNoInteractions(processUtils);
verifyNoMoreInteractions(miNiFiParameters);
}
@Test
void testGetCurrentPortShouldReturnPortIfPingIsNotSuccessfulButPidIsRunning() {
when(miNiFiParameters.getMiNiFiPort()).thenReturn(PORT);
when(miNiFiCommandSender.isPingSuccessful(PORT)).thenReturn(false);
when(miNiFiParameters.getMinifiPid()).thenReturn(PID);
when(processUtils.isProcessRunning(PID)).thenReturn(true);
Integer currentPort = currentPortProvider.getCurrentPort();
assertEquals(PORT, currentPort);
}
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.bootstrap.service;
import static org.apache.nifi.minifi.bootstrap.service.GracefulShutdownParameterProvider.DEFAULT_GRACEFUL_SHUTDOWN_VALUE;
import static org.apache.nifi.minifi.bootstrap.service.GracefulShutdownParameterProvider.GRACEFUL_SHUTDOWN_PROP;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Properties;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.NullSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class GracefulShutdownParameterProviderTest {
@Mock
private BootstrapFileProvider bootstrapFileProvider;
@InjectMocks
private GracefulShutdownParameterProvider gracefulShutdownParameterProvider;
@ParameterizedTest(name = "{index} => shutdownPropertyValue={0}")
@NullSource
@ValueSource(strings = {"notAnInteger", "-1"})
void testGetBootstrapPropertiesShouldReturnDefaultShutdownPropertyValue(String shutdownProperty) throws IOException {
Properties properties = new Properties();
if (shutdownProperty != null) {
properties.setProperty(GRACEFUL_SHUTDOWN_PROP, shutdownProperty);
}
when(bootstrapFileProvider.getBootstrapProperties()).thenReturn(properties);
assertEquals(Integer.parseInt(DEFAULT_GRACEFUL_SHUTDOWN_VALUE), gracefulShutdownParameterProvider.getGracefulShutdownSeconds());
}
@Test
void testGetBootstrapPropertiesShouldReturnShutdownPropertyValue() throws IOException {
Properties properties = new Properties();
properties.setProperty(GRACEFUL_SHUTDOWN_PROP, "1000");
when(bootstrapFileProvider.getBootstrapProperties()).thenReturn(properties);
assertEquals(1000, gracefulShutdownParameterProvider.getGracefulShutdownSeconds());
}
}

View File

@ -97,8 +97,8 @@ public class HierarchicalC2IntegrationTest {
certificatesDirectory.resolve("c2-authoritative").resolve("truststore.jks").toFile().getAbsolutePath(),
"badTrustPass",
KeystoreType.JKS);
healthCheckSocketFactory = trustSslContext.getSocketFactory();
trustSslContext = SslContextFactory.createSslContext(tlsConfiguration);
healthCheckSocketFactory = trustSslContext.getSocketFactory();
docker.before();
}

View File

@ -28,7 +28,6 @@ import java.nio.file.Paths;
public class StandaloneXmlTest extends StandaloneYamlTest {
public void setDocker(String version, String name) throws Exception {
super.setDocker(version, name);
ConfigSchema configSchema;
try (InputStream inputStream = StandaloneXmlTest.class.getClassLoader().getResourceAsStream("./standalone/" + version + "/" + name + "/xml/" + name + ".xml")) {
configSchema = ConfigMain.transformTemplateToSchema(inputStream);
@ -37,6 +36,7 @@ public class StandaloneXmlTest extends StandaloneYamlTest {
.getParent().toAbsolutePath().resolve(getConfigYml(version, name)))) {
SchemaSaver.saveConfigSchema(configSchema, outputStream);
}
super.setDocker(version, name);
}
@Override

View File

@ -0,0 +1,125 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Core Properties #
nifi.flow.configuration.file=./target/flow.xml.gz
nifi.flow.configuration.archive.dir=./target/archive/
nifi.flowcontroller.autoResumeState=true
nifi.flowcontroller.graceful.shutdown.period=10 sec
nifi.flowservice.writedelay.interval=2 sec
nifi.administrative.yield.duration=30 sec
nifi.reporting.task.configuration.file=./target/reporting-tasks.xml
nifi.controller.service.configuration.file=./target/controller-services.xml
nifi.templates.directory=./target/templates
nifi.ui.banner.text=UI Banner Text
nifi.ui.autorefresh.interval=30 sec
nifi.nar.library.directory=./target/resources/NiFiProperties/lib/
nifi.nar.library.directory.alt=./target/resources/NiFiProperties/lib2/
nifi.nar.working.directory=./target/work/nar/
# H2 Settings
nifi.database.directory=./target/database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.directory=./target/test-repo
nifi.flowfile.repository.partitions=1
nifi.flowfile.repository.checkpoint.interval=2 mins
nifi.queue.swap.threshold=20000
nifi.swap.storage.directory=./target/test-repo/swap
nifi.swap.in.period=5 sec
nifi.swap.in.threads=1
nifi.swap.out.period=5 sec
nifi.swap.out.threads=4
# Content Repository
nifi.content.claim.max.appendable.size=10 MB
nifi.content.claim.max.flow.files=100
nifi.content.repository.directory.default=./target/content_repository
# Provenance Repository Properties
nifi.provenance.repository.storage.directory=./target/provenance_repository
nifi.provenance.repository.max.storage.time=24 hours
nifi.provenance.repository.max.storage.size=1 GB
nifi.provenance.repository.rollover.time=30 secs
nifi.provenance.repository.rollover.size=100 MB
# Site to Site properties
nifi.remote.input.socket.port=9990
nifi.remote.input.secure=true
# web properties #
nifi.web.war.directory=./target/lib
nifi.web.http.host=
nifi.web.http.port=8080
nifi.web.https.host=
nifi.web.https.port=
nifi.web.jetty.working.directory=./target/work/jetty
# security properties #
nifi.sensitive.props.key=key
nifi.sensitive.props.algorithm=PBEWITHMD5AND256BITAES-CBC-OPENSSL
nifi.security.keystore=
nifi.security.keystoreType=
nifi.security.keystorePasswd=
nifi.security.keyPasswd=
nifi.security.truststore=
nifi.security.truststoreType=
nifi.security.truststorePasswd=
nifi.security.user.authorizer=
# cluster common properties (cluster manager and nodes must have same values) #
nifi.cluster.protocol.heartbeat.interval=5 sec
nifi.cluster.protocol.is.secure=false
nifi.cluster.protocol.socket.timeout=30 sec
nifi.cluster.protocol.connection.handshake.timeout=45 sec
# if multicast is used, then nifi.cluster.protocol.multicast.xxx properties must be configured #
nifi.cluster.protocol.use.multicast=false
nifi.cluster.protocol.multicast.address=
nifi.cluster.protocol.multicast.port=
nifi.cluster.protocol.multicast.service.broadcast.delay=500 ms
nifi.cluster.protocol.multicast.service.locator.attempts=3
nifi.cluster.protocol.multicast.service.locator.attempts.delay=1 sec
# cluster node properties (only configure for cluster nodes) #
nifi.cluster.is.node=false
nifi.cluster.node.address=
nifi.cluster.node.protocol.port=
nifi.cluster.node.protocol.threads=2
# if multicast is not used, nifi.cluster.node.unicast.xxx must have same values as nifi.cluster.manager.xxx #
nifi.cluster.node.unicast.manager.address=
nifi.cluster.node.unicast.manager.protocol.port=
nifi.cluster.node.unicast.manager.authority.provider.port=
# cluster manager properties (only configure for cluster manager) #
nifi.cluster.is.manager=false
nifi.cluster.manager.address=
nifi.cluster.manager.protocol.port=
nifi.cluster.manager.authority.provider.port=
nifi.cluster.manager.authority.provider.threads=10
nifi.cluster.manager.node.firewall.file=
nifi.cluster.manager.node.event.history.size=10
nifi.cluster.manager.node.api.connection.timeout=30 sec
nifi.cluster.manager.node.api.read.timeout=30 sec
nifi.cluster.manager.node.api.request.threads=10
nifi.cluster.manager.flow.retrieval.delay=5 sec
nifi.cluster.manager.protocol.threads=10
nifi.cluster.manager.safemode.duration=0 sec
# analytics properties #
nifi.analytics.predict.interval=3 mins
nifi.analytics.connection.model.implementation=org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares

View File

@ -58,7 +58,7 @@
<logger name="org.apache.nifi" level="INFO"/>
<logger name="org.apache.nifi.processors" level="WARN"/>
<logger name="org.apache.nifi.processors.standard.LogAttribute" level="INFO"/>
<logger name="org.apache.nifi.controller.repository.StandardProcessSession" level="INFO" />
<logger name="org.apache.nifi.controller.repository.StandardProcessSession" level="DEBUG" />
<logger name="org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor" level="DEBUG" />
<logger name="org.apache.nifi.minifi.c2.cache.filesystem.FileSystemConfigurationCache" level="DEBUG" />
@ -88,7 +88,7 @@
<logger name="org.apache.nifi.minifi.StdOut" level="INFO" additivity="false">
<appender-ref ref="BOOTSTRAP_FILE" />
</logger>
<!-- Everything written to MiNiFi's Standard Error will be logged with the logger org.apache.nifi.minifi.StdErr at ERROR level -->
<logger name="org.apache.nifi.minifi.StdErr" level="ERROR" additivity="false">
<appender-ref ref="BOOTSTRAP_FILE" />

View File

@ -599,7 +599,7 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
final long flowFileRepoUpdateFinishNanos = System.nanoTime();
final long flowFileRepoUpdateNanos = flowFileRepoUpdateFinishNanos - flowFileRepoUpdateStart;
if (LOG.isInfoEnabled()) {
if (LOG.isDebugEnabled()) {
for (final RepositoryRecord record : checkpoint.records.values()) {
if (record.isMarkedForAbort()) {
final FlowFileRecord flowFile = record.getCurrent();
@ -651,7 +651,7 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
}
checkpoint.deleteOnCommit.clear();
if (LOG.isInfoEnabled()) {
if (LOG.isDebugEnabled()) {
final String sessionSummary = summarizeEvents(checkpoint);
if (!sessionSummary.isEmpty()) {
LOG.debug("{} for {}, committed the following events: {}", this, connectableDescription, sessionSummary);