Merged branch 'jetty-9.4.x' into 'master'.

This commit is contained in:
Simone Bordet 2017-01-18 17:28:36 +01:00
commit a49841f308
6 changed files with 377 additions and 647 deletions

View File

@ -241,6 +241,8 @@ The ALPN implementation, relying on modifications of OpenJDK classes, updates ev
|1.8.0u92 |8.1.8.v20160420
|1.8.0u101 |8.1.9.v20160720
|1.8.0u102 |8.1.9.v20160720
|1.8.0u111 |8.1.9.v20160720
|1.8.0u112 |8.1.10.v20161026
|=============================
[[alpn-build]]

View File

@ -22,11 +22,6 @@
<artifactId>jetty-test-helper</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>

View File

@ -19,125 +19,92 @@
package org.eclipse.jetty.io;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class WriteFlusherTest
{
private final AtomicBoolean _flushIncomplete = new AtomicBoolean(false);
private final ExecutorService executor = Executors.newFixedThreadPool(16);
@Mock
private EndPoint _endPointMock;
private WriteFlusher _flusher;
private ByteArrayEndPoint _endp;
@Before
public void before()
@Test
public void testCompleteNoBlocking() throws Exception
{
_endp = new ByteArrayEndPoint(new byte[]{}, 10);
_flushIncomplete.set(false);
_flusher = new WriteFlusher(_endp)
{
@Override
protected void onIncompleteFlush()
{
_flushIncomplete.set(true);
}
};
testCompleteWrite(false);
}
@Test
public void testIgnorePreviousFailures() throws Exception
{
_endp.setGrowOutput(true);
testCompleteWrite(true);
}
private void testCompleteWrite(boolean failBefore) throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint(new byte[0], 16);
endPoint.setGrowOutput(true);
AtomicBoolean incompleteFlush = new AtomicBoolean();
WriteFlusher flusher = new WriteFlusher(endPoint)
{
@Override
protected void onIncompleteFlush()
{
incompleteFlush.set(true);
}
};
if (failBefore)
flusher.onFail(new IOException("Ignored because no operation in progress"));
FutureCallback callback = new FutureCallback();
_flusher.onFail(new IOException("Ignored because no operation in progress"));
_flusher.write(callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!"));
assertCallbackIsDone(callback);
assertFlushIsComplete();
assertThat("context and callback.get() are equal",callback.get() , equalTo(null));
assertThat("string in endpoint matches expected string", "How now brown cow!",
equalTo(_endp.takeOutputString()));
assertTrue(_flusher.isIdle());
}
flusher.write(callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!"));
@Test
public void testCompleteNoBlocking() throws Exception
{
_endp.setGrowOutput(true);
FutureCallback callback = new FutureCallback();
_flusher.write(callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!"));
assertCallbackIsDone(callback);
assertFlushIsComplete();
assertThat("context and callback.get() are equal", callback.get(), equalTo(null));
assertThat("string in endpoint matches expected string", "How now brown cow!",
equalTo(_endp.takeOutputString()));
assertTrue(_flusher.isIdle());
}
private void assertFlushIsComplete()
{
assertThat("flush is complete", _flushIncomplete.get(), is(false));
}
private void assertCallbackIsDone(FutureCallback callback)
{
assertThat("callback is done", callback.isDone(), is(true));
Assert.assertTrue(callback.isDone());
Assert.assertFalse(incompleteFlush.get());
Assert.assertEquals("How now brown cow!", endPoint.takeOutputString());
Assert.assertTrue(flusher.isIdle());
}
@Test
public void testClosedNoBlocking() throws Exception
{
_endp.close();
ByteArrayEndPoint endPoint = new ByteArrayEndPoint(new byte[0], 16);
endPoint.close();
AtomicBoolean incompleteFlush = new AtomicBoolean();
WriteFlusher flusher = new WriteFlusher(endPoint)
{
@Override
protected void onIncompleteFlush()
{
incompleteFlush.set(true);
}
};
FutureCallback callback = new FutureCallback();
_flusher.write(callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!"));
assertCallbackIsDone(callback);
assertFlushIsComplete();
flusher.write(callback, BufferUtil.toBuffer("foo"));
Assert.assertTrue(callback.isDone());
Assert.assertFalse(incompleteFlush.get());
try
{
assertEquals(callback.get(),null);
callback.get();
Assert.fail();
}
catch (ExecutionException e)
@ -146,67 +113,88 @@ public class WriteFlusherTest
Assert.assertTrue(cause instanceof IOException);
Assert.assertThat(cause.getMessage(), Matchers.containsString("CLOSED"));
}
assertEquals("", _endp.takeOutputString());
assertTrue(_flusher.isIdle());
Assert.assertEquals("", endPoint.takeOutputString());
Assert.assertTrue(flusher.isIdle());
}
@Test
public void testCompleteBlocking() throws Exception
{
FutureCallback callback = new FutureCallback();
_flusher.write(callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!"));
assertFalse(callback.isDone());
assertFalse(callback.isCancelled());
ByteArrayEndPoint endPoint = new ByteArrayEndPoint(new byte[0], 10);
AtomicBoolean incompleteFlush = new AtomicBoolean();
WriteFlusher flusher = new WriteFlusher(endPoint)
{
@Override
protected void onIncompleteFlush()
{
incompleteFlush.set(true);
}
};
FutureCallback callback = new FutureCallback();
flusher.write(callback, BufferUtil.toBuffer("How now brown cow!"));
Assert.assertFalse(callback.isDone());
Assert.assertFalse(callback.isCancelled());
Assert.assertTrue(incompleteFlush.get());
assertTrue(_flushIncomplete.get());
try
{
assertEquals(callback.get(10, TimeUnit.MILLISECONDS),null);
callback.get(100, TimeUnit.MILLISECONDS);
Assert.fail();
}
catch (TimeoutException to)
catch (TimeoutException x)
{
_flushIncomplete.set(false);
incompleteFlush.set(false);
}
assertEquals("How now br", _endp.takeOutputString());
_flusher.completeWrite();
assertCallbackIsDone(callback);
assertEquals(callback.get(),null);
assertEquals("own cow!", _endp.takeOutputString());
assertFlushIsComplete();
assertTrue(_flusher.isIdle());
Assert.assertEquals("How now br", endPoint.takeOutputString());
flusher.completeWrite();
Assert.assertTrue(callback.isDone());
Assert.assertEquals("own cow!", endPoint.takeOutputString());
Assert.assertFalse(incompleteFlush.get());
Assert.assertTrue(flusher.isIdle());
}
@Test
public void testCloseWhileBlocking() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint(new byte[0], 10);
AtomicBoolean incompleteFlush = new AtomicBoolean();
WriteFlusher flusher = new WriteFlusher(endPoint)
{
@Override
protected void onIncompleteFlush()
{
incompleteFlush.set(true);
}
};
FutureCallback callback = new FutureCallback();
_flusher.write(callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!"));
flusher.write(callback, BufferUtil.toBuffer("How now brown cow!"));
assertFalse(callback.isDone());
assertFalse(callback.isCancelled());
Assert.assertFalse(callback.isDone());
Assert.assertFalse(callback.isCancelled());
Assert.assertTrue(incompleteFlush.get());
incompleteFlush.set(false);
Assert.assertEquals("How now br", endPoint.takeOutputString());
endPoint.close();
flusher.completeWrite();
Assert.assertTrue(callback.isDone());
Assert.assertFalse(incompleteFlush.get());
assertTrue(_flushIncomplete.get());
try
{
assertEquals(callback.get(10, TimeUnit.MILLISECONDS),null);
Assert.fail();
}
catch (TimeoutException to)
{
_flushIncomplete.set(false);
}
assertEquals("How now br", _endp.takeOutputString());
_endp.close();
_flusher.completeWrite();
assertCallbackIsDone(callback);
assertFlushIsComplete();
try
{
assertEquals(callback.get(),null);
callback.get();
Assert.fail();
}
catch (ExecutionException e)
@ -215,199 +203,112 @@ public class WriteFlusherTest
Assert.assertTrue(cause instanceof IOException);
Assert.assertThat(cause.getMessage(), Matchers.containsString("CLOSED"));
}
assertEquals("", _endp.takeOutputString());
assertTrue(_flusher.isIdle());
Assert.assertEquals("", endPoint.takeOutputString());
Assert.assertTrue(flusher.isIdle());
}
@Test
public void testFailWhileBlocking() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint(new byte[0], 10);
AtomicBoolean incompleteFlush = new AtomicBoolean();
WriteFlusher flusher = new WriteFlusher(endPoint)
{
@Override
protected void onIncompleteFlush()
{
incompleteFlush.set(true);
}
};
FutureCallback callback = new FutureCallback();
_flusher.write(callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!"));
flusher.write(callback, BufferUtil.toBuffer("How now brown cow!"));
assertFalse(callback.isDone());
assertFalse(callback.isCancelled());
Assert.assertFalse(callback.isDone());
Assert.assertFalse(callback.isCancelled());
Assert.assertTrue(incompleteFlush.get());
incompleteFlush.set(false);
Assert.assertEquals("How now br", endPoint.takeOutputString());
String reason = "Failure";
flusher.onFail(new IOException(reason));
flusher.completeWrite();
Assert.assertTrue(callback.isDone());
Assert.assertFalse(incompleteFlush.get());
assertTrue(_flushIncomplete.get());
try
{
assertEquals(callback.get(10, TimeUnit.MILLISECONDS),null);
Assert.fail();
}
catch (TimeoutException to)
{
_flushIncomplete.set(false);
}
assertEquals("How now br", _endp.takeOutputString());
_flusher.onFail(new IOException("Failure"));
_flusher.completeWrite();
assertCallbackIsDone(callback);
assertFlushIsComplete();
try
{
assertEquals(callback.get(),null);
callback.get();
Assert.fail();
}
catch (ExecutionException e)
{
Throwable cause = e.getCause();
Assert.assertTrue(cause instanceof IOException);
Assert.assertThat(cause.getMessage(), Matchers.containsString("Failure"));
}
assertEquals("", _endp.takeOutputString());
assertTrue(_flusher.isIdle());
}
private static class ConcurrentFlusher extends WriteFlusher implements Runnable
{
final ByteArrayEndPoint _endp;
final SecureRandom _random;
final ScheduledThreadPoolExecutor _scheduler;
final StringBuilder _content = new StringBuilder();
ConcurrentFlusher(ByteArrayEndPoint endp, SecureRandom random, ScheduledThreadPoolExecutor scheduler)
{
super(endp);
_endp = endp;
_random = random;
_scheduler = scheduler;
}
@Override
protected void onIncompleteFlush()
{
_scheduler.schedule(this, 1 + _random.nextInt(9), TimeUnit.MILLISECONDS);
}
@Override
public synchronized void run()
{
_content.append(_endp.takeOutputString());
completeWrite();
}
@Override
public synchronized String toString()
{
_content.append(_endp.takeOutputString());
return _content.toString();
Assert.assertEquals(reason, cause.getMessage());
}
Assert.assertEquals("", endPoint.takeOutputString());
Assert.assertTrue(flusher.isIdle());
}
@Test
public void testConcurrent() throws Exception
{
final SecureRandom random = new SecureRandom();
final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(100);
ConcurrentFlusher[] flushers = new ConcurrentFlusher[50000];
Random random = new Random();
ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(100);
try
{
String reason = "THE_CAUSE";
ConcurrentWriteFlusher[] flushers = new ConcurrentWriteFlusher[50000];
FutureCallback[] futures = new FutureCallback[flushers.length];
for (int i = 0; i < flushers.length; i++)
for (int i = 0; i < flushers.length; ++i)
{
int size = 5 + random.nextInt(15);
ByteArrayEndPoint endp = new ByteArrayEndPoint(new byte[]{}, size);
final ConcurrentFlusher flusher = new ConcurrentFlusher(endp, random, scheduler);
ByteArrayEndPoint endPoint = new ByteArrayEndPoint(new byte[0], size);
ConcurrentWriteFlusher flusher = new ConcurrentWriteFlusher(endPoint, scheduler, random);
flushers[i] = flusher;
final FutureCallback callback = new FutureCallback();
FutureCallback callback = new FutureCallback();
futures[i] = callback;
scheduler.schedule(new Runnable()
{
@Override
public void run()
{
flusher.onFail(new Throwable("THE CAUSE"));
}
}
, random.nextInt(75) + 1, TimeUnit.MILLISECONDS);
scheduler.schedule(() -> flusher.onFail(new Throwable(reason)), random.nextInt(75) + 1, TimeUnit.MILLISECONDS);
flusher.write(callback, BufferUtil.toBuffer("How Now Brown Cow."), BufferUtil.toBuffer(" The quick brown fox jumped over the lazy dog!"));
}
int completed = 0;
int failed = 0;
for (int i = 0; i < flushers.length; i++)
for (int i = 0; i < flushers.length; ++i)
{
try
{
futures[i].get();
assertEquals("How Now Brown Cow. The quick brown fox jumped over the lazy dog!", flushers[i].toString());
futures[i].get(15, TimeUnit.SECONDS);
Assert.assertEquals("How Now Brown Cow. The quick brown fox jumped over the lazy dog!", flushers[i].getContent());
completed++;
}
catch (Exception e)
catch (ExecutionException x)
{
assertThat(e.getMessage(), Matchers.containsString("THE CAUSE"));
Assert.assertEquals(reason, x.getCause().getMessage());
failed++;
}
}
assertThat(completed, Matchers.greaterThan(0));
assertThat(failed, Matchers.greaterThan(0));
Assert.assertThat(completed, Matchers.greaterThan(0));
Assert.assertThat(failed, Matchers.greaterThan(0));
Assert.assertEquals(flushers.length, completed + failed);
}
finally
{
scheduler.shutdown();
}
@Test
public void testConcurrentAccessToWriteAndOnFail() throws Exception
{
// TODO review this test - It was changed for the boolean flush return, but not really well inspected
final CountDownLatch failedCalledLatch = new CountDownLatch(1);
final CountDownLatch writeCalledLatch = new CountDownLatch(1);
final CountDownLatch writeCompleteLatch = new CountDownLatch(1);
final WriteFlusher writeFlusher = new WriteFlusher(_endPointMock)
{
@Override
public void write(Callback callback, ByteBuffer... buffers)
{
super.write(callback, buffers);
writeCompleteLatch.countDown();
}
@Override
protected void onIncompleteFlush()
{
}
};
endPointFlushExpectation(writeCalledLatch, failedCalledLatch);
ExposingStateCallback callback = new ExposingStateCallback();
executor.submit(new Writer(writeFlusher, callback));
assertThat("Write has been called.", writeCalledLatch.await(5, TimeUnit.SECONDS), is(true));
executor.submit(new FailedCaller(writeFlusher, failedCalledLatch)).get();
// callback failed is NOT called because in WRITING state failed() doesn't know about the callback. However
// either the write succeeds or we get an IOException which will call callback.failed()
assertThat("write complete", writeCompleteLatch.await(5, TimeUnit.SECONDS), is(true));
// in this testcase we more or less emulate that the write has successfully finished and we return from
// EndPoint.flush() back to WriteFlusher.write(). Then someone calls failed. So the callback should have been
// completed.
try
{
callback.get(5,TimeUnit.SECONDS);
assertThat("callback completed", callback.isCompleted(), is(true));
assertThat("callback failed", callback.isFailed(), is(false));
}
catch(ExecutionException e)
{
// ignored because failure is expected
assertThat("callback failed", callback.isFailed(), is(true));
}
assertThat("callback completed", callback.isDone(), is(true));
}
@Test
public void testPendingWriteDoesNotStoreConsumedBuffers() throws Exception
{
int toWrite = _endp.getOutput().capacity();
ByteArrayEndPoint endPoint = new ByteArrayEndPoint(new byte[0], 10);
int toWrite = endPoint.getOutput().capacity();
byte[] chunk1 = new byte[toWrite / 2];
Arrays.fill(chunk1, (byte)1);
ByteBuffer buffer1 = ByteBuffer.wrap(chunk1);
@ -415,9 +316,20 @@ public class WriteFlusherTest
Arrays.fill(chunk1, (byte)2);
ByteBuffer buffer2 = ByteBuffer.wrap(chunk2);
_flusher.write(Callback.NOOP, buffer1, buffer2);
assertTrue(_flushIncomplete.get());
assertFalse(buffer1.hasRemaining());
AtomicBoolean incompleteFlush = new AtomicBoolean();
WriteFlusher flusher = new WriteFlusher(endPoint)
{
@Override
protected void onIncompleteFlush()
{
incompleteFlush.set(true);
}
};
flusher.write(Callback.NOOP, buffer1, buffer2);
Assert.assertTrue(incompleteFlush.get());
Assert.assertFalse(buffer1.hasRemaining());
// Reuse buffer1
buffer1.clear();
@ -425,314 +337,143 @@ public class WriteFlusherTest
int remaining1 = buffer1.remaining();
// Complete the write
_endp.takeOutput();
_flusher.completeWrite();
endPoint.takeOutput();
flusher.completeWrite();
// Make sure buffer1 is unchanged
assertEquals(remaining1, buffer1.remaining());
}
private class ExposingStateCallback extends FutureCallback
{
private boolean failed = false;
private boolean completed = false;
@Override
public void succeeded()
{
completed = true;
super.succeeded();
}
@Override
public void failed(Throwable cause)
{
failed = true;
super.failed(cause);
}
public boolean isFailed()
{
return failed;
}
public boolean isCompleted()
{
return completed;
}
Assert.assertEquals(remaining1, buffer1.remaining());
}
@Test(expected = WritePendingException.class)
public void testConcurrentAccessToWrite() throws Throwable
public void testConcurrentWrites() throws Exception
{
final CountDownLatch flushCalledLatch = new CountDownLatch(1);
ByteArrayEndPoint endPoint = new ByteArrayEndPoint(new byte[0], 16);
final WriteFlusher writeFlusher = new WriteFlusher(_endPointMock)
CountDownLatch flushLatch = new CountDownLatch(1);
WriteFlusher flusher = new WriteFlusher(endPoint)
{
@Override
protected ByteBuffer[] flush(ByteBuffer[] buffers) throws IOException
{
try
{
flushLatch.countDown();
Thread.sleep(2000);
return super.flush(buffers);
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
}
@Override
protected void onIncompleteFlush()
{
}
};
// in this test we just want to make sure that we called write twice at the same time
when(_endPointMock.flush(any(ByteBuffer[].class))).thenAnswer(new Answer<Object>()
{
@Override
public Object answer(InvocationOnMock invocation) throws Throwable
{
flushCalledLatch.countDown();
// make sure we stay here, so write is called twice at the same time
Thread.sleep(5000);
return Boolean.TRUE;
}
});
executor.submit(new Writer(writeFlusher, new FutureCallback()));
// make sure that we call .get() on the write that executed second by waiting on this latch
assertThat("Flush has been called once", flushCalledLatch.await(5, TimeUnit.SECONDS), is(true));
try
{
executor.submit(new Writer(writeFlusher, new FutureCallback())).get();
}
catch (ExecutionException e)
{
throw e.getCause();
}
}
private void endPointFlushExpectation(final CountDownLatch writeCalledLatch,
final CountDownLatch failedCalledLatch) throws IOException
{
when(_endPointMock.flush(any(ByteBuffer[].class))).thenAnswer(new Answer<Object>()
{
@Override
public Object answer(InvocationOnMock invocation) throws Throwable
{
Object[] arguments = invocation.getArguments();
ByteBuffer byteBuffer = (ByteBuffer)arguments[0];
BufferUtil.flipToFill(byteBuffer); // pretend everything has been written
writeCalledLatch.countDown();
failedCalledLatch.await(5, TimeUnit.SECONDS);
return Boolean.TRUE;
}
});
// Two concurrent writes.
new Thread(() -> flusher.write(Callback.NOOP, BufferUtil.toBuffer("foo"))).start();
Assert.assertTrue(flushLatch.await(1, TimeUnit.SECONDS));
// The second write throws WritePendingException.
flusher.write(Callback.NOOP, BufferUtil.toBuffer("bar"));
}
@Test
public void testConcurrentAccessToIncompleteWriteAndOnFail() throws Exception
public void testConcurrentWriteAndOnFail() throws Exception
{
final CountDownLatch failedCalledLatch = new CountDownLatch(1);
final CountDownLatch onIncompleteFlushedCalledLatch = new CountDownLatch(1);
final CountDownLatch writeCalledLatch = new CountDownLatch(1);
final CountDownLatch completeWrite = new CountDownLatch(1);
ByteArrayEndPoint endPoint = new ByteArrayEndPoint(new byte[0], 16);
final WriteFlusher writeFlusher = new WriteFlusher(new EndPointConcurrentAccessToIncompleteWriteAndOnFailMock(writeCalledLatch, failedCalledLatch))
WriteFlusher flusher = new WriteFlusher(endPoint)
{
@Override
protected ByteBuffer[] flush(ByteBuffer[] buffers) throws IOException
{
ByteBuffer[] result = super.flush(buffers);
boolean notified = onFail(new Throwable());
Assert.assertFalse(notified);
return result;
}
@Override
protected void onIncompleteFlush()
{
onIncompleteFlushedCalledLatch.countDown();
try
{
failedCalledLatch.await(5, TimeUnit.SECONDS);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
completeWrite();
completeWrite.countDown();
}
};
ExposingStateCallback callback = new ExposingStateCallback();
executor.submit(new Writer(writeFlusher, callback));
assertThat("Write has been called.", writeCalledLatch.await(5, TimeUnit.SECONDS), is(true));
// make sure we're in pending state when calling onFail
assertThat("onIncompleteFlushed has been called.", onIncompleteFlushedCalledLatch.await(5,
TimeUnit.SECONDS), is(true));
executor.submit(new FailedCaller(writeFlusher, failedCalledLatch));
assertThat("Failed has been called.", failedCalledLatch.await(5, TimeUnit.SECONDS), is(true));
assertThat("completeWrite done", completeWrite.await(5, TimeUnit.SECONDS), is(true));
// when we fail in PENDING state, we should have called callback.failed()
assertThat("callback failed has been called", callback.isFailed(), is(true));
assertThat("callback complete has not been called", callback.isCompleted(), is(false));
}
FutureCallback callback = new FutureCallback();
flusher.write(callback, BufferUtil.toBuffer("foo"));
private static class EndPointConcurrentAccessToIncompleteWriteAndOnFailMock extends ByteArrayEndPoint
{
private final CountDownLatch writeCalledLatch;
private final CountDownLatch failedCalledLatch;
private final AtomicBoolean stalled=new AtomicBoolean(false);
public EndPointConcurrentAccessToIncompleteWriteAndOnFailMock(CountDownLatch writeCalledLatch, CountDownLatch failedCalledLatch)
{
this.writeCalledLatch = writeCalledLatch;
this.failedCalledLatch = failedCalledLatch;
}
@Override
public boolean flush(ByteBuffer... buffers) throws IOException
{
writeCalledLatch.countDown();
ByteBuffer byteBuffer = buffers[0];
int oldPos = byteBuffer.position();
if (byteBuffer.remaining() == 2)
{
// make sure we stall at least once
if (!stalled.get())
{
stalled.set(true);
return false;
}
// make sure failed is called before we go on
try
{
failedCalledLatch.await(5, TimeUnit.SECONDS);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
BufferUtil.flipToFill(byteBuffer);
}
else if (byteBuffer.remaining() == 3)
{
byteBuffer.position(1); // pretend writing one byte
}
else
{
byteBuffer.position(byteBuffer.limit());
}
for (ByteBuffer b: buffers)
if (BufferUtil.hasContent(b))
return false;
return true;
}
// Callback must be successfully completed.
callback.get(1, TimeUnit.SECONDS);
// Flusher must be idle - not failed - since the write succeeded.
Assert.assertTrue(flusher.isIdle());
}
@Test
public void testIterationOnNonBlockedStall() throws Exception
public void testConcurrentIncompleteFlushAndOnFail() throws Exception
{
final Exchanger<Integer> exchange = new Exchanger<>();
final AtomicInteger window = new AtomicInteger(10);
EndPointIterationOnNonBlockedStallMock endp=new EndPointIterationOnNonBlockedStallMock(window);
final WriteFlusher writeFlusher = new WriteFlusher(endp)
int capacity = 8;
ByteArrayEndPoint endPoint = new ByteArrayEndPoint(new byte[0], capacity);
String reason = "the_reason";
WriteFlusher flusher = new WriteFlusher(endPoint)
{
@Override
protected void onIncompleteFlush()
{
executor.submit(new Runnable()
onFail(new Throwable(reason));
}
};
FutureCallback callback = new FutureCallback();
byte[] content = new byte[capacity * 2];
flusher.write(callback, BufferUtil.toBuffer(content));
try
{
// Callback must be failed.
callback.get(1, TimeUnit.SECONDS);
}
catch (ExecutionException x)
{
Assert.assertEquals(reason, x.getCause().getMessage());
}
}
private static class ConcurrentWriteFlusher extends WriteFlusher implements Runnable
{
private final ByteArrayEndPoint endPoint;
private final ScheduledExecutorService scheduler;
private final Random random;
private String content = "";
private ConcurrentWriteFlusher(ByteArrayEndPoint endPoint, ScheduledThreadPoolExecutor scheduler, Random random)
{
super(endPoint);
this.endPoint = endPoint;
this.scheduler = scheduler;
this.random = random;
}
@Override
protected void onIncompleteFlush()
{
scheduler.schedule(this, 1 + random.nextInt(9), TimeUnit.MILLISECONDS);
}
@Override
public void run()
{
try
{
while(window.get()==0)
window.addAndGet(exchange.exchange(0));
content += endPoint.takeOutputString();
completeWrite();
}
catch(Throwable th)
private String getContent()
{
th.printStackTrace();
}
}
});
}
};
try(Blocker blocker = new SharedBlockingCallback().acquire())
{
writeFlusher.write(blocker,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow."));
exchange.exchange(0);
Assert.assertThat(endp.takeOutputString(StandardCharsets.US_ASCII),Matchers.equalTo("How now br"));
exchange.exchange(1);
exchange.exchange(0);
Assert.assertThat(endp.takeOutputString(StandardCharsets.US_ASCII),Matchers.equalTo("o"));
exchange.exchange(8);
blocker.block();
}
Assert.assertThat(endp.takeOutputString(StandardCharsets.US_ASCII),Matchers.equalTo("wn cow."));
}
private static class EndPointIterationOnNonBlockedStallMock extends ByteArrayEndPoint
{
final AtomicInteger _window;
public EndPointIterationOnNonBlockedStallMock(AtomicInteger window)
{
_window=window;
}
@Override
public boolean flush(ByteBuffer... buffers) throws IOException
{
ByteBuffer byteBuffer = buffers[0];
if (_window.get()>0 && byteBuffer.hasRemaining())
{
// consume 1 byte
byte one = byteBuffer.get(byteBuffer.position());
if (super.flush(ByteBuffer.wrap(new byte[]{one})))
{
_window.decrementAndGet();
byteBuffer.position(byteBuffer.position()+1);
}
}
for (ByteBuffer b: buffers)
if (BufferUtil.hasContent(b))
return false;
return true;
}
}
private static class FailedCaller implements Callable<FutureCallback>
{
private final WriteFlusher writeFlusher;
private CountDownLatch failedCalledLatch;
public FailedCaller(WriteFlusher writeFlusher, CountDownLatch failedCalledLatch)
{
this.writeFlusher = writeFlusher;
this.failedCalledLatch = failedCalledLatch;
}
@Override
public FutureCallback call()
{
writeFlusher.onFail(new IllegalStateException());
failedCalledLatch.countDown();
return null;
}
}
private class Writer implements Callable<FutureCallback>
{
private final WriteFlusher writeFlusher;
private FutureCallback callback;
public Writer(WriteFlusher writeFlusher, FutureCallback callback)
{
this.writeFlusher = writeFlusher;
this.callback = callback;
}
@Override
public FutureCallback call()
{
writeFlusher.write(callback, BufferUtil.toBuffer("foo"));
return callback;
content += endPoint.takeOutputString();
return content;
}
}
}

View File

@ -74,11 +74,5 @@
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -18,110 +18,119 @@
package org.eclipse.jetty.server.handler;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
import java.net.Inet4Address;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpTester;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.LifeCycle;
import org.junit.Before;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
public class ShutdownHandlerTest
{
@Mock private Request baseRequest;
@Mock private HttpServletRequest request;
@Mock private HttpServletResponse response;
private Server server = new Server(0);
private Server server;
private ServerConnector connector;
private String shutdownToken = "asdlnsldgnklns";
// class under test
private ShutdownHandler shutdownHandler;
@Before
public void startServer() throws Exception
public void start(HandlerWrapper wrapper) throws Exception
{
MockitoAnnotations.initMocks(this);
shutdownHandler = new ShutdownHandler(shutdownToken);
server.setHandler(shutdownHandler);
server = new Server();
connector = new ServerConnector(server);
server.addConnector(connector);
Handler shutdown = new ShutdownHandler(shutdownToken);
Handler handler = shutdown;
if (wrapper != null)
{
wrapper.setHandler(shutdown);
handler = wrapper;
}
server.setHandler(handler);
server.start();
}
@Test
public void shutdownServerWithCorrectTokenAndIPTest() throws Exception
{
setDefaultExpectations();
final CountDownLatch countDown = new CountDownLatch(1);
server.addLifeCycleListener(new AbstractLifeCycle.Listener ()
public void testShutdownServerWithCorrectTokenAndIP() throws Exception
{
start(null);
public void lifeCycleStarting(LifeCycle event)
CountDownLatch stopLatch = new CountDownLatch(1);
server.addLifeCycleListener(new AbstractLifeCycle.AbstractLifeCycleListener()
{
}
public void lifeCycleStarted(LifeCycle event)
{
}
public void lifeCycleFailure(LifeCycle event, Throwable cause)
{
}
public void lifeCycleStopping(LifeCycle event)
{
}
@Override
public void lifeCycleStopped(LifeCycle event)
{
countDown.countDown();
stopLatch.countDown();
}
});
when(baseRequest.getRemoteInetSocketAddress()).thenReturn(new InetSocketAddress(Inet4Address.getLoopbackAddress(),45454));
shutdownHandler.handle("/shutdown",baseRequest,request,response);
boolean stopped = countDown.await(1000, TimeUnit.MILLISECONDS); //wait up to 1 sec to stop
assertTrue("Server lifecycle stop listener called", stopped);
assertEquals("Server should be stopped","STOPPED",server.getState());
HttpTester.Response response = shutdown(shutdownToken);
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
Assert.assertTrue(stopLatch.await(5, TimeUnit.SECONDS));
Assert.assertEquals(AbstractLifeCycle.STOPPED, server.getState());
}
@Test
public void wrongTokenTest() throws Exception
public void testWrongToken() throws Exception
{
setDefaultExpectations();
when(request.getParameter("token")).thenReturn("anothertoken");
when(baseRequest.getRemoteInetSocketAddress()).thenReturn(new InetSocketAddress(Inet4Address.getLoopbackAddress(),45454));
shutdownHandler.handle("/shutdown",baseRequest,request,response);
assertEquals("Server should be running","STARTED",server.getState());
start(null);
HttpTester.Response response = shutdown("wrongToken");
Assert.assertEquals(HttpStatus.UNAUTHORIZED_401, response.getStatus());
Thread.sleep(1000);
Assert.assertEquals(AbstractLifeCycle.STARTED, server.getState());
}
@Test
public void shutdownRequestNotFromLocalhostTest() throws Exception
public void testShutdownRequestNotFromLocalhost() throws Exception
{
setDefaultExpectations();
when(request.getRemoteAddr()).thenReturn("192.168.3.3");
when(baseRequest.getRemoteInetSocketAddress()).thenReturn(new InetSocketAddress(Inet4Address.getByName("192.168.3.3"),45454));
shutdownHandler.handle("/shutdown",baseRequest,request,response);
assertEquals("Server should be running","STARTED",server.getState());
start(new HandlerWrapper()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setRemoteAddr(new InetSocketAddress("192.168.0.1", 12345));
super.handle(target, baseRequest, request, response);
}
});
HttpTester.Response response = shutdown(shutdownToken);
Assert.assertEquals(HttpStatus.UNAUTHORIZED_401, response.getStatus());
Thread.sleep(1000);
Assert.assertEquals(AbstractLifeCycle.STARTED, server.getState());
}
private void setDefaultExpectations()
private HttpTester.Response shutdown(String shutdownToken) throws IOException
{
when(request.getMethod()).thenReturn("POST");
when(request.getParameter("token")).thenReturn(shutdownToken);
when(request.getRemoteAddr()).thenReturn("127.0.0.1");
}
try (Socket socket = new Socket("localhost", connector.getLocalPort()))
{
String request = "" +
"POST /shutdown?token=" + shutdownToken + " HTTP/1.1\r\n" +
"Host: localhost\r\n" +
"\r\n";
OutputStream output = socket.getOutputStream();
output.write(request.getBytes(StandardCharsets.UTF_8));
output.flush();
HttpTester.Input input = HttpTester.from(socket.getInputStream());
return HttpTester.parseResponse(input);
}
}
}

13
pom.xml
View File

@ -20,7 +20,7 @@
<slf4j-version>1.6.6</slf4j-version>
<jetty-test-policy-version>1.2</jetty-test-policy-version>
<alpn.api.version>1.1.3.v20160715</alpn.api.version>
<jsp.version>8.5.9</jsp.version>
<jsp.version>8.5.5</jsp.version>
<!-- default values are unsupported, but required to be defined for reactor sanity reasons -->
<alpn.version>undefined</alpn.version>
</properties>
@ -964,17 +964,6 @@
<artifactId>hamcrest-library</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>1.9.5</version>
<exclusions>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</dependencyManagement>