402090 - httpsender PendingState cause uncertain data send to server.
WriteFlusher was storing consumed buffers that may have been reused,
and when the write was being completed those consumed buffer may have
contained new content that was being written too.
Fixed by compacting the buffers at the moment of creation of the
WriteFlusher.PendingState (and not at the moment of the completeWrite() like
it was in historically done, see 4e94601619
).
This commit is contained in:
parent
951d8c8472
commit
2feafb9a97
|
@ -45,6 +45,7 @@ abstract public class WriteFlusher
|
||||||
{
|
{
|
||||||
private static final Logger LOG = Log.getLogger(WriteFlusher.class);
|
private static final Logger LOG = Log.getLogger(WriteFlusher.class);
|
||||||
private static final boolean DEBUG = LOG.isDebugEnabled(); // Easy for the compiler to remove the code if DEBUG==false
|
private static final boolean DEBUG = LOG.isDebugEnabled(); // Easy for the compiler to remove the code if DEBUG==false
|
||||||
|
private static final ByteBuffer[] EMPTY_BUFFERS = new ByteBuffer[0];
|
||||||
private static final EnumMap<StateType, Set<StateType>> __stateTransitions = new EnumMap<>(StateType.class);
|
private static final EnumMap<StateType, Set<StateType>> __stateTransitions = new EnumMap<>(StateType.class);
|
||||||
private static final State __IDLE = new IdleState();
|
private static final State __IDLE = new IdleState();
|
||||||
private static final State __WRITING = new WritingState();
|
private static final State __WRITING = new WritingState();
|
||||||
|
@ -243,7 +244,7 @@ abstract public class WriteFlusher
|
||||||
private PendingState(ByteBuffer[] buffers, Callback callback)
|
private PendingState(ByteBuffer[] buffers, Callback callback)
|
||||||
{
|
{
|
||||||
super(StateType.PENDING);
|
super(StateType.PENDING);
|
||||||
_buffers = buffers;
|
_buffers = compact(buffers);
|
||||||
_callback = callback;
|
_callback = callback;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -263,6 +264,44 @@ abstract public class WriteFlusher
|
||||||
if (_callback!=null)
|
if (_callback!=null)
|
||||||
_callback.succeeded();
|
_callback.succeeded();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compacting the buffers is needed because the semantic of WriteFlusher is
|
||||||
|
* to write the buffers and if the caller sees that the buffer is consumed,
|
||||||
|
* then it can recycle it.
|
||||||
|
* If we do not compact, then it is possible that we store a consumed buffer,
|
||||||
|
* which is then recycled and refilled; when the WriteFlusher is invoked to
|
||||||
|
* complete the write, it will write the refilled bytes, garbling the content.
|
||||||
|
*
|
||||||
|
* @param buffers the buffers to compact
|
||||||
|
* @return the compacted buffers
|
||||||
|
*/
|
||||||
|
private ByteBuffer[] compact(ByteBuffer[] buffers)
|
||||||
|
{
|
||||||
|
int length = buffers.length;
|
||||||
|
|
||||||
|
// Just one element, no need to compact
|
||||||
|
if (length < 2)
|
||||||
|
return buffers;
|
||||||
|
|
||||||
|
// How many still have content ?
|
||||||
|
int consumed = 0;
|
||||||
|
while (consumed < length && BufferUtil.isEmpty(buffers[consumed]))
|
||||||
|
++consumed;
|
||||||
|
|
||||||
|
// All of them still have content, no need to compact
|
||||||
|
if (consumed == 0)
|
||||||
|
return buffers;
|
||||||
|
|
||||||
|
// None has content, return empty
|
||||||
|
if (consumed == length)
|
||||||
|
return EMPTY_BUFFERS;
|
||||||
|
|
||||||
|
int newLength = length - consumed;
|
||||||
|
ByteBuffer[] result = new ByteBuffer[newLength];
|
||||||
|
System.arraycopy(buffers, consumed, result, 0, newLength);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -306,7 +345,7 @@ abstract public class WriteFlusher
|
||||||
if (updateState(__WRITING,pending))
|
if (updateState(__WRITING,pending))
|
||||||
onIncompleteFlushed();
|
onIncompleteFlushed();
|
||||||
else
|
else
|
||||||
fail(new PendingState(buffers, callback));
|
fail(pending);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,19 +18,11 @@
|
||||||
|
|
||||||
package org.eclipse.jetty.io;
|
package org.eclipse.jetty.io;
|
||||||
|
|
||||||
import static org.hamcrest.CoreMatchers.equalTo;
|
|
||||||
import static org.hamcrest.CoreMatchers.is;
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertFalse;
|
|
||||||
import static org.junit.Assert.assertThat;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
import static org.mockito.Matchers.any;
|
|
||||||
import static org.mockito.Mockito.when;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.channels.WritePendingException;
|
import java.nio.channels.WritePendingException;
|
||||||
import java.security.SecureRandom;
|
import java.security.SecureRandom;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
|
@ -54,16 +46,23 @@ import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.runners.MockitoJUnitRunner;
|
import org.mockito.runners.MockitoJUnitRunner;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
|
import static org.hamcrest.CoreMatchers.is;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertThat;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
@RunWith(MockitoJUnitRunner.class)
|
@RunWith(MockitoJUnitRunner.class)
|
||||||
public class WriteFlusherTest
|
public class WriteFlusherTest
|
||||||
{
|
{
|
||||||
@Mock
|
|
||||||
private EndPoint _endPointMock;
|
|
||||||
|
|
||||||
private WriteFlusher _flusher;
|
|
||||||
|
|
||||||
private final AtomicBoolean _flushIncomplete = new AtomicBoolean(false);
|
private final AtomicBoolean _flushIncomplete = new AtomicBoolean(false);
|
||||||
private final ExecutorService executor = Executors.newFixedThreadPool(16);
|
private final ExecutorService executor = Executors.newFixedThreadPool(16);
|
||||||
|
@Mock
|
||||||
|
private EndPoint _endPointMock;
|
||||||
|
private WriteFlusher _flusher;
|
||||||
private ByteArrayEndPoint _endp;
|
private ByteArrayEndPoint _endp;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
|
@ -400,6 +399,34 @@ public class WriteFlusherTest
|
||||||
assertThat("callback completed", callback.isDone(), is(true));
|
assertThat("callback completed", callback.isDone(), is(true));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPendingWriteDoesNotStoreConsumedBuffers() throws Exception
|
||||||
|
{
|
||||||
|
int toWrite = _endp.getOutput().capacity();
|
||||||
|
byte[] chunk1 = new byte[toWrite / 2];
|
||||||
|
Arrays.fill(chunk1, (byte)1);
|
||||||
|
ByteBuffer buffer1 = ByteBuffer.wrap(chunk1);
|
||||||
|
byte[] chunk2 = new byte[toWrite];
|
||||||
|
Arrays.fill(chunk1, (byte)2);
|
||||||
|
ByteBuffer buffer2 = ByteBuffer.wrap(chunk2);
|
||||||
|
|
||||||
|
_flusher.write(new Callback.Adapter(), buffer1, buffer2);
|
||||||
|
assertTrue(_flushIncomplete.get());
|
||||||
|
assertFalse(buffer1.hasRemaining());
|
||||||
|
|
||||||
|
// Reuse buffer1
|
||||||
|
buffer1.clear();
|
||||||
|
Arrays.fill(chunk1, (byte)3);
|
||||||
|
int remaining1 = buffer1.remaining();
|
||||||
|
|
||||||
|
// Complete the write
|
||||||
|
_endp.takeOutput();
|
||||||
|
_flusher.completeWrite();
|
||||||
|
|
||||||
|
// Make sure buffer1 is unchanged
|
||||||
|
assertEquals(remaining1, buffer1.remaining());
|
||||||
|
}
|
||||||
|
|
||||||
private class ExposingStateCallback extends FutureCallback
|
private class ExposingStateCallback extends FutureCallback
|
||||||
{
|
{
|
||||||
private boolean failed = false;
|
private boolean failed = false;
|
||||||
|
|
Loading…
Reference in New Issue