NIFI-4461: When reading from socket channel use blocking mode instead of sleeping; when writing, use a far smaller sleep duration

This closes #2193.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mark Payne 2017-10-03 15:55:02 -04:00 committed by Bryan Bende
parent e773fa5513
commit 8741b6f6a5
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
3 changed files with 68 additions and 45 deletions

View File

@ -48,7 +48,7 @@ public class SSLSocketChannel implements Closeable {
public static final int MAX_WRITE_SIZE = 65536; public static final int MAX_WRITE_SIZE = 65536;
private static final Logger logger = LoggerFactory.getLogger(SSLSocketChannel.class); private static final Logger logger = LoggerFactory.getLogger(SSLSocketChannel.class);
private static final long BUFFER_FULL_EMPTY_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS); private static final long BUFFER_FULL_EMPTY_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(1, TimeUnit.MILLISECONDS);
private final String hostname; private final String hostname;
private final int port; private final int port;
@ -307,18 +307,21 @@ public class SSLSocketChannel implements Closeable {
final int readCount = channel.read(dest); final int readCount = channel.read(dest);
long sleepNanos = 1L;
if (readCount == 0) { if (readCount == 0) {
if (System.currentTimeMillis() > startTime + timeoutMillis) { if (System.currentTimeMillis() > startTime + timeoutMillis) {
throw new SocketTimeoutException("Timed out reading from socket connected to " + hostname + ":" + port); throw new SocketTimeoutException("Timed out reading from socket connected to " + hostname + ":" + port);
} }
try { try {
TimeUnit.NANOSECONDS.sleep(BUFFER_FULL_EMPTY_WAIT_NANOS); TimeUnit.NANOSECONDS.sleep(sleepNanos);
} catch (InterruptedException e) { } catch (InterruptedException e) {
close(); close();
Thread.currentThread().interrupt(); // set the interrupt status Thread.currentThread().interrupt(); // set the interrupt status
throw new ClosedByInterruptException(); throw new ClosedByInterruptException();
} }
sleepNanos = Math.min(sleepNanos * 2, BUFFER_FULL_EMPTY_WAIT_NANOS);
continue; continue;
} }
@ -360,6 +363,8 @@ public class SSLSocketChannel implements Closeable {
final int written = channel.write(src); final int written = channel.write(src);
bytesWritten += written; bytesWritten += written;
final long now = System.currentTimeMillis(); final long now = System.currentTimeMillis();
long sleepNanos = 1L;
if (written > 0) { if (written > 0) {
lastByteWrittenTime = now; lastByteWrittenTime = now;
} else { } else {
@ -367,12 +372,14 @@ public class SSLSocketChannel implements Closeable {
throw new SocketTimeoutException("Timed out writing to socket connected to " + hostname + ":" + port); throw new SocketTimeoutException("Timed out writing to socket connected to " + hostname + ":" + port);
} }
try { try {
TimeUnit.NANOSECONDS.sleep(BUFFER_FULL_EMPTY_WAIT_NANOS); TimeUnit.NANOSECONDS.sleep(sleepNanos);
} catch (final InterruptedException e) { } catch (final InterruptedException e) {
close(); close();
Thread.currentThread().interrupt(); // set the interrupt status Thread.currentThread().interrupt(); // set the interrupt status
throw new ClosedByInterruptException(); throw new ClosedByInterruptException();
} }
sleepNanos = Math.min(sleepNanos * 2, BUFFER_FULL_EMPTY_WAIT_NANOS);
} }
} }

View File

@ -21,7 +21,6 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -68,28 +67,33 @@ public class SocketChannelInputStream extends InputStream {
oneByteBuffer.clear(); oneByteBuffer.clear();
final long maxTime = System.currentTimeMillis() + timeoutMillis; final long maxTime = System.currentTimeMillis() + timeoutMillis;
int bytesRead;
do {
bytesRead = channel.read(oneByteBuffer);
if (bytesRead == 0) {
if (System.currentTimeMillis() > maxTime) {
throw new SocketTimeoutException("Timed out reading from socket");
}
try {
TimeUnit.NANOSECONDS.sleep(CHANNEL_EMPTY_WAIT_NANOS);
} catch (InterruptedException e) {
close();
Thread.currentThread().interrupt(); // set the interrupt status
throw new ClosedByInterruptException(); // simulate an interrupted blocked read operation
}
}
} while (bytesRead == 0);
if (bytesRead == -1) { final boolean blocking = channel.isBlocking();
return -1;
try {
channel.configureBlocking(true);
int bytesRead;
do {
bytesRead = channel.read(oneByteBuffer);
if (bytesRead == 0) {
if (System.currentTimeMillis() > maxTime) {
throw new SocketTimeoutException("Timed out reading from socket");
}
}
} while (bytesRead == 0);
if (bytesRead == -1) {
return -1;
}
oneByteBuffer.flip();
return oneByteBuffer.get() & 0xFF;
} finally {
if (!blocking) {
channel.configureBlocking(false);
}
} }
oneByteBuffer.flip();
return oneByteBuffer.get() & 0xFF;
} }
@Override @Override
@ -108,25 +112,27 @@ public class SocketChannelInputStream extends InputStream {
final ByteBuffer buffer = ByteBuffer.wrap(b, off, len); final ByteBuffer buffer = ByteBuffer.wrap(b, off, len);
final long maxTime = System.currentTimeMillis() + timeoutMillis; final boolean blocking = channel.isBlocking();
int bytesRead; try {
do { channel.configureBlocking(true);
bytesRead = channel.read(buffer);
if (bytesRead == 0) {
if (System.currentTimeMillis() > maxTime) {
throw new SocketTimeoutException("Timed out reading from socket");
}
try {
TimeUnit.NANOSECONDS.sleep(CHANNEL_EMPTY_WAIT_NANOS);
} catch (InterruptedException e) {
close();
Thread.currentThread().interrupt(); // set the interrupt status
throw new ClosedByInterruptException(); // simulate an interrupted blocked read operation
}
}
} while (bytesRead == 0);
return bytesRead; final long maxTime = System.currentTimeMillis() + timeoutMillis;
int bytesRead;
do {
bytesRead = channel.read(buffer);
if (bytesRead == 0) {
if (System.currentTimeMillis() > maxTime) {
throw new SocketTimeoutException("Timed out reading from socket");
}
}
} while (bytesRead == 0);
return bytesRead;
} finally {
if (!blocking) {
channel.configureBlocking(false);
}
}
} }
@Override @Override

