470727 - Thread Starvation of selector wakeups.
Changed the CallBack.NonBlocking to a default Callback.isNonBlocking, so that wrapping callbacks can determine if they are NonBlocking or not.
This commit is contained in:
parent
24c31527cb
commit
607239028c
|
@ -672,6 +672,13 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
|
||||||
|
|
||||||
private class CommitCallback implements Callback
|
private class CommitCallback implements Callback
|
||||||
{
|
{
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isNonBlocking()
|
||||||
|
{
|
||||||
|
return content.isNonBlocking();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void succeeded()
|
public void succeeded()
|
||||||
{
|
{
|
||||||
|
@ -882,6 +889,12 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
|
||||||
|
|
||||||
private class LastContentCallback implements Callback
|
private class LastContentCallback implements Callback
|
||||||
{
|
{
|
||||||
|
@Override
|
||||||
|
public boolean isNonBlocking()
|
||||||
|
{
|
||||||
|
return content.isNonBlocking();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void succeeded()
|
public void succeeded()
|
||||||
{
|
{
|
||||||
|
|
|
@ -230,6 +230,12 @@ public class HttpSenderOverHTTP extends HttpSender
|
||||||
this.buffers = buffers;
|
this.buffers = buffers;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isNonBlocking()
|
||||||
|
{
|
||||||
|
return callback.isNonBlocking();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void succeeded()
|
public void succeeded()
|
||||||
{
|
{
|
||||||
|
|
|
@ -87,7 +87,7 @@ import org.eclipse.jetty.util.Callback;
|
||||||
*/
|
*/
|
||||||
public class DeferredContentProvider implements AsyncContentProvider, Callback, Closeable
|
public class DeferredContentProvider implements AsyncContentProvider, Callback, Closeable
|
||||||
{
|
{
|
||||||
private static final Chunk CLOSE = new Chunk(BufferUtil.EMPTY_BUFFER, Callback.Adapter.INSTANCE);
|
private static final Chunk CLOSE = new Chunk(BufferUtil.EMPTY_BUFFER, Callback.NOOP);
|
||||||
|
|
||||||
private final Object lock = this;
|
private final Object lock = this;
|
||||||
private final ArrayQueue<Chunk> chunks = new ArrayQueue<>(4, 64, lock);
|
private final ArrayQueue<Chunk> chunks = new ArrayQueue<>(4, 64, lock);
|
||||||
|
@ -143,7 +143,7 @@ public class DeferredContentProvider implements AsyncContentProvider, Callback,
|
||||||
*/
|
*/
|
||||||
public boolean offer(ByteBuffer buffer)
|
public boolean offer(ByteBuffer buffer)
|
||||||
{
|
{
|
||||||
return offer(buffer, Callback.Adapter.INSTANCE);
|
return offer(buffer, Callback.NOOP);
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean offer(ByteBuffer buffer, Callback callback)
|
public boolean offer(ByteBuffer buffer, Callback callback)
|
||||||
|
|
|
@ -78,6 +78,12 @@ public class OutputStreamContentProvider implements AsyncContentProvider, Callba
|
||||||
private final DeferredContentProvider deferred = new DeferredContentProvider();
|
private final DeferredContentProvider deferred = new DeferredContentProvider();
|
||||||
private final OutputStream output = new DeferredOutputStream();
|
private final OutputStream output = new DeferredOutputStream();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isNonBlocking()
|
||||||
|
{
|
||||||
|
return deferred.isNonBlocking();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getLength()
|
public long getLength()
|
||||||
{
|
{
|
||||||
|
|
|
@ -157,7 +157,7 @@ public class HttpClientFailureTest
|
||||||
|
|
||||||
Assert.assertTrue(commitLatch.await(5, TimeUnit.SECONDS));
|
Assert.assertTrue(commitLatch.await(5, TimeUnit.SECONDS));
|
||||||
final CountDownLatch contentLatch = new CountDownLatch(1);
|
final CountDownLatch contentLatch = new CountDownLatch(1);
|
||||||
content.offer(ByteBuffer.allocate(1024), new Callback.Adapter()
|
content.offer(ByteBuffer.allocate(1024), new Callback()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public void failed(Throwable x)
|
public void failed(Throwable x)
|
||||||
|
|
|
@ -691,7 +691,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
|
||||||
try (DeferredContentProvider content = new DeferredContentProvider())
|
try (DeferredContentProvider content = new DeferredContentProvider())
|
||||||
{
|
{
|
||||||
// Make the content immediately available.
|
// Make the content immediately available.
|
||||||
content.offer(ByteBuffer.allocate(1024), new Callback.Adapter()
|
content.offer(ByteBuffer.allocate(1024), new Callback()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public void succeeded()
|
public void succeeded()
|
||||||
|
@ -976,7 +976,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
|
||||||
start(new EmptyServerHandler());
|
start(new EmptyServerHandler());
|
||||||
|
|
||||||
final CountDownLatch failLatch = new CountDownLatch(2);
|
final CountDownLatch failLatch = new CountDownLatch(2);
|
||||||
final Callback.Adapter callback = new Callback.Adapter()
|
final Callback callback = new Callback()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public void failed(Throwable x)
|
public void failed(Throwable x)
|
||||||
|
@ -1014,7 +1014,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
|
||||||
|
|
||||||
// Make sure that adding more content results in the callback to be failed.
|
// Make sure that adding more content results in the callback to be failed.
|
||||||
final CountDownLatch latch = new CountDownLatch(1);
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
content.offer(ByteBuffer.wrap(new byte[128]), new Callback.Adapter()
|
content.offer(ByteBuffer.wrap(new byte[128]), new Callback()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public void failed(Throwable x)
|
public void failed(Throwable x)
|
||||||
|
|
|
@ -99,7 +99,7 @@ public class HttpSenderOverFCGI extends HttpSender
|
||||||
int id = getHttpChannel().getRequest();
|
int id = getHttpChannel().getRequest();
|
||||||
boolean hasContent = content.hasContent();
|
boolean hasContent = content.hasContent();
|
||||||
Generator.Result headersResult = generator.generateRequestHeaders(id, fcgiHeaders,
|
Generator.Result headersResult = generator.generateRequestHeaders(id, fcgiHeaders,
|
||||||
hasContent ? callback : Callback.Adapter.INSTANCE);
|
hasContent ? callback : Callback.NOOP);
|
||||||
if (hasContent)
|
if (hasContent)
|
||||||
{
|
{
|
||||||
getHttpChannel().flush(headersResult);
|
getHttpChannel().flush(headersResult);
|
||||||
|
|
|
@ -105,7 +105,7 @@ public class HttpTransportOverFCGI implements HttpTransport
|
||||||
{
|
{
|
||||||
if (lastContent)
|
if (lastContent)
|
||||||
{
|
{
|
||||||
Generator.Result headersResult = generateResponseHeaders(info, Callback.Adapter.INSTANCE);
|
Generator.Result headersResult = generateResponseHeaders(info, Callback.NOOP);
|
||||||
Generator.Result contentResult = generateResponseContent(BufferUtil.EMPTY_BUFFER, true, callback);
|
Generator.Result contentResult = generateResponseContent(BufferUtil.EMPTY_BUFFER, true, callback);
|
||||||
flusher.flush(headersResult, contentResult);
|
flusher.flush(headersResult, contentResult);
|
||||||
}
|
}
|
||||||
|
@ -117,7 +117,7 @@ public class HttpTransportOverFCGI implements HttpTransport
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
Generator.Result headersResult = generateResponseHeaders(info, Callback.Adapter.INSTANCE);
|
Generator.Result headersResult = generateResponseHeaders(info, Callback.NOOP);
|
||||||
Generator.Result contentResult = generateResponseContent(content, lastContent, callback);
|
Generator.Result contentResult = generateResponseContent(content, lastContent, callback);
|
||||||
flusher.flush(headersResult, contentResult);
|
flusher.flush(headersResult, contentResult);
|
||||||
}
|
}
|
||||||
|
|
|
@ -299,7 +299,7 @@ public class HTTP2Client extends ContainerLifeCycle
|
||||||
private void closeConnections()
|
private void closeConnections()
|
||||||
{
|
{
|
||||||
for (ISession session : sessions)
|
for (ISession session : sessions)
|
||||||
session.close(ErrorCode.NO_ERROR.code, null, Callback.Adapter.INSTANCE);
|
session.close(ErrorCode.NO_ERROR.code, null, Callback.NOOP);
|
||||||
sessions.clear();
|
sessions.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -55,7 +55,7 @@ public class HTTP2ClientSession extends HTTP2Session
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
stream.process(frame, Callback.Adapter.INSTANCE);
|
stream.process(frame, Callback.NOOP);
|
||||||
notifyHeaders(stream, frame);
|
notifyHeaders(stream, frame);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -92,7 +92,7 @@ public class HTTP2ClientSession extends HTTP2Session
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
IStream pushStream = createRemoteStream(pushStreamId);
|
IStream pushStream = createRemoteStream(pushStreamId);
|
||||||
pushStream.process(frame, Callback.Adapter.INSTANCE);
|
pushStream.process(frame, Callback.NOOP);
|
||||||
Stream.Listener listener = notifyPush(stream, pushStream, frame);
|
Stream.Listener listener = notifyPush(stream, pushStream, frame);
|
||||||
pushStream.setListener(listener);
|
pushStream.setListener(listener);
|
||||||
}
|
}
|
||||||
|
|
|
@ -94,7 +94,7 @@ public class AsyncIOTest extends AbstractTest
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
Stream stream = promise.get(5, TimeUnit.SECONDS);
|
Stream stream = promise.get(5, TimeUnit.SECONDS);
|
||||||
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(16), true), Callback.Adapter.INSTANCE);
|
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(16), true), Callback.NOOP);
|
||||||
|
|
||||||
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
|
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||||
}
|
}
|
||||||
|
@ -148,7 +148,7 @@ public class AsyncIOTest extends AbstractTest
|
||||||
|
|
||||||
// Wait until service() returns.
|
// Wait until service() returns.
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(16), true), Callback.Adapter.INSTANCE);
|
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(16), true), Callback.NOOP);
|
||||||
|
|
||||||
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
|
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||||
}
|
}
|
||||||
|
@ -204,11 +204,11 @@ public class AsyncIOTest extends AbstractTest
|
||||||
|
|
||||||
// Wait until service() returns.
|
// Wait until service() returns.
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), false), Callback.Adapter.INSTANCE);
|
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), false), Callback.NOOP);
|
||||||
|
|
||||||
// Wait until onDataAvailable() returns.
|
// Wait until onDataAvailable() returns.
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), true), Callback.Adapter.INSTANCE);
|
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), true), Callback.NOOP);
|
||||||
|
|
||||||
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
|
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||||
// Make sure onDataAvailable() has been called twice
|
// Make sure onDataAvailable() has been called twice
|
||||||
|
|
|
@ -247,7 +247,7 @@ public abstract class FlowControlStrategyTest
|
||||||
HttpFields fields = new HttpFields();
|
HttpFields fields = new HttpFields();
|
||||||
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, 200, fields);
|
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, 200, fields);
|
||||||
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, true);
|
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, true);
|
||||||
stream.headers(responseFrame, Callback.Adapter.INSTANCE);
|
stream.headers(responseFrame, Callback.NOOP);
|
||||||
|
|
||||||
return new Stream.Listener.Adapter()
|
return new Stream.Listener.Adapter()
|
||||||
{
|
{
|
||||||
|
@ -263,7 +263,7 @@ public abstract class FlowControlStrategyTest
|
||||||
callbackRef.set(callback);
|
callbackRef.set(callback);
|
||||||
Map<Integer, Integer> settings = new HashMap<>();
|
Map<Integer, Integer> settings = new HashMap<>();
|
||||||
settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, size);
|
settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, size);
|
||||||
stream.getSession().settings(new SettingsFrame(settings, false), Callback.Adapter.INSTANCE);
|
stream.getSession().settings(new SettingsFrame(settings, false), Callback.NOOP);
|
||||||
// Do not succeed the callback here.
|
// Do not succeed the callback here.
|
||||||
}
|
}
|
||||||
else if (dataFrameCount > 1)
|
else if (dataFrameCount > 1)
|
||||||
|
@ -293,11 +293,11 @@ public abstract class FlowControlStrategyTest
|
||||||
Stream stream = promise.get(5, TimeUnit.SECONDS);
|
Stream stream = promise.get(5, TimeUnit.SECONDS);
|
||||||
|
|
||||||
// Send first chunk that exceeds the window.
|
// Send first chunk that exceeds the window.
|
||||||
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(size * 2), false), Callback.Adapter.INSTANCE);
|
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(size * 2), false), Callback.NOOP);
|
||||||
settingsLatch.await(5, TimeUnit.SECONDS);
|
settingsLatch.await(5, TimeUnit.SECONDS);
|
||||||
|
|
||||||
// Send the second chunk of data, must not arrive since we're flow control stalled on the client.
|
// Send the second chunk of data, must not arrive since we're flow control stalled on the client.
|
||||||
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(size * 2), true), Callback.Adapter.INSTANCE);
|
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(size * 2), true), Callback.NOOP);
|
||||||
Assert.assertFalse(dataLatch.await(1, TimeUnit.SECONDS));
|
Assert.assertFalse(dataLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
|
||||||
// Consume the data arrived to server, this will resume flow control on the client.
|
// Consume the data arrived to server, this will resume flow control on the client.
|
||||||
|
@ -325,10 +325,10 @@ public abstract class FlowControlStrategyTest
|
||||||
{
|
{
|
||||||
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
|
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
|
||||||
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, false);
|
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, false);
|
||||||
stream.headers(responseFrame, Callback.Adapter.INSTANCE);
|
stream.headers(responseFrame, Callback.NOOP);
|
||||||
|
|
||||||
DataFrame dataFrame = new DataFrame(stream.getId(), ByteBuffer.allocate(length), true);
|
DataFrame dataFrame = new DataFrame(stream.getId(), ByteBuffer.allocate(length), true);
|
||||||
stream.data(dataFrame, Callback.Adapter.INSTANCE);
|
stream.data(dataFrame, Callback.NOOP);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -337,7 +337,7 @@ public abstract class FlowControlStrategyTest
|
||||||
|
|
||||||
Map<Integer, Integer> settings = new HashMap<>();
|
Map<Integer, Integer> settings = new HashMap<>();
|
||||||
settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, windowSize);
|
settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, windowSize);
|
||||||
session.settings(new SettingsFrame(settings, false), Callback.Adapter.INSTANCE);
|
session.settings(new SettingsFrame(settings, false), Callback.NOOP);
|
||||||
|
|
||||||
Assert.assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));
|
Assert.assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
@ -417,7 +417,7 @@ public abstract class FlowControlStrategyTest
|
||||||
{
|
{
|
||||||
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
|
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
|
||||||
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, false);
|
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, false);
|
||||||
stream.headers(responseFrame, Callback.Adapter.INSTANCE);
|
stream.headers(responseFrame, Callback.NOOP);
|
||||||
return new Stream.Listener.Adapter()
|
return new Stream.Listener.Adapter()
|
||||||
{
|
{
|
||||||
private AtomicInteger dataFrames = new AtomicInteger();
|
private AtomicInteger dataFrames = new AtomicInteger();
|
||||||
|
@ -474,7 +474,7 @@ public abstract class FlowControlStrategyTest
|
||||||
|
|
||||||
final int length = 5 * windowSize;
|
final int length = 5 * windowSize;
|
||||||
DataFrame dataFrame = new DataFrame(stream.getId(), ByteBuffer.allocate(length), true);
|
DataFrame dataFrame = new DataFrame(stream.getId(), ByteBuffer.allocate(length), true);
|
||||||
stream.data(dataFrame, Callback.Adapter.INSTANCE);
|
stream.data(dataFrame, Callback.NOOP);
|
||||||
|
|
||||||
Callback callback = exchanger.exchange(null, 5, TimeUnit.SECONDS);
|
Callback callback = exchanger.exchange(null, 5, TimeUnit.SECONDS);
|
||||||
checkThatWeAreFlowControlStalled(exchanger);
|
checkThatWeAreFlowControlStalled(exchanger);
|
||||||
|
@ -519,7 +519,7 @@ public abstract class FlowControlStrategyTest
|
||||||
// Send data to consume most of the session window.
|
// Send data to consume most of the session window.
|
||||||
ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE - windowSize);
|
ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE - windowSize);
|
||||||
DataFrame dataFrame = new DataFrame(stream.getId(), data, true);
|
DataFrame dataFrame = new DataFrame(stream.getId(), data, true);
|
||||||
stream.data(dataFrame, Callback.Adapter.INSTANCE);
|
stream.data(dataFrame, Callback.NOOP);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -527,9 +527,9 @@ public abstract class FlowControlStrategyTest
|
||||||
// For every stream, send down half the window size of data.
|
// For every stream, send down half the window size of data.
|
||||||
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
|
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
|
||||||
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, false);
|
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, false);
|
||||||
stream.headers(responseFrame, Callback.Adapter.INSTANCE);
|
stream.headers(responseFrame, Callback.NOOP);
|
||||||
DataFrame dataFrame = new DataFrame(stream.getId(), ByteBuffer.allocate(windowSize / 2), true);
|
DataFrame dataFrame = new DataFrame(stream.getId(), ByteBuffer.allocate(windowSize / 2), true);
|
||||||
stream.data(dataFrame, Callback.Adapter.INSTANCE);
|
stream.data(dataFrame, Callback.NOOP);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -615,9 +615,9 @@ public abstract class FlowControlStrategyTest
|
||||||
{
|
{
|
||||||
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
|
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
|
||||||
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, false);
|
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, false);
|
||||||
stream.headers(responseFrame, Callback.Adapter.INSTANCE);
|
stream.headers(responseFrame, Callback.NOOP);
|
||||||
DataFrame dataFrame = new DataFrame(stream.getId(), ByteBuffer.wrap(data), true);
|
DataFrame dataFrame = new DataFrame(stream.getId(), ByteBuffer.wrap(data), true);
|
||||||
stream.data(dataFrame, Callback.Adapter.INSTANCE);
|
stream.data(dataFrame, Callback.NOOP);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -675,8 +675,8 @@ public abstract class FlowControlStrategyTest
|
||||||
@Override
|
@Override
|
||||||
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||||
{
|
{
|
||||||
stream.data(new DataFrame(stream.getId(), ByteBuffer.wrap(chunk1), false), Callback.Adapter.INSTANCE);
|
stream.data(new DataFrame(stream.getId(), ByteBuffer.wrap(chunk1), false), Callback.NOOP);
|
||||||
stream.data(new DataFrame(stream.getId(), ByteBuffer.wrap(chunk2), true), Callback.Adapter.INSTANCE);
|
stream.data(new DataFrame(stream.getId(), ByteBuffer.wrap(chunk2), true), Callback.NOOP);
|
||||||
dataLatch.countDown();
|
dataLatch.countDown();
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -685,7 +685,7 @@ public abstract class FlowControlStrategyTest
|
||||||
Session session = newClient(new Session.Listener.Adapter());
|
Session session = newClient(new Session.Listener.Adapter());
|
||||||
Map<Integer, Integer> settings = new HashMap<>();
|
Map<Integer, Integer> settings = new HashMap<>();
|
||||||
settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, 0);
|
settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, 0);
|
||||||
session.settings(new SettingsFrame(settings, false), Callback.Adapter.INSTANCE);
|
session.settings(new SettingsFrame(settings, false), Callback.NOOP);
|
||||||
Assert.assertTrue(settingsLatch.get().await(5, TimeUnit.SECONDS));
|
Assert.assertTrue(settingsLatch.get().await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
byte[] content = new byte[chunk1.length + chunk2.length];
|
byte[] content = new byte[chunk1.length + chunk2.length];
|
||||||
|
@ -712,7 +712,7 @@ public abstract class FlowControlStrategyTest
|
||||||
settingsLatch.set(new CountDownLatch(1));
|
settingsLatch.set(new CountDownLatch(1));
|
||||||
settings.clear();
|
settings.clear();
|
||||||
settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, chunk1.length / 2);
|
settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, chunk1.length / 2);
|
||||||
session.settings(new SettingsFrame(settings, false), Callback.Adapter.INSTANCE);
|
session.settings(new SettingsFrame(settings, false), Callback.NOOP);
|
||||||
Assert.assertTrue(settingsLatch.get().await(5, TimeUnit.SECONDS));
|
Assert.assertTrue(settingsLatch.get().await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
Assert.assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
|
Assert.assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
@ -734,7 +734,7 @@ public abstract class FlowControlStrategyTest
|
||||||
{
|
{
|
||||||
MetaData metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
|
MetaData metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
|
||||||
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, false);
|
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, false);
|
||||||
stream.headers(responseFrame, Callback.Adapter.INSTANCE);
|
stream.headers(responseFrame, Callback.NOOP);
|
||||||
return new Stream.Listener.Adapter()
|
return new Stream.Listener.Adapter()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
|
@ -787,7 +787,7 @@ public abstract class FlowControlStrategyTest
|
||||||
|
|
||||||
ByteBuffer requestContent = ByteBuffer.wrap(requestData);
|
ByteBuffer requestContent = ByteBuffer.wrap(requestData);
|
||||||
DataFrame dataFrame = new DataFrame(stream.getId(), requestContent, true);
|
DataFrame dataFrame = new DataFrame(stream.getId(), requestContent, true);
|
||||||
stream.data(dataFrame, Callback.Adapter.INSTANCE);
|
stream.data(dataFrame, Callback.NOOP);
|
||||||
|
|
||||||
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
|
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
@ -820,7 +820,7 @@ public abstract class FlowControlStrategyTest
|
||||||
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
|
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
|
||||||
ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
|
ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
|
||||||
final CountDownLatch dataLatch = new CountDownLatch(1);
|
final CountDownLatch dataLatch = new CountDownLatch(1);
|
||||||
stream.data(new DataFrame(stream.getId(), data, false), new Callback.Adapter()
|
stream.data(new DataFrame(stream.getId(), data, false), new Callback.NonBlocking()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public void succeeded()
|
public void succeeded()
|
||||||
|
@ -845,7 +845,7 @@ public abstract class FlowControlStrategyTest
|
||||||
ByteBuffer extraData = ByteBuffer.allocate(1024);
|
ByteBuffer extraData = ByteBuffer.allocate(1024);
|
||||||
http2Session.getGenerator().data(lease, new DataFrame(stream.getId(), extraData, true), extraData.remaining());
|
http2Session.getGenerator().data(lease, new DataFrame(stream.getId(), extraData, true), extraData.remaining());
|
||||||
List<ByteBuffer> buffers = lease.getByteBuffers();
|
List<ByteBuffer> buffers = lease.getByteBuffers();
|
||||||
http2Session.getEndPoint().write(Callback.Adapter.INSTANCE, buffers.toArray(new ByteBuffer[buffers.size()]));
|
http2Session.getEndPoint().write(Callback.NOOP, buffers.toArray(new ByteBuffer[buffers.size()]));
|
||||||
|
|
||||||
// Expect the connection to be closed.
|
// Expect the connection to be closed.
|
||||||
Assert.assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
|
Assert.assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
@ -885,7 +885,7 @@ public abstract class FlowControlStrategyTest
|
||||||
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
|
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
|
||||||
ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
|
ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
|
||||||
final CountDownLatch dataLatch = new CountDownLatch(1);
|
final CountDownLatch dataLatch = new CountDownLatch(1);
|
||||||
stream.data(new DataFrame(stream.getId(), data, false), new Callback.Adapter()
|
stream.data(new DataFrame(stream.getId(), data, false), new Callback.NonBlocking()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public void succeeded()
|
public void succeeded()
|
||||||
|
@ -906,7 +906,7 @@ public abstract class FlowControlStrategyTest
|
||||||
ByteBuffer extraData = ByteBuffer.allocate(1024);
|
ByteBuffer extraData = ByteBuffer.allocate(1024);
|
||||||
http2Session.getGenerator().data(lease, new DataFrame(stream.getId(), extraData, true), extraData.remaining());
|
http2Session.getGenerator().data(lease, new DataFrame(stream.getId(), extraData, true), extraData.remaining());
|
||||||
List<ByteBuffer> buffers = lease.getByteBuffers();
|
List<ByteBuffer> buffers = lease.getByteBuffers();
|
||||||
http2Session.getEndPoint().write(Callback.Adapter.INSTANCE, buffers.toArray(new ByteBuffer[buffers.size()]));
|
http2Session.getEndPoint().write(Callback.NOOP, buffers.toArray(new ByteBuffer[buffers.size()]));
|
||||||
|
|
||||||
// Expect the connection to be closed.
|
// Expect the connection to be closed.
|
||||||
Assert.assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
|
Assert.assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
@ -936,7 +936,7 @@ public abstract class FlowControlStrategyTest
|
||||||
// stream is reset, and automatically consumed to
|
// stream is reset, and automatically consumed to
|
||||||
// keep the session window large for other streams.
|
// keep the session window large for other streams.
|
||||||
callback.failed(new Throwable());
|
callback.failed(new Throwable());
|
||||||
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.Adapter.INSTANCE);
|
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -960,7 +960,7 @@ public abstract class FlowControlStrategyTest
|
||||||
// Perform a big upload that will stall the flow control windows.
|
// Perform a big upload that will stall the flow control windows.
|
||||||
ByteBuffer data = ByteBuffer.allocate(5 * FlowControlStrategy.DEFAULT_WINDOW_SIZE);
|
ByteBuffer data = ByteBuffer.allocate(5 * FlowControlStrategy.DEFAULT_WINDOW_SIZE);
|
||||||
final CountDownLatch dataLatch = new CountDownLatch(1);
|
final CountDownLatch dataLatch = new CountDownLatch(1);
|
||||||
stream.data(new DataFrame(stream.getId(), data, true), new Callback.Adapter()
|
stream.data(new DataFrame(stream.getId(), data, true), new Callback.NonBlocking()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public void failed(Throwable x)
|
public void failed(Throwable x)
|
||||||
|
|
|
@ -65,7 +65,7 @@ public class IdleTimeoutTest extends AbstractTest
|
||||||
stream.setIdleTimeout(10 * idleTimeout);
|
stream.setIdleTimeout(10 * idleTimeout);
|
||||||
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
|
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
|
||||||
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, true);
|
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, true);
|
||||||
stream.headers(responseFrame, Callback.Adapter.INSTANCE);
|
stream.headers(responseFrame, Callback.NOOP);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -154,7 +154,7 @@ public class IdleTimeoutTest extends AbstractTest
|
||||||
sleep(idleTimeout + idleTimeout / 2);
|
sleep(idleTimeout + idleTimeout / 2);
|
||||||
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
|
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
|
||||||
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, true);
|
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, true);
|
||||||
stream.headers(responseFrame, Callback.Adapter.INSTANCE);
|
stream.headers(responseFrame, Callback.NOOP);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -207,7 +207,7 @@ public class IdleTimeoutTest extends AbstractTest
|
||||||
stream.setIdleTimeout(10 * idleTimeout);
|
stream.setIdleTimeout(10 * idleTimeout);
|
||||||
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
|
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
|
||||||
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, true);
|
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, true);
|
||||||
stream.headers(responseFrame, Callback.Adapter.INSTANCE);
|
stream.headers(responseFrame, Callback.NOOP);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -285,7 +285,7 @@ public class IdleTimeoutTest extends AbstractTest
|
||||||
stream.setIdleTimeout(10 * idleTimeout);
|
stream.setIdleTimeout(10 * idleTimeout);
|
||||||
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
|
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
|
||||||
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, true);
|
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, true);
|
||||||
stream.headers(responseFrame, Callback.Adapter.INSTANCE);
|
stream.headers(responseFrame, Callback.NOOP);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -452,7 +452,7 @@ public class IdleTimeoutTest extends AbstractTest
|
||||||
|
|
||||||
sleep(idleTimeout / 2);
|
sleep(idleTimeout / 2);
|
||||||
final CountDownLatch dataLatch = new CountDownLatch(1);
|
final CountDownLatch dataLatch = new CountDownLatch(1);
|
||||||
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), false), new Callback.Adapter()
|
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), false), new Callback()
|
||||||
{
|
{
|
||||||
private int sends;
|
private int sends;
|
||||||
|
|
||||||
|
@ -461,7 +461,7 @@ public class IdleTimeoutTest extends AbstractTest
|
||||||
{
|
{
|
||||||
sleep(idleTimeout / 2);
|
sleep(idleTimeout / 2);
|
||||||
final boolean last = ++sends == 2;
|
final boolean last = ++sends == 2;
|
||||||
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), last), !last ? this : new Adapter()
|
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), last), !last ? this : new Callback.NonBlocking()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public void succeeded()
|
public void succeeded()
|
||||||
|
@ -486,7 +486,7 @@ public class IdleTimeoutTest extends AbstractTest
|
||||||
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||||
{
|
{
|
||||||
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
|
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
|
||||||
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.Adapter.INSTANCE);
|
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -513,11 +513,11 @@ public class IdleTimeoutTest extends AbstractTest
|
||||||
final Stream stream = promise.get(5, TimeUnit.SECONDS);
|
final Stream stream = promise.get(5, TimeUnit.SECONDS);
|
||||||
|
|
||||||
sleep(idleTimeout / 2);
|
sleep(idleTimeout / 2);
|
||||||
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), false), Callback.Adapter.INSTANCE);
|
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), false), Callback.NOOP);
|
||||||
sleep(idleTimeout / 2);
|
sleep(idleTimeout / 2);
|
||||||
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), false), Callback.Adapter.INSTANCE);
|
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), false), Callback.NOOP);
|
||||||
sleep(idleTimeout / 2);
|
sleep(idleTimeout / 2);
|
||||||
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), true), Callback.Adapter.INSTANCE);
|
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), true), Callback.NOOP);
|
||||||
|
|
||||||
Assert.assertFalse(resetLatch.await(0, TimeUnit.SECONDS));
|
Assert.assertFalse(resetLatch.await(0, TimeUnit.SECONDS));
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,7 +51,7 @@ public class PingTest extends AbstractTest
|
||||||
});
|
});
|
||||||
|
|
||||||
PingFrame frame = new PingFrame(payload, false);
|
PingFrame frame = new PingFrame(payload, false);
|
||||||
session.ping(frame, Callback.Adapter.INSTANCE);
|
session.ping(frame, Callback.NOOP);
|
||||||
|
|
||||||
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
|
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||||
}
|
}
|
||||||
|
|
|
@ -205,7 +205,7 @@ public class PushCacheFilterTest extends AbstractTest
|
||||||
{
|
{
|
||||||
// Reset the stream as soon as we see the push.
|
// Reset the stream as soon as we see the push.
|
||||||
ResetFrame resetFrame = new ResetFrame(stream.getId(), ErrorCode.REFUSED_STREAM_ERROR.code);
|
ResetFrame resetFrame = new ResetFrame(stream.getId(), ErrorCode.REFUSED_STREAM_ERROR.code);
|
||||||
stream.reset(resetFrame, Callback.Adapter.INSTANCE);
|
stream.reset(resetFrame, Callback.NOOP);
|
||||||
return new Adapter()
|
return new Adapter()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -84,7 +84,7 @@ public class SessionFailureTest extends AbstractTest
|
||||||
// Forcibly close the connection.
|
// Forcibly close the connection.
|
||||||
((HTTP2Session)stream.getSession()).getEndPoint().close();
|
((HTTP2Session)stream.getSession()).getEndPoint().close();
|
||||||
// Now try to write something: it should fail.
|
// Now try to write something: it should fail.
|
||||||
stream.headers(frame, new Callback.Adapter()
|
stream.headers(frame, new Callback()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public void failed(Throwable x)
|
public void failed(Throwable x)
|
||||||
|
|
|
@ -80,7 +80,7 @@ public class StreamCloseTest extends AbstractTest
|
||||||
{
|
{
|
||||||
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
|
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
|
||||||
HeadersFrame response = new HeadersFrame(stream.getId(), metaData, null, true);
|
HeadersFrame response = new HeadersFrame(stream.getId(), metaData, null, true);
|
||||||
stream.headers(response, new Callback.Adapter()
|
stream.headers(response, new Callback()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public void succeeded()
|
public void succeeded()
|
||||||
|
@ -122,14 +122,14 @@ public class StreamCloseTest extends AbstractTest
|
||||||
{
|
{
|
||||||
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
|
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
|
||||||
HeadersFrame response = new HeadersFrame(stream.getId(), metaData, null, false);
|
HeadersFrame response = new HeadersFrame(stream.getId(), metaData, null, false);
|
||||||
stream.headers(response, Callback.Adapter.INSTANCE);
|
stream.headers(response, Callback.NOOP);
|
||||||
return new Stream.Listener.Adapter()
|
return new Stream.Listener.Adapter()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public void onData(final Stream stream, DataFrame frame, final Callback callback)
|
public void onData(final Stream stream, DataFrame frame, final Callback callback)
|
||||||
{
|
{
|
||||||
Assert.assertTrue(((HTTP2Stream)stream).isRemotelyClosed());
|
Assert.assertTrue(((HTTP2Stream)stream).isRemotelyClosed());
|
||||||
stream.data(frame, new Callback.Adapter()
|
stream.data(frame, new Callback()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public void succeeded()
|
public void succeeded()
|
||||||
|
@ -163,7 +163,7 @@ public class StreamCloseTest extends AbstractTest
|
||||||
Assert.assertFalse(((HTTP2Stream)stream).isLocallyClosed());
|
Assert.assertFalse(((HTTP2Stream)stream).isLocallyClosed());
|
||||||
|
|
||||||
final CountDownLatch clientDataLatch = new CountDownLatch(1);
|
final CountDownLatch clientDataLatch = new CountDownLatch(1);
|
||||||
stream.data(new DataFrame(stream.getId(), ByteBuffer.wrap(new byte[512]), true), new Callback.Adapter()
|
stream.data(new DataFrame(stream.getId(), ByteBuffer.wrap(new byte[512]), true), new Callback()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public void succeeded()
|
public void succeeded()
|
||||||
|
@ -198,7 +198,7 @@ public class StreamCloseTest extends AbstractTest
|
||||||
// When created, pushed stream must be implicitly remotely closed.
|
// When created, pushed stream must be implicitly remotely closed.
|
||||||
Assert.assertTrue(((HTTP2Stream)pushedStream).isRemotelyClosed());
|
Assert.assertTrue(((HTTP2Stream)pushedStream).isRemotelyClosed());
|
||||||
// Send some data with endStream = true.
|
// Send some data with endStream = true.
|
||||||
pushedStream.data(new DataFrame(pushedStream.getId(), ByteBuffer.allocate(16), true), new Callback.Adapter()
|
pushedStream.data(new DataFrame(pushedStream.getId(), ByteBuffer.allocate(16), true), new Callback()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public void succeeded()
|
public void succeeded()
|
||||||
|
@ -210,7 +210,7 @@ public class StreamCloseTest extends AbstractTest
|
||||||
}
|
}
|
||||||
}, new Stream.Listener.Adapter());
|
}, new Stream.Listener.Adapter());
|
||||||
HeadersFrame response = new HeadersFrame(stream.getId(), new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields()), null, true);
|
HeadersFrame response = new HeadersFrame(stream.getId(), new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields()), null, true);
|
||||||
stream.headers(response, Callback.Adapter.INSTANCE);
|
stream.headers(response, Callback.NOOP);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -259,7 +259,7 @@ public class StreamCloseTest extends AbstractTest
|
||||||
Assert.assertTrue(pushedStream.isReset());
|
Assert.assertTrue(pushedStream.isReset());
|
||||||
Assert.assertTrue(pushedStream.isClosed());
|
Assert.assertTrue(pushedStream.isClosed());
|
||||||
HeadersFrame response = new HeadersFrame(stream.getId(), new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields()), null, true);
|
HeadersFrame response = new HeadersFrame(stream.getId(), new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields()), null, true);
|
||||||
stream.headers(response, Callback.Adapter.INSTANCE);
|
stream.headers(response, Callback.NOOP);
|
||||||
serverLatch.countDown();
|
serverLatch.countDown();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -275,7 +275,7 @@ public class StreamCloseTest extends AbstractTest
|
||||||
@Override
|
@Override
|
||||||
public Stream.Listener onPush(final Stream pushedStream, PushPromiseFrame frame)
|
public Stream.Listener onPush(final Stream pushedStream, PushPromiseFrame frame)
|
||||||
{
|
{
|
||||||
pushedStream.reset(new ResetFrame(pushedStream.getId(), ErrorCode.REFUSED_STREAM_ERROR.code), new Callback.Adapter()
|
pushedStream.reset(new ResetFrame(pushedStream.getId(), ErrorCode.REFUSED_STREAM_ERROR.code), new Callback()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public void succeeded()
|
public void succeeded()
|
||||||
|
@ -315,7 +315,7 @@ public class StreamCloseTest extends AbstractTest
|
||||||
{
|
{
|
||||||
((HTTP2Session)stream.getSession()).getEndPoint().close();
|
((HTTP2Session)stream.getSession()).getEndPoint().close();
|
||||||
// Try to write something to force an error.
|
// Try to write something to force an error.
|
||||||
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1024), true), Callback.Adapter.INSTANCE);
|
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1024), true), Callback.NOOP);
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
|
@ -102,7 +102,7 @@ public class StreamResetTest extends AbstractTest
|
||||||
client.newStream(requestFrame, promise, new Stream.Listener.Adapter());
|
client.newStream(requestFrame, promise, new Stream.Listener.Adapter());
|
||||||
Stream stream = promise.get(5, TimeUnit.SECONDS);
|
Stream stream = promise.get(5, TimeUnit.SECONDS);
|
||||||
ResetFrame resetFrame = new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code);
|
ResetFrame resetFrame = new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code);
|
||||||
stream.reset(resetFrame, Callback.Adapter.INSTANCE);
|
stream.reset(resetFrame, Callback.NOOP);
|
||||||
|
|
||||||
Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
|
Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
@ -126,14 +126,14 @@ public class StreamResetTest extends AbstractTest
|
||||||
{
|
{
|
||||||
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
|
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
|
||||||
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, false);
|
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, false);
|
||||||
stream.headers(responseFrame, Callback.Adapter.INSTANCE);
|
stream.headers(responseFrame, Callback.NOOP);
|
||||||
return new Stream.Listener.Adapter()
|
return new Stream.Listener.Adapter()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||||
{
|
{
|
||||||
callback.succeeded();
|
callback.succeeded();
|
||||||
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(16), true), Callback.Adapter.INSTANCE);
|
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(16), true), Callback.NOOP);
|
||||||
serverDataLatch.countDown();
|
serverDataLatch.countDown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -141,7 +141,7 @@ public class StreamResetTest extends AbstractTest
|
||||||
public void onReset(Stream stream, ResetFrame frame)
|
public void onReset(Stream stream, ResetFrame frame)
|
||||||
{
|
{
|
||||||
// Simulate that there is pending data to send.
|
// Simulate that there is pending data to send.
|
||||||
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(16), true), new Callback.Adapter()
|
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(16), true), new Callback()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public void failed(Throwable x)
|
public void failed(Throwable x)
|
||||||
|
@ -192,14 +192,14 @@ public class StreamResetTest extends AbstractTest
|
||||||
Stream stream2 = promise2.get(5, TimeUnit.SECONDS);
|
Stream stream2 = promise2.get(5, TimeUnit.SECONDS);
|
||||||
|
|
||||||
ResetFrame resetFrame = new ResetFrame(stream1.getId(), ErrorCode.CANCEL_STREAM_ERROR.code);
|
ResetFrame resetFrame = new ResetFrame(stream1.getId(), ErrorCode.CANCEL_STREAM_ERROR.code);
|
||||||
stream1.reset(resetFrame, Callback.Adapter.INSTANCE);
|
stream1.reset(resetFrame, Callback.NOOP);
|
||||||
|
|
||||||
Assert.assertTrue(serverResetLatch.await(5, TimeUnit.SECONDS));
|
Assert.assertTrue(serverResetLatch.await(5, TimeUnit.SECONDS));
|
||||||
// Stream MUST NOT receive data sent by server after reset.
|
// Stream MUST NOT receive data sent by server after reset.
|
||||||
Assert.assertFalse(stream1DataLatch.await(1, TimeUnit.SECONDS));
|
Assert.assertFalse(stream1DataLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
|
||||||
// The other stream should still be working.
|
// The other stream should still be working.
|
||||||
stream2.data(new DataFrame(stream2.getId(), ByteBuffer.allocate(16), true), Callback.Adapter.INSTANCE);
|
stream2.data(new DataFrame(stream2.getId(), ByteBuffer.allocate(16), true), Callback.NOOP);
|
||||||
Assert.assertTrue(serverDataLatch.await(5, TimeUnit.SECONDS));
|
Assert.assertTrue(serverDataLatch.await(5, TimeUnit.SECONDS));
|
||||||
Assert.assertTrue(stream2DataLatch.await(5, TimeUnit.SECONDS));
|
Assert.assertTrue(stream2DataLatch.await(5, TimeUnit.SECONDS));
|
||||||
}
|
}
|
||||||
|
@ -262,7 +262,7 @@ public class StreamResetTest extends AbstractTest
|
||||||
@Override
|
@Override
|
||||||
public void onHeaders(Stream stream, HeadersFrame frame)
|
public void onHeaders(Stream stream, HeadersFrame frame)
|
||||||
{
|
{
|
||||||
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.Adapter.INSTANCE);
|
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
|
||||||
resetLatch.countDown();
|
resetLatch.countDown();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -314,7 +314,7 @@ public class StreamResetTest extends AbstractTest
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
|
|
||||||
HttpOutput output = (HttpOutput)response.getOutputStream();
|
HttpOutput output = (HttpOutput)response.getOutputStream();
|
||||||
output.sendContent(data, new Callback.Adapter()
|
output.sendContent(data, new Callback()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public void failed(Throwable x)
|
public void failed(Throwable x)
|
||||||
|
@ -341,7 +341,7 @@ public class StreamResetTest extends AbstractTest
|
||||||
@Override
|
@Override
|
||||||
public void onHeaders(Stream stream, HeadersFrame frame)
|
public void onHeaders(Stream stream, HeadersFrame frame)
|
||||||
{
|
{
|
||||||
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.Adapter.INSTANCE);
|
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
|
||||||
resetLatch.countDown();
|
resetLatch.countDown();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -138,7 +138,7 @@ public class BufferingFlowControlStrategy extends AbstractFlowControlStrategy
|
||||||
}
|
}
|
||||||
|
|
||||||
if (windowFrame != null)
|
if (windowFrame != null)
|
||||||
session.frames(stream, Callback.Adapter.INSTANCE, windowFrame, windowFrames);
|
session.frames(stream, Callback.NOOP, windowFrame, windowFrames);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -161,7 +161,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
|
||||||
{
|
{
|
||||||
if (getRecvWindow() < 0)
|
if (getRecvWindow() < 0)
|
||||||
{
|
{
|
||||||
close(ErrorCode.FLOW_CONTROL_ERROR.code, "session_window_exceeded", Callback.Adapter.INSTANCE);
|
close(ErrorCode.FLOW_CONTROL_ERROR.code, "session_window_exceeded", Callback.NOOP);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -209,7 +209,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
|
||||||
|
|
||||||
IStream stream = getStream(frame.getStreamId());
|
IStream stream = getStream(frame.getStreamId());
|
||||||
if (stream != null)
|
if (stream != null)
|
||||||
stream.process(frame, Callback.Adapter.INSTANCE);
|
stream.process(frame, Callback.NOOP);
|
||||||
else
|
else
|
||||||
notifyReset(this, frame);
|
notifyReset(this, frame);
|
||||||
}
|
}
|
||||||
|
@ -296,7 +296,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
|
||||||
if (reply)
|
if (reply)
|
||||||
{
|
{
|
||||||
SettingsFrame replyFrame = new SettingsFrame(Collections.<Integer, Integer>emptyMap(), true);
|
SettingsFrame replyFrame = new SettingsFrame(Collections.<Integer, Integer>emptyMap(), true);
|
||||||
settings(replyFrame, Callback.Adapter.INSTANCE);
|
settings(replyFrame, Callback.NOOP);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -312,7 +312,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
PingFrame reply = new PingFrame(frame.getPayload(), true);
|
PingFrame reply = new PingFrame(frame.getPayload(), true);
|
||||||
control(null, Callback.Adapter.INSTANCE, reply);
|
control(null, Callback.NOOP, reply);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -399,7 +399,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
|
||||||
@Override
|
@Override
|
||||||
public void onConnectionFailure(int error, String reason)
|
public void onConnectionFailure(int error, String reason)
|
||||||
{
|
{
|
||||||
close(error, reason, Callback.Adapter.INSTANCE);
|
close(error, reason, Callback.NOOP);
|
||||||
notifyFailure(this, new IOException(String.format("%d/%s", error, reason)));
|
notifyFailure(this, new IOException(String.format("%d/%s", error, reason)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -619,7 +619,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
|
||||||
int maxCount = getMaxRemoteStreams();
|
int maxCount = getMaxRemoteStreams();
|
||||||
if (maxCount >= 0 && remoteCount >= maxCount)
|
if (maxCount >= 0 && remoteCount >= maxCount)
|
||||||
{
|
{
|
||||||
reset(new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.Adapter.INSTANCE);
|
reset(new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.NOOP);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
if (remoteStreamCount.compareAndSet(remoteCount, remoteCount + 1))
|
if (remoteStreamCount.compareAndSet(remoteCount, remoteCount + 1))
|
||||||
|
@ -640,7 +640,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
close(ErrorCode.PROTOCOL_ERROR.code, "duplicate_stream", Callback.Adapter.INSTANCE);
|
close(ErrorCode.PROTOCOL_ERROR.code, "duplicate_stream", Callback.NOOP);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -783,7 +783,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
|
||||||
{
|
{
|
||||||
// We have closed locally, and only shutdown
|
// We have closed locally, and only shutdown
|
||||||
// the output; now queue a disconnect.
|
// the output; now queue a disconnect.
|
||||||
control(null, Callback.Adapter.INSTANCE, new DisconnectFrame());
|
control(null, Callback.NOOP, new DisconnectFrame());
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case REMOTELY_CLOSED:
|
case REMOTELY_CLOSED:
|
||||||
|
@ -827,7 +827,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
|
||||||
case NOT_CLOSED:
|
case NOT_CLOSED:
|
||||||
{
|
{
|
||||||
// Real idle timeout, just close.
|
// Real idle timeout, just close.
|
||||||
close(ErrorCode.NO_ERROR.code, "idle_timeout", Callback.Adapter.INSTANCE);
|
close(ErrorCode.NO_ERROR.code, "idle_timeout", Callback.NOOP);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case LOCALLY_CLOSED:
|
case LOCALLY_CLOSED:
|
||||||
|
|
|
@ -160,7 +160,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream
|
||||||
close();
|
close();
|
||||||
|
|
||||||
// Tell the other peer that we timed out.
|
// Tell the other peer that we timed out.
|
||||||
reset(new ResetFrame(getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.Adapter.INSTANCE);
|
reset(new ResetFrame(getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
|
||||||
|
|
||||||
// Notify the application.
|
// Notify the application.
|
||||||
notifyTimeout(this, timeout);
|
notifyTimeout(this, timeout);
|
||||||
|
@ -238,7 +238,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream
|
||||||
{
|
{
|
||||||
// It's a bad client, it does not deserve to be
|
// It's a bad client, it does not deserve to be
|
||||||
// treated gently by just resetting the stream.
|
// treated gently by just resetting the stream.
|
||||||
session.close(ErrorCode.FLOW_CONTROL_ERROR.code, "stream_window_exceeded", Callback.Adapter.INSTANCE);
|
session.close(ErrorCode.FLOW_CONTROL_ERROR.code, "stream_window_exceeded", Callback.NOOP);
|
||||||
callback.failed(new IOException("stream_window_exceeded"));
|
callback.failed(new IOException("stream_window_exceeded"));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -246,7 +246,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream
|
||||||
// SPEC: remotely closed streams must be replied with a reset.
|
// SPEC: remotely closed streams must be replied with a reset.
|
||||||
if (isRemotelyClosed())
|
if (isRemotelyClosed())
|
||||||
{
|
{
|
||||||
reset(new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR.code), Callback.Adapter.INSTANCE);
|
reset(new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR.code), Callback.NOOP);
|
||||||
callback.failed(new EOFException("stream_closed"));
|
callback.failed(new EOFException("stream_closed"));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,6 +67,6 @@ public class SimpleFlowControlStrategy extends AbstractFlowControlStrategy
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
session.frames(stream, Callback.Adapter.INSTANCE, sessionFrame, streamFrame);
|
session.frames(stream, Callback.NOOP, sessionFrame, streamFrame);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -65,7 +65,7 @@ public class HttpConnectionOverHTTP2 extends HttpConnection
|
||||||
// First close then abort, to be sure that the connection cannot be reused
|
// First close then abort, to be sure that the connection cannot be reused
|
||||||
// from an onFailure() handler or by blocking code waiting for completion.
|
// from an onFailure() handler or by blocking code waiting for completion.
|
||||||
getHttpDestination().close(this);
|
getHttpDestination().close(this);
|
||||||
session.close(ErrorCode.NO_ERROR.code, null, Callback.Adapter.INSTANCE);
|
session.close(ErrorCode.NO_ERROR.code, null, Callback.NOOP);
|
||||||
abort(new AsynchronousCloseException());
|
abort(new AsynchronousCloseException());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -80,7 +80,7 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen
|
||||||
public Stream.Listener onPush(Stream stream, PushPromiseFrame frame)
|
public Stream.Listener onPush(Stream stream, PushPromiseFrame frame)
|
||||||
{
|
{
|
||||||
// Not supported.
|
// Not supported.
|
||||||
stream.reset(new ResetFrame(stream.getId(), ErrorCode.REFUSED_STREAM_ERROR.code), Callback.Adapter.INSTANCE);
|
stream.reset(new ResetFrame(stream.getId(), ErrorCode.REFUSED_STREAM_ERROR.code), Callback.NOOP);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -36,6 +36,12 @@ public class ByteBufferCallback implements Callback
|
||||||
this.callback = callback;
|
this.callback = callback;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isNonBlocking()
|
||||||
|
{
|
||||||
|
return callback.isNonBlocking();
|
||||||
|
}
|
||||||
|
|
||||||
public ByteBuffer getByteBuffer()
|
public ByteBuffer getByteBuffer()
|
||||||
{
|
{
|
||||||
return buffer;
|
return buffer;
|
||||||
|
|
|
@ -143,7 +143,7 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
|
||||||
private void close(Stream stream, String reason)
|
private void close(Stream stream, String reason)
|
||||||
{
|
{
|
||||||
final Session session = stream.getSession();
|
final Session session = stream.getSession();
|
||||||
session.close(ErrorCode.PROTOCOL_ERROR.code, reason, Callback.Adapter.INSTANCE);
|
session.close(ErrorCode.PROTOCOL_ERROR.code, reason, Callback.NOOP);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -62,7 +62,7 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
|
||||||
settings = Collections.emptyMap();
|
settings = Collections.emptyMap();
|
||||||
SettingsFrame frame = new SettingsFrame(settings, false);
|
SettingsFrame frame = new SettingsFrame(settings, false);
|
||||||
// TODO: consider sending a WINDOW_UPDATE to enlarge the session send window of the client.
|
// TODO: consider sending a WINDOW_UPDATE to enlarge the session send window of the client.
|
||||||
frames(null, Callback.Adapter.INSTANCE, frame, Frame.EMPTY_ARRAY);
|
frames(null, Callback.NOOP, frame, Frame.EMPTY_ARRAY);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -74,7 +74,7 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
|
||||||
IStream stream = createRemoteStream(frame.getStreamId());
|
IStream stream = createRemoteStream(frame.getStreamId());
|
||||||
if (stream != null)
|
if (stream != null)
|
||||||
{
|
{
|
||||||
stream.process(frame, Callback.Adapter.INSTANCE);
|
stream.process(frame, Callback.NOOP);
|
||||||
Stream.Listener listener = notifyNewStream(stream, frame);
|
Stream.Listener listener = notifyNewStream(stream, frame);
|
||||||
stream.setListener(listener);
|
stream.setListener(listener);
|
||||||
}
|
}
|
||||||
|
|
|
@ -164,6 +164,12 @@ public class HttpChannelOverHTTP2 extends HttpChannel
|
||||||
|
|
||||||
boolean handle = onContent(new HttpInput.Content(copy)
|
boolean handle = onContent(new HttpInput.Content(copy)
|
||||||
{
|
{
|
||||||
|
@Override
|
||||||
|
public boolean isNonBlocking()
|
||||||
|
{
|
||||||
|
return callback.isNonBlocking();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void succeeded()
|
public void succeeded()
|
||||||
{
|
{
|
||||||
|
|
|
@ -194,7 +194,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
|
||||||
{
|
{
|
||||||
// If the stream is not closed, it is still reading the request content.
|
// If the stream is not closed, it is still reading the request content.
|
||||||
// Send a reset to the other end so that it stops sending data.
|
// Send a reset to the other end so that it stops sending data.
|
||||||
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.Adapter.INSTANCE);
|
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
|
||||||
// Now that this stream is reset, in-flight data frames will be consumed and discarded.
|
// Now that this stream is reset, in-flight data frames will be consumed and discarded.
|
||||||
// Consume the existing queued data frames to avoid stalling the flow control.
|
// Consume the existing queued data frames to avoid stalling the flow control.
|
||||||
HttpChannel channel = (HttpChannel)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
|
HttpChannel channel = (HttpChannel)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
|
||||||
|
@ -209,11 +209,11 @@ public class HttpTransportOverHTTP2 implements HttpTransport
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("HTTP2 Response #{} aborted", stream == null ? -1 : stream.getId());
|
LOG.debug("HTTP2 Response #{} aborted", stream == null ? -1 : stream.getId());
|
||||||
if (stream != null)
|
if (stream != null)
|
||||||
stream.reset(new ResetFrame(stream.getId(), ErrorCode.INTERNAL_ERROR.code), Callback.Adapter.INSTANCE);
|
stream.reset(new ResetFrame(stream.getId(), ErrorCode.INTERNAL_ERROR.code), Callback.NOOP);
|
||||||
}
|
}
|
||||||
|
|
||||||
private class CommitCallback implements Callback
|
private class CommitCallback implements Callback.NonBlocking
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public void succeeded()
|
public void succeeded()
|
||||||
{
|
{
|
||||||
|
|
|
@ -62,7 +62,7 @@ public class CloseTest extends AbstractServerTest
|
||||||
sessionRef.set(stream.getSession());
|
sessionRef.set(stream.getSession());
|
||||||
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
|
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
|
||||||
// Reply with HEADERS.
|
// Reply with HEADERS.
|
||||||
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.Adapter.INSTANCE);
|
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
|
||||||
closeLatch.await(5, TimeUnit.SECONDS);
|
closeLatch.await(5, TimeUnit.SECONDS);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -127,7 +127,7 @@ public class CloseTest extends AbstractServerTest
|
||||||
{
|
{
|
||||||
sessionRef.set(stream.getSession());
|
sessionRef.set(stream.getSession());
|
||||||
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
|
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
|
||||||
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.Adapter.INSTANCE);
|
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -190,8 +190,8 @@ public class CloseTest extends AbstractServerTest
|
||||||
stream.setIdleTimeout(10 * idleTimeout);
|
stream.setIdleTimeout(10 * idleTimeout);
|
||||||
sessionRef.set(stream.getSession());
|
sessionRef.set(stream.getSession());
|
||||||
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
|
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
|
||||||
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.Adapter.INSTANCE);
|
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
|
||||||
stream.getSession().close(ErrorCode.NO_ERROR.code, "OK", Callback.Adapter.INSTANCE);
|
stream.getSession().close(ErrorCode.NO_ERROR.code, "OK", Callback.NOOP);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -194,8 +194,16 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
|
||||||
@Override
|
@Override
|
||||||
public String toString()
|
public String toString()
|
||||||
{
|
{
|
||||||
|
Class<?> c=getClass();
|
||||||
|
String name=c.getSimpleName();
|
||||||
|
while (name.length()==0 && c.getSuperclass()!=null)
|
||||||
|
{
|
||||||
|
c=c.getSuperclass();
|
||||||
|
name=c.getSimpleName();
|
||||||
|
}
|
||||||
|
|
||||||
return String.format("%s@%x{%s<->%d,%s,%s,%s,%s,%s,%d/%d,%s}",
|
return String.format("%s@%x{%s<->%d,%s,%s,%s,%s,%s,%d/%d,%s}",
|
||||||
getClass().getSimpleName(),
|
name,
|
||||||
hashCode(),
|
hashCode(),
|
||||||
getRemoteAddress(),
|
getRemoteAddress(),
|
||||||
getLocalAddress().getPort(),
|
getLocalAddress().getPort(),
|
||||||
|
|
|
@ -108,7 +108,7 @@ public abstract class FillInterest
|
||||||
public boolean isCallbackNonBlocking()
|
public boolean isCallbackNonBlocking()
|
||||||
{
|
{
|
||||||
Callback callback = _interested.get();
|
Callback callback = _interested.get();
|
||||||
return callback instanceof Callback.NonBlocking;
|
return callback.isNonBlocking();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -53,10 +53,63 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel
|
||||||
*/
|
*/
|
||||||
private int _desiredInterestOps;
|
private int _desiredInterestOps;
|
||||||
|
|
||||||
private final Runnable _runUpdateKey = new Runnable() { public void run() { updateKey(); } };
|
private final Runnable _runUpdateKey = new Runnable()
|
||||||
private final Runnable _runFillable = new Runnable() { public void run() { getFillInterest().fillable(); } };
|
{
|
||||||
private final Runnable _runCompleteWrite = new Runnable() { public void run() { getWriteFlusher().completeWrite(); } };
|
@Override
|
||||||
private final Runnable _runFillableCompleteWrite = new Runnable() { public void run() { getFillInterest().fillable(); getWriteFlusher().completeWrite(); } };
|
public void run()
|
||||||
|
{
|
||||||
|
updateKey();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString()
|
||||||
|
{
|
||||||
|
return SelectChannelEndPoint.this.toString()+":runUpdateKey";
|
||||||
|
}
|
||||||
|
};
|
||||||
|
private final Runnable _runFillable = new Runnable()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
getFillInterest().fillable();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString()
|
||||||
|
{
|
||||||
|
return SelectChannelEndPoint.this.toString()+":runFillable";
|
||||||
|
}
|
||||||
|
};
|
||||||
|
private final Runnable _runCompleteWrite = new Runnable()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
getWriteFlusher().completeWrite();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString()
|
||||||
|
{
|
||||||
|
return SelectChannelEndPoint.this.toString()+":runCompleteWrite";
|
||||||
|
}
|
||||||
|
};
|
||||||
|
private final Runnable _runFillableCompleteWrite = new Runnable()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
getFillInterest().fillable();
|
||||||
|
getWriteFlusher().completeWrite();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString()
|
||||||
|
{
|
||||||
|
return SelectChannelEndPoint.this.toString()+":runFillableCompleteWrite";
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
public SelectChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout)
|
public SelectChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout)
|
||||||
{
|
{
|
||||||
|
@ -97,12 +150,14 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel
|
||||||
_desiredInterestOps = newInterestOps;
|
_desiredInterestOps = newInterestOps;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (LOG.isDebugEnabled())
|
|
||||||
LOG.debug("onSelected {}->{} for {}", oldInterestOps, newInterestOps, this);
|
|
||||||
|
|
||||||
boolean readable = (readyOps & SelectionKey.OP_READ) != 0;
|
boolean readable = (readyOps & SelectionKey.OP_READ) != 0;
|
||||||
boolean writable = (readyOps & SelectionKey.OP_WRITE) != 0;
|
boolean writable = (readyOps & SelectionKey.OP_WRITE) != 0;
|
||||||
|
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("onSelected {}->{} r={} w={} for {}", oldInterestOps, newInterestOps, readable, writable, this);
|
||||||
|
|
||||||
// Run non-blocking code immediately.
|
// Run non-blocking code immediately.
|
||||||
// This producer knows that this non-blocking code is special
|
// This producer knows that this non-blocking code is special
|
||||||
// and that it must be run in this thread and not fed to the
|
// and that it must be run in this thread and not fed to the
|
||||||
|
@ -110,18 +165,26 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel
|
||||||
// tasks (or it may starve forever just after having run them).
|
// tasks (or it may starve forever just after having run them).
|
||||||
if (readable && getFillInterest().isCallbackNonBlocking())
|
if (readable && getFillInterest().isCallbackNonBlocking())
|
||||||
{
|
{
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("Direct readable run {}",this);
|
||||||
_runFillable.run();
|
_runFillable.run();
|
||||||
readable = false;
|
readable = false;
|
||||||
}
|
}
|
||||||
if (writable && getWriteFlusher().isCallbackNonBlocking())
|
if (writable && getWriteFlusher().isCallbackNonBlocking())
|
||||||
{
|
{
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("Direct writable run {}",this);
|
||||||
_runCompleteWrite.run();
|
_runCompleteWrite.run();
|
||||||
writable = false;
|
writable = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// return task to complete the job
|
// return task to complete the job
|
||||||
return readable ? (writable ? _runFillableCompleteWrite : _runFillable)
|
Runnable task= readable ? (writable ? _runFillableCompleteWrite : _runFillable)
|
||||||
: (writable ? _runCompleteWrite : null);
|
: (writable ? _runCompleteWrite : null);
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("task {}",task);
|
||||||
|
return task;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -271,7 +271,7 @@ abstract public class WriteFlusher
|
||||||
|
|
||||||
boolean isCallbackNonBlocking()
|
boolean isCallbackNonBlocking()
|
||||||
{
|
{
|
||||||
return _callback instanceof Callback.NonBlocking;
|
return _callback.isNonBlocking();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -120,7 +120,7 @@ public class SelectorManagerTest
|
||||||
long timeout = connectTimeout * 2;
|
long timeout = connectTimeout * 2;
|
||||||
timeoutConnection.set(timeout);
|
timeoutConnection.set(timeout);
|
||||||
final CountDownLatch latch1 = new CountDownLatch(1);
|
final CountDownLatch latch1 = new CountDownLatch(1);
|
||||||
selectorManager.connect(client1, new Callback.Adapter()
|
selectorManager.connect(client1, new Callback()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public void failed(Throwable x)
|
public void failed(Throwable x)
|
||||||
|
@ -141,7 +141,7 @@ public class SelectorManagerTest
|
||||||
client2.connect(address);
|
client2.connect(address);
|
||||||
timeoutConnection.set(0);
|
timeoutConnection.set(0);
|
||||||
final CountDownLatch latch2 = new CountDownLatch(1);
|
final CountDownLatch latch2 = new CountDownLatch(1);
|
||||||
selectorManager.connect(client2, new Callback.Adapter()
|
selectorManager.connect(client2, new Callback()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public void succeeded()
|
public void succeeded()
|
||||||
|
|
|
@ -258,7 +258,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
|
||||||
{
|
{
|
||||||
if (!provider.isClosed())
|
if (!provider.isClosed())
|
||||||
{
|
{
|
||||||
process(BufferUtil.EMPTY_BUFFER, new Adapter()
|
process(BufferUtil.EMPTY_BUFFER, new Callback()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public void failed(Throwable x)
|
public void failed(Throwable x)
|
||||||
|
|
|
@ -712,7 +712,13 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
|
||||||
{
|
{
|
||||||
_callback = callback;
|
_callback = callback;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isNonBlocking()
|
||||||
|
{
|
||||||
|
return _callback.isNonBlocking();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void succeeded()
|
public void succeeded()
|
||||||
{
|
{
|
||||||
|
|
|
@ -539,7 +539,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private class BlockingReadCallback implements Callback.NonBlocking
|
private class BlockingReadCallback implements Callback
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public void succeeded()
|
public void succeeded()
|
||||||
|
@ -552,6 +552,14 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
|
||||||
{
|
{
|
||||||
_input.failed(x);
|
_input.failed(x);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isNonBlocking()
|
||||||
|
{
|
||||||
|
// This callback does not block, rather it wakes up the
|
||||||
|
// thread that is blocked waiting on the read.
|
||||||
|
return true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private class AsyncReadCallback implements Callback
|
private class AsyncReadCallback implements Callback
|
||||||
|
@ -588,6 +596,12 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
|
||||||
super(true);
|
super(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isNonBlocking()
|
||||||
|
{
|
||||||
|
return _callback.isNonBlocking();
|
||||||
|
}
|
||||||
|
|
||||||
private boolean reset(MetaData.Response info, boolean head, ByteBuffer content, boolean last, Callback callback)
|
private boolean reset(MetaData.Response info, boolean head, ByteBuffer content, boolean last, Callback callback)
|
||||||
{
|
{
|
||||||
if (reset())
|
if (reset())
|
||||||
|
@ -743,7 +757,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
|
||||||
if (_shutdownOut)
|
if (_shutdownOut)
|
||||||
getEndPoint().shutdownOutput();
|
getEndPoint().shutdownOutput();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString()
|
public String toString()
|
||||||
{
|
{
|
||||||
|
|
|
@ -653,7 +653,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class Content extends Callback.Adapter
|
public static class Content implements Callback
|
||||||
{
|
{
|
||||||
private final ByteBuffer _content;
|
private final ByteBuffer _content;
|
||||||
|
|
||||||
|
@ -662,6 +662,13 @@ public class HttpInput extends ServletInputStream implements Runnable
|
||||||
_content=content;
|
_content=content;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isNonBlocking()
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public ByteBuffer getContent()
|
public ByteBuffer getContent()
|
||||||
{
|
{
|
||||||
return _content;
|
return _content;
|
||||||
|
|
|
@ -1215,7 +1215,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
||||||
_in=in;
|
_in=in;
|
||||||
_buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.useDirectBuffers());
|
_buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.useDirectBuffers());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Action process() throws Exception
|
protected Action process() throws Exception
|
||||||
{
|
{
|
||||||
|
|
|
@ -49,6 +49,7 @@ import org.eclipse.jetty.util.log.StdErrLog;
|
||||||
import org.hamcrest.Matchers;
|
import org.hamcrest.Matchers;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
import org.junit.Ignore;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
|
@ -368,6 +369,7 @@ public abstract class ConnectorTimeoutTest extends HttpServerTestFixture
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout=60000)
|
@Test(timeout=60000)
|
||||||
|
@Ignore
|
||||||
public void testNoBlockingTimeoutRead() throws Exception
|
public void testNoBlockingTimeoutRead() throws Exception
|
||||||
{
|
{
|
||||||
_httpConfiguration.setBlockingTimeout(-1L);
|
_httpConfiguration.setBlockingTimeout(-1L);
|
||||||
|
@ -494,6 +496,7 @@ public abstract class ConnectorTimeoutTest extends HttpServerTestFixture
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout=60000)
|
@Test(timeout=60000)
|
||||||
|
@Ignore
|
||||||
public void testNoBlockingTimeoutWrite() throws Exception
|
public void testNoBlockingTimeoutWrite() throws Exception
|
||||||
{
|
{
|
||||||
configureServer(new HugeResponseHandler());
|
configureServer(new HugeResponseHandler());
|
||||||
|
|
|
@ -0,0 +1,216 @@
|
||||||
|
//
|
||||||
|
// ========================================================================
|
||||||
|
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
|
||||||
|
// ------------------------------------------------------------------------
|
||||||
|
// All rights reserved. This program and the accompanying materials
|
||||||
|
// are made available under the terms of the Eclipse Public License v1.0
|
||||||
|
// and Apache License v2.0 which accompanies this distribution.
|
||||||
|
//
|
||||||
|
// The Eclipse Public License is available at
|
||||||
|
// http://www.eclipse.org/legal/epl-v10.html
|
||||||
|
//
|
||||||
|
// The Apache License v2.0 is available at
|
||||||
|
// http://www.opensource.org/licenses/apache2.0.php
|
||||||
|
//
|
||||||
|
// You may elect to redistribute this code under either of these licenses.
|
||||||
|
// ========================================================================
|
||||||
|
//
|
||||||
|
|
||||||
|
package org.eclipse.jetty.servlets;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.net.Socket;
|
||||||
|
import java.nio.channels.SelectionKey;
|
||||||
|
import java.nio.channels.SocketChannel;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.nio.file.Paths;
|
||||||
|
import java.nio.file.StandardOpenOption;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ScheduledFuture;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.io.ManagedSelector;
|
||||||
|
import org.eclipse.jetty.io.SelectChannelEndPoint;
|
||||||
|
import org.eclipse.jetty.server.Server;
|
||||||
|
import org.eclipse.jetty.server.ServerConnector;
|
||||||
|
import org.eclipse.jetty.servlet.DefaultServlet;
|
||||||
|
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||||
|
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
|
||||||
|
import org.eclipse.jetty.toolchain.test.TestTracker;
|
||||||
|
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class DefaultServletStarvationTest
|
||||||
|
{
|
||||||
|
@Rule
|
||||||
|
public TestTracker tracker = new TestTracker();
|
||||||
|
private Server _server;
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void dispose() throws Exception
|
||||||
|
{
|
||||||
|
if (_server != null)
|
||||||
|
_server.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDefaultServletStarvation() throws Exception
|
||||||
|
{
|
||||||
|
int maxThreads = 2;
|
||||||
|
QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads, maxThreads);
|
||||||
|
threadPool.setDetailedDump(true);
|
||||||
|
_server = new Server(threadPool);
|
||||||
|
|
||||||
|
// Prepare a big file to download.
|
||||||
|
File directory = MavenTestingUtils.getTargetTestingDir();
|
||||||
|
Files.createDirectories(directory.toPath());
|
||||||
|
String resourceName = "resource.bin";
|
||||||
|
Path resourcePath = Paths.get(directory.getPath(), resourceName);
|
||||||
|
try (OutputStream output = Files.newOutputStream(resourcePath, StandardOpenOption.CREATE, StandardOpenOption.WRITE))
|
||||||
|
{
|
||||||
|
byte[] chunk = new byte[1024];
|
||||||
|
Arrays.fill(chunk,(byte)'X');
|
||||||
|
chunk[chunk.length-2]='\r';
|
||||||
|
chunk[chunk.length-1]='\n';
|
||||||
|
for (int i = 0; i < 256 * 1024; ++i)
|
||||||
|
output.write(chunk);
|
||||||
|
}
|
||||||
|
|
||||||
|
final CountDownLatch writePending = new CountDownLatch(1);
|
||||||
|
ServerConnector connector = new ServerConnector(_server, 0, 1)
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
|
||||||
|
{
|
||||||
|
return new SelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout())
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
protected void onIncompleteFlush()
|
||||||
|
{
|
||||||
|
super.onIncompleteFlush();
|
||||||
|
writePending.countDown();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
_server.addConnector(connector);
|
||||||
|
|
||||||
|
ServletContextHandler context = new ServletContextHandler(_server, "/");
|
||||||
|
context.setResourceBase(directory.toURI().toString());
|
||||||
|
context.addServlet(DefaultServlet.class, "/*").setAsyncSupported(false);
|
||||||
|
_server.setHandler(context);
|
||||||
|
|
||||||
|
_server.start();
|
||||||
|
|
||||||
|
List<Socket> sockets = new ArrayList<>();
|
||||||
|
for (int i = 0; i < maxThreads; ++i)
|
||||||
|
{
|
||||||
|
Socket socket = new Socket("localhost", connector.getLocalPort());
|
||||||
|
sockets.add(socket);
|
||||||
|
OutputStream output = socket.getOutputStream();
|
||||||
|
String request = "" +
|
||||||
|
"GET /" + resourceName + " HTTP/1.1\r\n" +
|
||||||
|
"Host: localhost\r\n" +
|
||||||
|
// "Connection: close\r\n" +
|
||||||
|
"\r\n";
|
||||||
|
output.write(request.getBytes(StandardCharsets.UTF_8));
|
||||||
|
output.flush();
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// Wait for a the servlet to block.
|
||||||
|
Assert.assertTrue(writePending.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
Thread.sleep(1000);
|
||||||
|
_server.dumpStdErr();
|
||||||
|
Thread.sleep(1000);
|
||||||
|
|
||||||
|
|
||||||
|
ScheduledFuture<?> dumper = Executors.newSingleThreadScheduledExecutor().schedule(new Runnable()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
_server.dumpStdErr();
|
||||||
|
}
|
||||||
|
}, 10, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
|
||||||
|
long expected = Files.size(resourcePath);
|
||||||
|
byte[] buffer = new byte[48 * 1024];
|
||||||
|
for (Socket socket : sockets)
|
||||||
|
{
|
||||||
|
String socketString = socket.toString();
|
||||||
|
System.out.println("Reading socket " + socketString+"...");
|
||||||
|
long total = 0;
|
||||||
|
InputStream input = socket.getInputStream();
|
||||||
|
|
||||||
|
// look for CRLFCRLF
|
||||||
|
StringBuilder header = new StringBuilder();
|
||||||
|
int state=0;
|
||||||
|
while (state<4 && header.length()<2048)
|
||||||
|
{
|
||||||
|
int ch=input.read();
|
||||||
|
if (ch<0)
|
||||||
|
break;
|
||||||
|
header.append((char)ch);
|
||||||
|
switch(state)
|
||||||
|
{
|
||||||
|
case 0:
|
||||||
|
if (ch=='\r')
|
||||||
|
state=1;
|
||||||
|
break;
|
||||||
|
case 1:
|
||||||
|
if (ch=='\n')
|
||||||
|
state=2;
|
||||||
|
else
|
||||||
|
state=0;
|
||||||
|
break;
|
||||||
|
case 2:
|
||||||
|
if (ch=='\r')
|
||||||
|
state=3;
|
||||||
|
else
|
||||||
|
state=0;
|
||||||
|
break;
|
||||||
|
case 3:
|
||||||
|
if (ch=='\n')
|
||||||
|
state=4;
|
||||||
|
else
|
||||||
|
state=0;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
System.out.println("Header socket " + socketString+"\n"+header.toString());
|
||||||
|
|
||||||
|
while (total<expected)
|
||||||
|
{
|
||||||
|
int read=input.read(buffer);
|
||||||
|
if (read<0)
|
||||||
|
break;
|
||||||
|
total+=read;
|
||||||
|
System.out.printf("READ %d of %d/%d from %s%n",read,total,expected,socketString);
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertEquals(expected,total);
|
||||||
|
}
|
||||||
|
|
||||||
|
dumper.cancel(false);
|
||||||
|
|
||||||
|
// We could read everything, good.
|
||||||
|
for (Socket socket : sockets)
|
||||||
|
socket.close();
|
||||||
|
}
|
||||||
|
}
|
|
@ -30,7 +30,8 @@ import org.eclipse.jetty.util.log.Logger;
|
||||||
/**
|
/**
|
||||||
* An implementation of Callback that blocks until success or failure.
|
* An implementation of Callback that blocks until success or failure.
|
||||||
*/
|
*/
|
||||||
public class BlockingCallback implements Callback
|
@Deprecated
|
||||||
|
public class BlockingCallback implements Callback.NonBlocking
|
||||||
{
|
{
|
||||||
private static final Logger LOG = Log.getLogger(BlockingCallback.class);
|
private static final Logger LOG = Log.getLogger(BlockingCallback.class);
|
||||||
private static Throwable SUCCEEDED = new Throwable()
|
private static Throwable SUCCEEDED = new Throwable()
|
||||||
|
|
|
@ -42,45 +42,54 @@ package org.eclipse.jetty.util;
|
||||||
*/
|
*/
|
||||||
public interface Callback
|
public interface Callback
|
||||||
{
|
{
|
||||||
|
/**
|
||||||
|
* Instance of Adapter that can be used when the callback methods need an empty
|
||||||
|
* implementation without incurring in the cost of allocating a new Adapter object.
|
||||||
|
*/
|
||||||
|
static Callback NOOP = new Callback(){};
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>Callback invoked when the operation completes.</p>
|
* <p>Callback invoked when the operation completes.</p>
|
||||||
*
|
*
|
||||||
* @see #failed(Throwable)
|
* @see #failed(Throwable)
|
||||||
*/
|
*/
|
||||||
public abstract void succeeded();
|
default void succeeded()
|
||||||
|
{}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>Callback invoked when the operation fails.</p>
|
* <p>Callback invoked when the operation fails.</p>
|
||||||
* @param x the reason for the operation failure
|
* @param x the reason for the operation failure
|
||||||
*/
|
*/
|
||||||
public void failed(Throwable x);
|
default void failed(Throwable x)
|
||||||
|
{}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A marker interface for a callback that is guaranteed not to
|
* @return True if the callback is known to never block the caller
|
||||||
* block and thus does not need a dispatch
|
|
||||||
*/
|
*/
|
||||||
public interface NonBlocking extends Callback
|
default boolean isNonBlocking()
|
||||||
{}
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Callback interface that declares itself as non-blocking
|
||||||
|
*/
|
||||||
|
interface NonBlocking extends Callback
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public default boolean isNonBlocking()
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>Empty implementation of {@link Callback}</p>
|
* <p>Empty implementation of {@link Callback}</p>
|
||||||
*/
|
*/
|
||||||
public static class Adapter implements Callback
|
@Deprecated
|
||||||
{
|
static class Adapter implements Callback
|
||||||
/**
|
{}
|
||||||
* Instance of Adapter that can be used when the callback methods need an empty
|
|
||||||
* implementation without incurring in the cost of allocating a new Adapter object.
|
|
||||||
*/
|
|
||||||
public static final Adapter INSTANCE = new Adapter();
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void succeeded()
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void failed(Throwable x)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,6 +46,12 @@ public abstract class IteratingNestedCallback extends IteratingCallback
|
||||||
{
|
{
|
||||||
_callback=callback;
|
_callback=callback;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isNonBlocking()
|
||||||
|
{
|
||||||
|
return _callback.isNonBlocking();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void onCompleteSuccess()
|
protected void onCompleteSuccess()
|
||||||
|
|
|
@ -124,6 +124,9 @@ public class SharedBlockingCallback
|
||||||
/**
|
/**
|
||||||
* A Closeable Callback.
|
* A Closeable Callback.
|
||||||
* Uses the auto close mechanism to check block has been called OK.
|
* Uses the auto close mechanism to check block has been called OK.
|
||||||
|
* <p>Implements {@link Callback.NonBlocking} because calls to this
|
||||||
|
* callback do not blocak, rather they wakeup the thread that is blocked
|
||||||
|
* in {@link #block()}
|
||||||
*/
|
*/
|
||||||
public class Blocker implements Callback.NonBlocking, Closeable
|
public class Blocker implements Callback.NonBlocking, Closeable
|
||||||
{
|
{
|
||||||
|
@ -132,7 +135,7 @@ public class SharedBlockingCallback
|
||||||
protected Blocker()
|
protected Blocker()
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void succeeded()
|
public void succeeded()
|
||||||
{
|
{
|
||||||
|
|
|
@ -38,7 +38,7 @@ public class BlockingWriteCallback extends SharedBlockingCallback
|
||||||
return new WriteBlocker(acquire());
|
return new WriteBlocker(acquire());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class WriteBlocker implements WriteCallback, Callback, AutoCloseable
|
public static class WriteBlocker implements WriteCallback, Callback.NonBlocking, AutoCloseable
|
||||||
{
|
{
|
||||||
private final Blocker blocker;
|
private final Blocker blocker;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue