From 66f3b7e30f7726308ac25b7d0f2a2856880d1852 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 4 Jun 2015 14:10:43 -0400 Subject: [PATCH 1/2] NIFI-602: Ensure we read all data from socket after sending 'SHUTDOWN' command --- .../org/apache/nifi/bootstrap/RunNiFi.java | 34 +++++++++++-------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/nifi/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java b/nifi/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java index bb83e3d4b3..e6b1bc5039 100644 --- a/nifi/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java +++ b/nifi/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java @@ -229,7 +229,10 @@ public class RunNiFi { props.load(fis); } - logger.log(Level.FINE, "Properties: {0}", props); + final Map modified = new HashMap<>(props); + modified.remove("secret.key"); + logger.log(Level.FINE, "Properties: {0}", modified); + return props; } @@ -250,8 +253,8 @@ public class RunNiFi { Files.setPosixFilePermissions(statusFile.toPath(), perms); } catch (final Exception e) { logger.log(Level.WARNING, "Failed to set permissions so that only the owner can read status file {0}; " - + "this may allows others to have access to the key needed to communicate with NiFi. " - + "Permissions should be changed so that only the owner can read this file", statusFile); + + "this may allows others to have access to the key needed to communicate with NiFi. " + + "Permissions should be changed so that only the owner can read this file", statusFile); } try (final FileOutputStream fos = new FileOutputStream(statusFile)) { @@ -328,8 +331,8 @@ public class RunNiFi { boolean running = false; String line; try (final InputStream in = proc.getInputStream(); - final Reader streamReader = new InputStreamReader(in); - final BufferedReader reader = new BufferedReader(streamReader)) { + final Reader streamReader = new InputStreamReader(in); + final BufferedReader reader = new BufferedReader(streamReader)) { while ((line = reader.readLine()) != null) { if (line.trim().startsWith(pid)) { @@ -395,7 +398,7 @@ public class RunNiFi { final Status status = getStatus(); if (status.isRespondingToPing()) { logger.log(Level.INFO, "Apache NiFi is currently running, listening to Bootstrap on port {0}, PID={1}", - new Object[]{status.getPort(), status.getPid() == null ? "unknkown" : status.getPid()}); + new Object[]{status.getPort(), status.getPid() == null ? "unknkown" : status.getPid()}); return; } @@ -488,12 +491,15 @@ public class RunNiFi { final OutputStream out = socket.getOutputStream(); out.write((SHUTDOWN_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8)); out.flush(); - out.close(); + socket.shutdownOutput(); final InputStream in = socket.getInputStream(); - final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); - final String response = reader.readLine(); - reader.close(); + int lastChar; + final StringBuilder sb = new StringBuilder(); + while ((lastChar = in.read()) > -1) { + sb.append((char) lastChar); + } + final String response = sb.toString().trim(); logger.log(Level.FINE, "Received response to SHUTDOWN command: {0}", response); @@ -557,7 +563,7 @@ public class RunNiFi { final Process proc = Runtime.getRuntime().exec(new String[]{"ps", "-o", "pid", "--no-headers", "--ppid", ppid}); final List childPids = new ArrayList<>(); try (final InputStream in = proc.getInputStream(); - final BufferedReader reader = new BufferedReader(new InputStreamReader(in))) { + final BufferedReader reader = new BufferedReader(new InputStreamReader(in))) { String line; while ((line = reader.readLine()) != null) { @@ -693,7 +699,7 @@ public class RunNiFi { if (javaHome != null) { String fileExtension = isWindows() ? ".exe" : ""; File javaFile = new File(javaHome + File.separatorChar + "bin" - + File.separatorChar + "java" + fileExtension); + + File.separatorChar + "java" + fileExtension); if (javaFile.exists() && javaFile.canExecute()) { javaCmd = javaFile.getAbsolutePath(); } @@ -748,12 +754,12 @@ public class RunNiFi { gracefulShutdownSeconds = Integer.parseInt(gracefulShutdown); } catch (final NumberFormatException nfe) { throw new NumberFormatException("The '" + GRACEFUL_SHUTDOWN_PROP + "' property in Bootstrap Config File " - + bootstrapConfigAbsoluteFile.getAbsolutePath() + " has an invalid value. Must be a non-negative integer"); + + bootstrapConfigAbsoluteFile.getAbsolutePath() + " has an invalid value. Must be a non-negative integer"); } if (gracefulShutdownSeconds < 0) { throw new NumberFormatException("The '" + GRACEFUL_SHUTDOWN_PROP + "' property in Bootstrap Config File " - + bootstrapConfigAbsoluteFile.getAbsolutePath() + " has an invalid value. Must be a non-negative integer"); + + bootstrapConfigAbsoluteFile.getAbsolutePath() + " has an invalid value. Must be a non-negative integer"); } Process process = builder.start(); From 8d1536ed24ea0b149f9fa14ead073b8decdead7d Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 5 Jun 2015 08:49:01 -0400 Subject: [PATCH 2/2] NIFI-41: Don't allow destination fo connection to change if current destination is holding its FlowFiles --- .../org/apache/nifi/controller/FlowFileQueue.java | 9 +++++++++ .../nifi/controller/StandardFlowFileQueue.java | 13 +++++++++---- .../apache/nifi/connectable/StandardConnection.java | 4 ++++ 3 files changed, 22 insertions(+), 4 deletions(-) diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java index 92a4ee0301..e1baeb7558 100644 --- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java @@ -100,6 +100,15 @@ public interface FlowFileQueue { QueueSize getActiveQueueSize(); + /** + * Returns a QueueSize that represents all FlowFiles that are 'unacknowledged'. A FlowFile + * is considered to be unacknowledged if it has been pulled from the queue by some component + * but the session that pulled the FlowFile has not yet been committed or rolled back. + * + * @return a QueueSize that represents all FlowFiles that are 'unacknowledged'. + */ + QueueSize getUnacknowledgedQueueSize(); + void acknowledge(FlowFileRecord flowFile); void acknowledge(Collection flowFiles); diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index 075f2cf5f3..8f6c8ed944 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -229,7 +229,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } return new QueueSize(activeQueue.size() + swappedRecordCount + unacknowledged.getObjectCount() + preFetchCount, - activeQueueContentSize + swappedContentSize + unacknowledged.getByteCount() + preFetchSize); + activeQueueContentSize + swappedContentSize + unacknowledged.getByteCount() + preFetchSize); } @Override @@ -526,9 +526,9 @@ public final class StandardFlowFileQueue implements FlowFileQueue { final QueueSize unacknowledged = unacknowledgedSizeRef.get(); logger.debug("Total Queue Size: ActiveQueue={}/{} MB, Swap Queue={}/{} MB, Unacknowledged={}/{} MB", - activeQueue.size(), activeQueueContentSize / byteToMbDivisor, - swappedRecordCount, swappedContentSize / byteToMbDivisor, - unacknowledged.getObjectCount(), unacknowledged.getByteCount() / byteToMbDivisor); + activeQueue.size(), activeQueueContentSize / byteToMbDivisor, + swappedRecordCount, swappedContentSize / byteToMbDivisor, + unacknowledged.getObjectCount(), unacknowledged.getByteCount() / byteToMbDivisor); } return swapQueue.size(); @@ -961,6 +961,11 @@ public final class StandardFlowFileQueue implements FlowFileQueue { writeLock.unlock("external unlock"); } + @Override + public QueueSize getUnacknowledgedQueueSize() { + return unacknowledgedSizeRef.get(); + } + private void updateUnacknowledgedSize(final int addToCount, final long addToSize) { boolean updated = false; diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java index 86c9320ff2..ad556e2c50 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java @@ -181,6 +181,10 @@ public final class StandardConnection implements Connection { throw new IllegalStateException("Cannot change destination of Connection because the current destination is running"); } + if (getFlowFileQueue().getUnacknowledgedQueueSize().getObjectCount() > 0) { + throw new IllegalStateException("Cannot change destination of Connection because FlowFiles from this Connection are currently held by " + previousDestination); + } + try { previousDestination.removeConnection(this); this.destination.set(newDestination);