diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java index 643416f463..621e9566fe 100644 --- a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java @@ -28,11 +28,11 @@ import javax.net.ssl.SSLEngine; import org.apache.activemq.transport.tcp.TimeStampStream; /** - * An optimized buffered outputstream for Tcp + * An optimized buffered OutputStream for TCP/IP */ public class NIOOutputStream extends OutputStream implements TimeStampStream { - private static final int BUFFER_SIZE = 8192; + private static final int BUFFER_SIZE = 8196; private final WritableByteChannel out; private final byte[] buffer; @@ -40,7 +40,7 @@ public class NIOOutputStream extends OutputStream implements TimeStampStream { private int count; private boolean closed; - private volatile long writeTimestamp = -1;//concurrent reads of this value + private volatile long writeTimestamp = -1; // concurrent reads of this value private SSLEngine engine; @@ -48,6 +48,7 @@ public class NIOOutputStream extends OutputStream implements TimeStampStream { * Constructor * * @param out + * the channel to write data to. */ public NIOOutputStream(WritableByteChannel out) { this(out, BUFFER_SIZE); @@ -57,8 +58,11 @@ public class NIOOutputStream extends OutputStream implements TimeStampStream { * Creates a new buffered output stream to write data to the specified * underlying output stream with the specified buffer size. * - * @param out the underlying output stream. - * @param size the buffer size. + * @param out + * the underlying output stream. + * @param size + * the buffer size. + * * @throws IllegalArgumentException if size <= 0. */ public NIOOutputStream(WritableByteChannel out, int size) { @@ -73,8 +77,10 @@ public class NIOOutputStream extends OutputStream implements TimeStampStream { /** * write a byte on to the stream * - * @param b - byte to write - * @throws IOException + * @param b + * byte to write + * + * @throws IOException if an error occurs while writing the data. */ @Override public void write(int b) throws IOException { @@ -82,16 +88,20 @@ public class NIOOutputStream extends OutputStream implements TimeStampStream { if (availableBufferToWrite() < 1) { flush(); } - buffer[count++] = (byte)b; + buffer[count++] = (byte) b; } /** * write a byte array to the stream * - * @param b the byte buffer - * @param off the offset into the buffer - * @param len the length of data to write - * @throws IOException + * @param b + * the byte buffer + * @param off + * the offset into the buffer + * @param len + * the length of data to write + * + * @throws IOException if an error occurs while writing the data. */ @Override public void write(byte b[], int off, int len) throws IOException { @@ -109,10 +119,10 @@ public class NIOOutputStream extends OutputStream implements TimeStampStream { /** * flush the data to the output stream This doesn't call flush on the - * underlying outputstream, because Tcp is particularly efficent at doing + * underlying OutputStream, because TCP/IP is particularly efficient at doing * this itself .... * - * @throws IOException + * @throws IOException if an error occurs while writing the data. */ @Override public void flush() throws IOException { @@ -163,22 +173,22 @@ public class NIOOutputStream extends OutputStream implements TimeStampStream { plain.clear(); engine.wrap(data, plain); plain.flip(); - } else { + } else { plain = data; } int remaining = plain.remaining(); - int lastRemaining = remaining - 1; long delay = 1; + int lastWriteSize = -1; try { writeTimestamp = System.currentTimeMillis(); while (remaining > 0) { - // We may need to do a little bit of sleeping to avoid a busy loop. - // Slow down if no data was written out.. - if (remaining == lastRemaining) { + // We may need to do a little bit of sleeping to avoid a busy + // loop. Slow down if no data was written out.. + if (lastWriteSize == 0) { try { - // Use exponential rollback to increase sleep time. + // Use exponential growth to increase sleep time. Thread.sleep(delay); delay *= 2; if (delay > 1000) { @@ -190,16 +200,15 @@ public class NIOOutputStream extends OutputStream implements TimeStampStream { } else { delay = 1; } - lastRemaining = remaining; - // Since the write is non-blocking, all the data may not have been - // written. - out.write(plain); + // Since the write is non-blocking, all the data may not have + // been written. + lastWriteSize = out.write(plain); - // if the data buffer was larger than the packet buffer we might need to - // wrap more packets until we reach the end of data, but only when plain - // has no more space since we are non-blocking and a write might not have - // written anything. + // if the data buffer was larger than the packet buffer we might + // need to wrap more packets until we reach the end of data, but only + // when plain has no more space since we are non-blocking and a write + // might not have written anything. if (engine != null && data.hasRemaining() && !plain.hasRemaining()) { plain.clear(); engine.wrap(data, plain); @@ -213,8 +222,9 @@ public class NIOOutputStream extends OutputStream implements TimeStampStream { } } - - /* (non-Javadoc) + /* + * (non-Javadoc) + * * @see org.apache.activemq.transport.tcp.TimeStampStream#isWriting() */ @Override @@ -222,8 +232,11 @@ public class NIOOutputStream extends OutputStream implements TimeStampStream { return writeTimestamp > 0; } - /* (non-Javadoc) - * @see org.apache.activemq.transport.tcp.TimeStampStream#getWriteTimestamp() + /* + * (non-Javadoc) + * + * @see + * org.apache.activemq.transport.tcp.TimeStampStream#getWriteTimestamp() */ @Override public long getWriteTimestamp() {