From 2feafb9a9726dbac73e0f42852573b82c988ad6a Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Fri, 1 Mar 2013 14:30:51 +0100 Subject: [PATCH] 402090 - httpsender PendingState cause uncertain data send to server. WriteFlusher was storing consumed buffers that may have been reused, and when the write was being completed those consumed buffer may have contained new content that was being written too. Fixed by compacting the buffers at the moment of creation of the WriteFlusher.PendingState (and not at the moment of the completeWrite() like it was in historically done, see 4e9460161944b5ccfa4bf1b5494d96ef395a0e76). --- .../org/eclipse/jetty/io/WriteFlusher.java | 43 ++++++++++++++- .../eclipse/jetty/io/WriteFlusherTest.java | 55 ++++++++++++++----- 2 files changed, 82 insertions(+), 16 deletions(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java index c43a41c4f1e..8b3def67449 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java @@ -45,6 +45,7 @@ abstract public class WriteFlusher { private static final Logger LOG = Log.getLogger(WriteFlusher.class); private static final boolean DEBUG = LOG.isDebugEnabled(); // Easy for the compiler to remove the code if DEBUG==false + private static final ByteBuffer[] EMPTY_BUFFERS = new ByteBuffer[0]; private static final EnumMap> __stateTransitions = new EnumMap<>(StateType.class); private static final State __IDLE = new IdleState(); private static final State __WRITING = new WritingState(); @@ -243,7 +244,7 @@ abstract public class WriteFlusher private PendingState(ByteBuffer[] buffers, Callback callback) { super(StateType.PENDING); - _buffers = buffers; + _buffers = compact(buffers); _callback = callback; } @@ -263,6 +264,44 @@ abstract public class WriteFlusher if (_callback!=null) _callback.succeeded(); } + + /** + * Compacting the buffers is needed because the semantic of WriteFlusher is + * to write the buffers and if the caller sees that the buffer is consumed, + * then it can recycle it. + * If we do not compact, then it is possible that we store a consumed buffer, + * which is then recycled and refilled; when the WriteFlusher is invoked to + * complete the write, it will write the refilled bytes, garbling the content. + * + * @param buffers the buffers to compact + * @return the compacted buffers + */ + private ByteBuffer[] compact(ByteBuffer[] buffers) + { + int length = buffers.length; + + // Just one element, no need to compact + if (length < 2) + return buffers; + + // How many still have content ? + int consumed = 0; + while (consumed < length && BufferUtil.isEmpty(buffers[consumed])) + ++consumed; + + // All of them still have content, no need to compact + if (consumed == 0) + return buffers; + + // None has content, return empty + if (consumed == length) + return EMPTY_BUFFERS; + + int newLength = length - consumed; + ByteBuffer[] result = new ByteBuffer[newLength]; + System.arraycopy(buffers, consumed, result, 0, newLength); + return result; + } } /** @@ -306,7 +345,7 @@ abstract public class WriteFlusher if (updateState(__WRITING,pending)) onIncompleteFlushed(); else - fail(new PendingState(buffers, callback)); + fail(pending); return; } } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java index b1eeadc478b..85fe4f50578 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java @@ -18,19 +18,11 @@ package org.eclipse.jetty.io; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.when; - import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.WritePendingException; import java.security.SecureRandom; +import java.util.Arrays; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -54,16 +46,23 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; import org.mockito.stubbing.Answer; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.when; + @RunWith(MockitoJUnitRunner.class) public class WriteFlusherTest { - @Mock - private EndPoint _endPointMock; - - private WriteFlusher _flusher; - private final AtomicBoolean _flushIncomplete = new AtomicBoolean(false); private final ExecutorService executor = Executors.newFixedThreadPool(16); + @Mock + private EndPoint _endPointMock; + private WriteFlusher _flusher; private ByteArrayEndPoint _endp; @Before @@ -400,6 +399,34 @@ public class WriteFlusherTest assertThat("callback completed", callback.isDone(), is(true)); } + @Test + public void testPendingWriteDoesNotStoreConsumedBuffers() throws Exception + { + int toWrite = _endp.getOutput().capacity(); + byte[] chunk1 = new byte[toWrite / 2]; + Arrays.fill(chunk1, (byte)1); + ByteBuffer buffer1 = ByteBuffer.wrap(chunk1); + byte[] chunk2 = new byte[toWrite]; + Arrays.fill(chunk1, (byte)2); + ByteBuffer buffer2 = ByteBuffer.wrap(chunk2); + + _flusher.write(new Callback.Adapter(), buffer1, buffer2); + assertTrue(_flushIncomplete.get()); + assertFalse(buffer1.hasRemaining()); + + // Reuse buffer1 + buffer1.clear(); + Arrays.fill(chunk1, (byte)3); + int remaining1 = buffer1.remaining(); + + // Complete the write + _endp.takeOutput(); + _flusher.completeWrite(); + + // Make sure buffer1 is unchanged + assertEquals(remaining1, buffer1.remaining()); + } + private class ExposingStateCallback extends FutureCallback { private boolean failed = false;