View File

@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit;
public class SocketChannelOutputStream extends OutputStream { public class SocketChannelOutputStream extends OutputStream {
private static final long CHANNEL_FULL_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS); private static final long CHANNEL_FULL_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(1, TimeUnit.MILLISECONDS);
private final SocketChannel channel; private final SocketChannel channel;
private volatile int timeout = 30000; private volatile int timeout = 30000;
@ -52,19 +52,24 @@ public class SocketChannelOutputStream extends OutputStream {
final int timeoutMillis = this.timeout; final int timeoutMillis = this.timeout;
long maxTime = System.currentTimeMillis() + timeoutMillis; long maxTime = System.currentTimeMillis() + timeoutMillis;
int bytesWritten; int bytesWritten;
long sleepNanos = 1L;
while (oneByteBuffer.hasRemaining()) { while (oneByteBuffer.hasRemaining()) {
bytesWritten = channel.write(oneByteBuffer); bytesWritten = channel.write(oneByteBuffer);
if (bytesWritten == 0) { if (bytesWritten == 0) {
if (System.currentTimeMillis() > maxTime) { if (System.currentTimeMillis() > maxTime) {
throw new SocketTimeoutException("Timed out writing to socket"); throw new SocketTimeoutException("Timed out writing to socket");
} }
try { try {
TimeUnit.NANOSECONDS.sleep(CHANNEL_FULL_WAIT_NANOS); TimeUnit.NANOSECONDS.sleep(sleepNanos);
} catch (InterruptedException e) { } catch (InterruptedException e) {
close(); close();
Thread.currentThread().interrupt(); // set the interrupt status Thread.currentThread().interrupt(); // set the interrupt status
throw new ClosedByInterruptException(); // simulate an interrupted blocked write operation throw new ClosedByInterruptException(); // simulate an interrupted blocked write operation
} }
sleepNanos = Math.min(sleepNanos * 2, CHANNEL_FULL_WAIT_NANOS);
} else { } else {
return; return;
} }
@ -83,19 +88,24 @@ public class SocketChannelOutputStream extends OutputStream {
final int timeoutMillis = this.timeout; final int timeoutMillis = this.timeout;
long maxTime = System.currentTimeMillis() + timeoutMillis; long maxTime = System.currentTimeMillis() + timeoutMillis;
int bytesWritten; int bytesWritten;
long sleepNanos = 1L;
while (buffer.hasRemaining()) { while (buffer.hasRemaining()) {
bytesWritten = channel.write(buffer); bytesWritten = channel.write(buffer);
if (bytesWritten == 0) { if (bytesWritten == 0) {
if (System.currentTimeMillis() > maxTime) { if (System.currentTimeMillis() > maxTime) {
throw new SocketTimeoutException("Timed out writing to socket"); throw new SocketTimeoutException("Timed out writing to socket");
} }
try { try {
TimeUnit.NANOSECONDS.sleep(CHANNEL_FULL_WAIT_NANOS); TimeUnit.NANOSECONDS.sleep(sleepNanos);
} catch (InterruptedException e) { } catch (InterruptedException e) {
close(); close();
Thread.currentThread().interrupt(); // set the interrupt status Thread.currentThread().interrupt(); // set the interrupt status
throw new ClosedByInterruptException(); // simulate an interrupted blocked write operation throw new ClosedByInterruptException(); // simulate an interrupted blocked write operation
} }
sleepNanos = Math.min(sleepNanos * 2, CHANNEL_FULL_WAIT_NANOS);
} else { } else {
maxTime = System.currentTimeMillis() + timeoutMillis; maxTime = System.currentTimeMillis() + timeoutMillis;
} }