From 8741b6f6a5fea9815984f1e48dbe723d7948746f Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 3 Oct 2017 15:55:02 -0400 Subject: [PATCH] NIFI-4461: When reading from socket channel use blocking mode instead of sleeping; when writing, use a far smaller sleep duration This closes #2193. Signed-off-by: Bryan Bende --- .../io/socket/ssl/SSLSocketChannel.java | 13 ++- .../io/socket/SocketChannelInputStream.java | 84 ++++++++++--------- .../io/socket/SocketChannelOutputStream.java | 16 +++- 3 files changed, 68 insertions(+), 45 deletions(-) diff --git a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java index 7e5b303260..f225bb4e68 100644 --- a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java +++ b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java @@ -48,7 +48,7 @@ public class SSLSocketChannel implements Closeable { public static final int MAX_WRITE_SIZE = 65536; private static final Logger logger = LoggerFactory.getLogger(SSLSocketChannel.class); - private static final long BUFFER_FULL_EMPTY_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS); + private static final long BUFFER_FULL_EMPTY_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(1, TimeUnit.MILLISECONDS); private final String hostname; private final int port; @@ -307,18 +307,21 @@ public class SSLSocketChannel implements Closeable { final int readCount = channel.read(dest); + long sleepNanos = 1L; if (readCount == 0) { if (System.currentTimeMillis() > startTime + timeoutMillis) { throw new SocketTimeoutException("Timed out reading from socket connected to " + hostname + ":" + port); } try { - TimeUnit.NANOSECONDS.sleep(BUFFER_FULL_EMPTY_WAIT_NANOS); + TimeUnit.NANOSECONDS.sleep(sleepNanos); } catch (InterruptedException e) { close(); Thread.currentThread().interrupt(); // set the interrupt status throw new ClosedByInterruptException(); } + sleepNanos = Math.min(sleepNanos * 2, BUFFER_FULL_EMPTY_WAIT_NANOS); + continue; } @@ -360,6 +363,8 @@ public class SSLSocketChannel implements Closeable { final int written = channel.write(src); bytesWritten += written; final long now = System.currentTimeMillis(); + long sleepNanos = 1L; + if (written > 0) { lastByteWrittenTime = now; } else { @@ -367,12 +372,14 @@ public class SSLSocketChannel implements Closeable { throw new SocketTimeoutException("Timed out writing to socket connected to " + hostname + ":" + port); } try { - TimeUnit.NANOSECONDS.sleep(BUFFER_FULL_EMPTY_WAIT_NANOS); + TimeUnit.NANOSECONDS.sleep(sleepNanos); } catch (final InterruptedException e) { close(); Thread.currentThread().interrupt(); // set the interrupt status throw new ClosedByInterruptException(); } + + sleepNanos = Math.min(sleepNanos * 2, BUFFER_FULL_EMPTY_WAIT_NANOS); } } diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java index 1ec229df40..c0cfa11c84 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.io.InputStream; import java.net.SocketTimeoutException; import java.nio.ByteBuffer; -import java.nio.channels.ClosedByInterruptException; import java.nio.channels.SocketChannel; import java.util.concurrent.TimeUnit; @@ -68,28 +67,33 @@ public class SocketChannelInputStream extends InputStream { oneByteBuffer.clear(); final long maxTime = System.currentTimeMillis() + timeoutMillis; - int bytesRead; - do { - bytesRead = channel.read(oneByteBuffer); - if (bytesRead == 0) { - if (System.currentTimeMillis() > maxTime) { - throw new SocketTimeoutException("Timed out reading from socket"); - } - try { - TimeUnit.NANOSECONDS.sleep(CHANNEL_EMPTY_WAIT_NANOS); - } catch (InterruptedException e) { - close(); - Thread.currentThread().interrupt(); // set the interrupt status - throw new ClosedByInterruptException(); // simulate an interrupted blocked read operation - } - } - } while (bytesRead == 0); - if (bytesRead == -1) { - return -1; + final boolean blocking = channel.isBlocking(); + + try { + channel.configureBlocking(true); + + int bytesRead; + do { + bytesRead = channel.read(oneByteBuffer); + if (bytesRead == 0) { + if (System.currentTimeMillis() > maxTime) { + throw new SocketTimeoutException("Timed out reading from socket"); + } + } + } while (bytesRead == 0); + + if (bytesRead == -1) { + return -1; + } + + oneByteBuffer.flip(); + return oneByteBuffer.get() & 0xFF; + } finally { + if (!blocking) { + channel.configureBlocking(false); + } } - oneByteBuffer.flip(); - return oneByteBuffer.get() & 0xFF; } @Override @@ -108,25 +112,27 @@ public class SocketChannelInputStream extends InputStream { final ByteBuffer buffer = ByteBuffer.wrap(b, off, len); - final long maxTime = System.currentTimeMillis() + timeoutMillis; - int bytesRead; - do { - bytesRead = channel.read(buffer); - if (bytesRead == 0) { - if (System.currentTimeMillis() > maxTime) { - throw new SocketTimeoutException("Timed out reading from socket"); - } - try { - TimeUnit.NANOSECONDS.sleep(CHANNEL_EMPTY_WAIT_NANOS); - } catch (InterruptedException e) { - close(); - Thread.currentThread().interrupt(); // set the interrupt status - throw new ClosedByInterruptException(); // simulate an interrupted blocked read operation - } - } - } while (bytesRead == 0); + final boolean blocking = channel.isBlocking(); + try { + channel.configureBlocking(true); - return bytesRead; + final long maxTime = System.currentTimeMillis() + timeoutMillis; + int bytesRead; + do { + bytesRead = channel.read(buffer); + if (bytesRead == 0) { + if (System.currentTimeMillis() > maxTime) { + throw new SocketTimeoutException("Timed out reading from socket"); + } + } + } while (bytesRead == 0); + + return bytesRead; + } finally { + if (!blocking) { + channel.configureBlocking(false); + } + } } @Override diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java index a56d9dd20c..6a387a9e45 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java @@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit; public class SocketChannelOutputStream extends OutputStream { - private static final long CHANNEL_FULL_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS); + private static final long CHANNEL_FULL_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(1, TimeUnit.MILLISECONDS); private final SocketChannel channel; private volatile int timeout = 30000; @@ -52,19 +52,24 @@ public class SocketChannelOutputStream extends OutputStream { final int timeoutMillis = this.timeout; long maxTime = System.currentTimeMillis() + timeoutMillis; int bytesWritten; + + long sleepNanos = 1L; while (oneByteBuffer.hasRemaining()) { bytesWritten = channel.write(oneByteBuffer); if (bytesWritten == 0) { if (System.currentTimeMillis() > maxTime) { throw new SocketTimeoutException("Timed out writing to socket"); } + try { - TimeUnit.NANOSECONDS.sleep(CHANNEL_FULL_WAIT_NANOS); + TimeUnit.NANOSECONDS.sleep(sleepNanos); } catch (InterruptedException e) { close(); Thread.currentThread().interrupt(); // set the interrupt status throw new ClosedByInterruptException(); // simulate an interrupted blocked write operation } + + sleepNanos = Math.min(sleepNanos * 2, CHANNEL_FULL_WAIT_NANOS); } else { return; } @@ -83,19 +88,24 @@ public class SocketChannelOutputStream extends OutputStream { final int timeoutMillis = this.timeout; long maxTime = System.currentTimeMillis() + timeoutMillis; int bytesWritten; + + long sleepNanos = 1L; while (buffer.hasRemaining()) { bytesWritten = channel.write(buffer); if (bytesWritten == 0) { if (System.currentTimeMillis() > maxTime) { throw new SocketTimeoutException("Timed out writing to socket"); } + try { - TimeUnit.NANOSECONDS.sleep(CHANNEL_FULL_WAIT_NANOS); + TimeUnit.NANOSECONDS.sleep(sleepNanos); } catch (InterruptedException e) { close(); Thread.currentThread().interrupt(); // set the interrupt status throw new ClosedByInterruptException(); // simulate an interrupted blocked write operation } + + sleepNanos = Math.min(sleepNanos * 2, CHANNEL_FULL_WAIT_NANOS); } else { maxTime = System.currentTimeMillis() + timeoutMillis; }