Fix issue where the writer would go into a cycle of adding delay for
writes of large messages where each packet ended up as the same size.
This commit is contained in:
Timothy Bish 2014-05-08 12:56:54 -04:00
parent d43e709565
commit 210e39df8b
1 changed files with 45 additions and 32 deletions

View File

@ -28,11 +28,11 @@ import javax.net.ssl.SSLEngine;
import org.apache.activemq.transport.tcp.TimeStampStream; import org.apache.activemq.transport.tcp.TimeStampStream;
/** /**
* An optimized buffered outputstream for Tcp * An optimized buffered OutputStream for TCP/IP
*/ */
public class NIOOutputStream extends OutputStream implements TimeStampStream { public class NIOOutputStream extends OutputStream implements TimeStampStream {
private static final int BUFFER_SIZE = 8192; private static final int BUFFER_SIZE = 8196;
private final WritableByteChannel out; private final WritableByteChannel out;
private final byte[] buffer; private final byte[] buffer;
@ -48,6 +48,7 @@ public class NIOOutputStream extends OutputStream implements TimeStampStream {
* Constructor * Constructor
* *
* @param out * @param out
* the channel to write data to.
*/ */
public NIOOutputStream(WritableByteChannel out) { public NIOOutputStream(WritableByteChannel out) {
this(out, BUFFER_SIZE); this(out, BUFFER_SIZE);
@ -57,8 +58,11 @@ public class NIOOutputStream extends OutputStream implements TimeStampStream {
* Creates a new buffered output stream to write data to the specified * Creates a new buffered output stream to write data to the specified
* underlying output stream with the specified buffer size. * underlying output stream with the specified buffer size.
* *
* @param out the underlying output stream. * @param out
* @param size the buffer size. * the underlying output stream.
* @param size
* the buffer size.
*
* @throws IllegalArgumentException if size <= 0. * @throws IllegalArgumentException if size <= 0.
*/ */
public NIOOutputStream(WritableByteChannel out, int size) { public NIOOutputStream(WritableByteChannel out, int size) {
@ -73,8 +77,10 @@ public class NIOOutputStream extends OutputStream implements TimeStampStream {
/** /**
* write a byte on to the stream * write a byte on to the stream
* *
* @param b - byte to write * @param b
* @throws IOException * byte to write
*
* @throws IOException if an error occurs while writing the data.
*/ */
@Override @Override
public void write(int b) throws IOException { public void write(int b) throws IOException {
@ -88,10 +94,14 @@ public class NIOOutputStream extends OutputStream implements TimeStampStream {
/** /**
* write a byte array to the stream * write a byte array to the stream
* *
* @param b the byte buffer * @param b
* @param off the offset into the buffer * the byte buffer
* @param len the length of data to write * @param off
* @throws IOException * the offset into the buffer
* @param len
* the length of data to write
*
* @throws IOException if an error occurs while writing the data.
*/ */
@Override @Override
public void write(byte b[], int off, int len) throws IOException { public void write(byte b[], int off, int len) throws IOException {
@ -109,10 +119,10 @@ public class NIOOutputStream extends OutputStream implements TimeStampStream {
/** /**
* flush the data to the output stream This doesn't call flush on the * flush the data to the output stream This doesn't call flush on the
* underlying outputstream, because Tcp is particularly efficent at doing * underlying OutputStream, because TCP/IP is particularly efficient at doing
* this itself .... * this itself ....
* *
* @throws IOException * @throws IOException if an error occurs while writing the data.
*/ */
@Override @Override
public void flush() throws IOException { public void flush() throws IOException {
@ -168,17 +178,17 @@ public class NIOOutputStream extends OutputStream implements TimeStampStream {
} }
int remaining = plain.remaining(); int remaining = plain.remaining();
int lastRemaining = remaining - 1;
long delay = 1; long delay = 1;
int lastWriteSize = -1;
try { try {
writeTimestamp = System.currentTimeMillis(); writeTimestamp = System.currentTimeMillis();
while (remaining > 0) { while (remaining > 0) {
// We may need to do a little bit of sleeping to avoid a busy loop. // We may need to do a little bit of sleeping to avoid a busy
// Slow down if no data was written out.. // loop. Slow down if no data was written out..
if (remaining == lastRemaining) { if (lastWriteSize == 0) {
try { try {
// Use exponential rollback to increase sleep time. // Use exponential growth to increase sleep time.
Thread.sleep(delay); Thread.sleep(delay);
delay *= 2; delay *= 2;
if (delay > 1000) { if (delay > 1000) {
@ -190,16 +200,15 @@ public class NIOOutputStream extends OutputStream implements TimeStampStream {
} else { } else {
delay = 1; delay = 1;
} }
lastRemaining = remaining;
// Since the write is non-blocking, all the data may not have been // Since the write is non-blocking, all the data may not have
// written. // been written.
out.write(plain); lastWriteSize = out.write(plain);
// if the data buffer was larger than the packet buffer we might need to // if the data buffer was larger than the packet buffer we might
// wrap more packets until we reach the end of data, but only when plain // need to wrap more packets until we reach the end of data, but only
// has no more space since we are non-blocking and a write might not have // when plain has no more space since we are non-blocking and a write
// written anything. // might not have written anything.
if (engine != null && data.hasRemaining() && !plain.hasRemaining()) { if (engine != null && data.hasRemaining() && !plain.hasRemaining()) {
plain.clear(); plain.clear();
engine.wrap(data, plain); engine.wrap(data, plain);
@ -213,8 +222,9 @@ public class NIOOutputStream extends OutputStream implements TimeStampStream {
} }
} }
/*
/* (non-Javadoc) * (non-Javadoc)
*
* @see org.apache.activemq.transport.tcp.TimeStampStream#isWriting() * @see org.apache.activemq.transport.tcp.TimeStampStream#isWriting()
*/ */
@Override @Override
@ -222,8 +232,11 @@ public class NIOOutputStream extends OutputStream implements TimeStampStream {
return writeTimestamp > 0; return writeTimestamp > 0;
} }
/* (non-Javadoc) /*
* @see org.apache.activemq.transport.tcp.TimeStampStream#getWriteTimestamp() * (non-Javadoc)
*
* @see
* org.apache.activemq.transport.tcp.TimeStampStream#getWriteTimestamp()
*/ */
@Override @Override
public long getWriteTimestamp() { public long getWriteTimestamp() {