NIFI-13045 Support Bootstrap Listen Port property in MiNiFi

This closes #8648

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Ferenc Kis 2024-04-15 15:14:32 +02:00 committed by exceptionfactory
parent 4104cfd78d
commit 6eff8a9262
No known key found for this signature in database
5 changed files with 31 additions and 10 deletions

View File

@ -25,6 +25,7 @@ import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.STATUS_FILE_PID_KEY;
import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.UNINITIALIZED;
import static org.apache.nifi.minifi.bootstrap.Status.ERROR;
import static org.apache.nifi.minifi.bootstrap.Status.OK;
import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.NIFI_BOOTSTRAP_LISTEN_PORT;
import static org.apache.nifi.minifi.commons.api.MiNiFiCommandState.FULLY_APPLIED;
import static org.apache.nifi.minifi.commons.api.MiNiFiCommandState.NOT_APPLIED_WITH_RESTART;
import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
@ -129,7 +130,7 @@ public class StartRunner implements CommandRunner {
generateMiNiFiProperties(bootstrapFileProvider.getProtectedBootstrapProperties(), confDir);
regenerateFlowConfiguration(bootstrapProperties.getProperty(MiNiFiProperties.NIFI_MINIFI_FLOW_CONFIG.getKey()));
Process process = startMiNiFi();
Process process = startMiNiFi(bootstrapProperties);
try {
while (true) {
if (process.isAlive()) {
@ -279,15 +280,33 @@ public class StartRunner implements CommandRunner {
}
}
private Process startMiNiFi() throws IOException {
private Process startMiNiFi(BootstrapProperties bootstrapProperties) throws IOException {
int bootstrapListenPort = parseBootstrapListenPort(bootstrapProperties);
MiNiFiListener listener = new MiNiFiListener();
listenPort = listener.start(runMiNiFi, bootstrapFileProvider, configurationChangeListener);
listenPort = listener.start(runMiNiFi, bootstrapListenPort, bootstrapFileProvider, configurationChangeListener);
CMD_LOGGER.info("Starting Apache MiNiFi...");
return startMiNiFiProcess(getProcessBuilder());
}
private int parseBootstrapListenPort(BootstrapProperties bootstrapProperties) {
String bootstrapListenPort = bootstrapProperties.getProperty(NIFI_BOOTSTRAP_LISTEN_PORT);
int parsedBootstrapListenPort = Optional.ofNullable(bootstrapListenPort)
.map(String::trim)
.map(port -> {
try {
return Integer.parseInt(port);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Unable to parse Bootstrap listen port, please provide a valid port for " + NIFI_BOOTSTRAP_LISTEN_PORT);
}
})
.orElse(0);
CMD_LOGGER.info("Boostrap listen port {}", parsedBootstrapListenPort);
return parsedBootstrapListenPort;
}
private ProcessBuilder getProcessBuilder() throws IOException {
ProcessBuilder builder = new ProcessBuilder();
File workingDir = getWorkingDir();

View File

@ -45,6 +45,8 @@ public class MiNiFiExecCommandProvider {
public static final String DEFAULT_BOOTSTRAP_LOG_FILE_NAME = "minifi-bootstrap";
public static final String DEFAULT_LOG_FILE_EXTENSION = "log";
public static final String NIFI_BOOTSTRAP_LISTEN_PORT = "nifi.bootstrap.listen.port";
private static final String PROPERTIES_FILE_KEY = "props.file";
private static final String LIB_DIR_KEY = "lib.dir";
private static final String JAVA_COMMAND_KEY = "java";
@ -63,7 +65,6 @@ public class MiNiFiExecCommandProvider {
private static final String MINIFI_CLASS_NAME = "MiNiFi";
private static final String MINIFI_FULLY_QUALIFIED_CLASS_NAME = "org.apache.nifi.minifi." + MINIFI_CLASS_NAME;
private static final String SYSTEM_PROPERTY_TEMPLATE = "-D%s=%s";
private static final String NIFI_BOOTSTRAP_LISTEN_PORT = "nifi.bootstrap.listen.port";
private static final String APP = "app";
private static final String CLASSPATH = "-classpath";
private static final String BIN_DIRECTORY = "bin";

View File

@ -38,13 +38,13 @@ public class MiNiFiListener {
private Listener listener;
private ServerSocket serverSocket;
public int start(RunMiNiFi runner, BootstrapFileProvider bootstrapFileProvider, ConfigurationChangeListener configurationChangeListener) throws IOException {
public int start(RunMiNiFi runner, int listenPort, BootstrapFileProvider bootstrapFileProvider, ConfigurationChangeListener configurationChangeListener) throws IOException {
serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress("localhost", 0));
serverSocket.bind(new InetSocketAddress("localhost", listenPort));
listener = new Listener(serverSocket, new BootstrapCodec(runner, bootstrapFileProvider, configurationChangeListener));
Thread listenThread = new Thread(listener);
listenThread.setName("MiNiFi listener");
listenThread.setName("Listen to MiNiFi");
listenThread.setDaemon(true);
listenThread.start();
return serverSocket.getLocalPort();

View File

@ -73,11 +73,11 @@ public class BootstrapListener implements BootstrapCommunicator {
registerHandlers();
}
public void start() throws IOException {
public void start(int listenPort) throws IOException {
logger.debug("Starting Bootstrap Listener to communicate with Bootstrap Port {}", bootstrapPort);
ServerSocket serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress("localhost", 0));
serverSocket.bind(new InetSocketAddress("localhost", listenPort));
serverSocket.setSoTimeout(2000);
int localPort = serverSocket.getLocalPort();

View File

@ -127,7 +127,8 @@ public class StandardMiNiFiServer extends HeadlessNiFiServer implements MiNiFiSe
}
bootstrapListener = new BootstrapListener(this, port);
bootstrapListener.start();
NiFiProperties niFiProperties = getNiFiProperties();
bootstrapListener.start(niFiProperties.getDefaultListenerBootstrapPort());
} catch (IOException e) {
throw new UncheckedIOException("Failed to start MiNiFi because of Bootstrap listener initialization error", e);
} catch (NumberFormatException e) {