Fixes #648 - Problem using InputStreamResponseListener to handle HTTP/2 responses.

The fix for https://bugs.eclipse.org/bugs/show_bug.cgi?id=484446
reimplemented InputStreamResponseListener using callbacks rather than
blocking waits.

However, HTTP/2 behaves a little differently than HTTP/1.
Where in HTTP/1 until the callback was completed no further calls to
onContent() were made, with HTTP/2 additional calls are made until
the flow control window is exhausted.

For this reason InputStreamResponseListener must queue content chunks
rather than dealing only with one chunk at a time.
This commit is contained in:
Simone Bordet 2016-06-20 23:10:12 +02:00
parent 442a7ce8cc
commit 7ef22c8ebd
3 changed files with 137 additions and 55 deletions

View File

@ -23,6 +23,11 @@ import java.io.InputStream;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@ -76,11 +81,11 @@ public class InputStreamResponseListener extends Listener.Adapter
private final CountDownLatch responseLatch = new CountDownLatch(1);
private final CountDownLatch resultLatch = new CountDownLatch(1);
private final AtomicReference<InputStream> stream = new AtomicReference<>();
private final Queue<DeferredContentProvider.Chunk> chunks = new ArrayDeque<>();
private Response response;
private Result result;
private Throwable failure;
private boolean closed;
private DeferredContentProvider.Chunk chunk;
public InputStreamResponseListener()
{
@ -121,7 +126,9 @@ public class InputStreamResponseListener extends Listener.Adapter
closed = this.closed;
if (!closed)
{
chunk = new DeferredContentProvider.Chunk(content, callback);
if (LOG.isDebugEnabled())
LOG.debug("Queueing content {}", content);
chunks.add(new DeferredContentProvider.Chunk(content, callback));
lock.notifyAll();
}
}
@ -139,7 +146,8 @@ public class InputStreamResponseListener extends Listener.Adapter
{
synchronized (lock)
{
chunk = EOF;
if (!closed)
chunks.add(EOF);
lock.notifyAll();
}
@ -150,37 +158,34 @@ public class InputStreamResponseListener extends Listener.Adapter
@Override
public void onFailure(Response response, Throwable failure)
{
Callback callback = null;
List<Callback> callbacks;
synchronized (lock)
{
if (this.failure != null)
return;
this.failure = failure;
if (chunk != null)
callback = chunk.callback;
callbacks = drain();
lock.notifyAll();
}
if (LOG.isDebugEnabled())
LOG.debug("Content failure", failure);
if (callback != null)
callback.failed(failure);
callbacks.forEach(callback -> callback.failed(failure));
}
@Override
public void onComplete(Result result)
{
Throwable failure = result.getFailure();
Callback callback = null;
List<Callback> callbacks = Collections.emptyList();
synchronized (lock)
{
this.result = result;
if (result.isFailed() && this.failure == null)
{
this.failure = failure;
if (chunk != null)
callback = chunk.callback;
callbacks = drain();
}
// Notify the response latch in case of request failures.
responseLatch.countDown();
@ -196,8 +201,7 @@ public class InputStreamResponseListener extends Listener.Adapter
LOG.debug("Result failure", failure);
}
if (callback != null)
callback.failed(failure);
callbacks.forEach(callback -> callback.failed(failure));
}
/**
@ -265,6 +269,23 @@ public class InputStreamResponseListener extends Listener.Adapter
return IO.getClosedStream();
}
private List<Callback> drain()
{
List<Callback> callbacks = new ArrayList<>();
synchronized (lock)
{
while (true)
{
DeferredContentProvider.Chunk chunk = chunks.peek();
if (chunk == null || chunk == EOF)
break;
callbacks.add(chunk.callback);
chunks.poll();
}
}
return callbacks;
}
private class Input extends InputStream
{
@Override
@ -286,16 +307,22 @@ public class InputStreamResponseListener extends Listener.Adapter
Callback callback = null;
synchronized (lock)
{
DeferredContentProvider.Chunk chunk;
while (true)
{
if (failure != null)
throw toIOException(failure);
chunk = chunks.peek();
if (chunk == EOF)
return -1;
if (closed)
throw new AsynchronousCloseException();
if (chunk != null)
break;
if (failure != null)
throw toIOException(failure);
if (closed)
throw new AsynchronousCloseException();
lock.wait();
}
@ -305,7 +332,7 @@ public class InputStreamResponseListener extends Listener.Adapter
if (!buffer.hasRemaining())
{
callback = chunk.callback;
chunk = null;
chunks.poll();
}
}
if (callback != null)
@ -329,22 +356,21 @@ public class InputStreamResponseListener extends Listener.Adapter
@Override
public void close() throws IOException
{
Callback callback = null;
List<Callback> callbacks;
synchronized (lock)
{
if (closed)
return;
closed = true;
if (chunk != null)
callback = chunk.callback;
callbacks = drain();
lock.notifyAll();
}
if (LOG.isDebugEnabled())
LOG.debug("InputStream close");
if (callback != null)
callback.failed(new AsynchronousCloseException());
Throwable failure = new AsynchronousCloseException();
callbacks.forEach(callback -> callback.failed(failure));
super.close();
}

View File

@ -0,0 +1,37 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http.client;
import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
public class EmptyServerHandler extends AbstractHandler
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
}
}

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.client;
package org.eclipse.jetty.http.client;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@ -65,15 +65,15 @@ import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
public class HttpClientStreamTest extends AbstractHttpClientServerTest
public class HttpClientStreamTest extends AbstractTest
{
public HttpClientStreamTest(SslContextFactory sslContextFactory)
public HttpClientStreamTest(Transport transport)
{
super(sslContextFactory);
super(transport);
}
@Test
@ -90,11 +90,30 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
output.write(kb);
}
start(new RespondThenConsumeHandler());
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.setStatus(200);
response.setContentLength(0);
response.flushBuffer();
InputStream in = request.getInputStream();
byte[] buffer = new byte[1024];
while (true)
{
int read = in.read(buffer);
if (read < 0)
break;
}
}
});
final AtomicLong requestTime = new AtomicLong();
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.scheme(getScheme())
.file(upload)
.onRequestSuccess(request -> requestTime.set(System.nanoTime()))
.timeout(30, TimeUnit.SECONDS)
@ -127,7 +146,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
InputStreamResponseListener listener = new InputStreamResponseListener();
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.scheme(getScheme())
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
@ -168,7 +187,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
InputStreamResponseListener listener = new InputStreamResponseListener();
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.scheme(getScheme())
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
@ -215,7 +234,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
InputStreamResponseListener listener = new InputStreamResponseListener();
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.scheme(getScheme())
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
@ -241,7 +260,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
// Expected.
}
Assert.assertEquals(data.length, length);
Assert.assertThat(length, Matchers.lessThanOrEqualTo(data.length));
Result result = listener.await(5, TimeUnit.SECONDS);
Assert.assertNotNull(result);
@ -267,7 +286,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
stream.close();
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.scheme(getScheme())
.content(new BytesContentProvider(new byte[]{0, 1, 2, 3}))
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
@ -309,7 +328,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
}
};
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.scheme(getScheme())
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
@ -366,7 +385,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
}
};
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.scheme(getScheme())
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
@ -418,7 +437,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
}
};
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.scheme(getScheme())
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
@ -443,7 +462,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
InputStreamResponseListener listener = new InputStreamResponseListener();
// Connect to the wrong port
client.newRequest("localhost", port)
.scheme(scheme)
.scheme(getScheme())
.send(listener);
Result result = listener.await(5, TimeUnit.SECONDS);
Assert.assertNotNull(result);
@ -464,7 +483,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
final byte[] data = new byte[]{0, 1, 2, 3};
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.scheme(getScheme())
.content(new InputStreamContentProvider(new InputStream()
{
private int index = 0;
@ -510,7 +529,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
InputStreamResponseListener listener = new InputStreamResponseListener();
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.scheme(getScheme())
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
@ -555,7 +574,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
InputStreamResponseListener listener = new InputStreamResponseListener();
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.scheme(getScheme())
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
@ -591,7 +610,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
InputStreamResponseListener listener = new InputStreamResponseListener();
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.scheme(getScheme())
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
@ -630,7 +649,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
try (DeferredContentProvider content = new DeferredContentProvider())
{
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.scheme(getScheme())
.content(content)
.send(result ->
{
@ -680,7 +699,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
});
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.scheme(getScheme())
.content(content)
.send(result ->
{
@ -720,7 +739,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
};
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.scheme(getScheme())
.content(content)
.send(new BufferingResponseListener()
{
@ -784,7 +803,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
{
byte[] chunk = iteratorData[index.getAndIncrement()];
ByteBuffer result = chunk == null ? null : ByteBuffer.wrap(chunk);
if (index.get() == 2)
if (index.get() < iteratorData.length)
{
contentRef.get().offer(result == null ? BufferUtil.EMPTY_BUFFER : result);
contentRef.get().close();
@ -802,7 +821,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
contentRef.set(content);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.scheme(getScheme())
.content(content)
.send(new BufferingResponseListener()
{
@ -836,7 +855,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
final CountDownLatch latch = new CountDownLatch(1);
OutputStreamContentProvider content = new OutputStreamContentProvider();
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.scheme(getScheme())
.content(content)
.send(new BufferingResponseListener()
{
@ -879,7 +898,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
final CountDownLatch latch = new CountDownLatch(1);
OutputStreamContentProvider content = new OutputStreamContentProvider();
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.scheme(getScheme())
.content(content)
.send(new BufferingResponseListener(data.length)
{
@ -920,7 +939,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
final CountDownLatch latch = new CountDownLatch(1);
OutputStreamContentProvider content = new OutputStreamContentProvider();
client.newRequest("0.0.0.1", connector.getLocalPort())
.scheme(scheme)
.scheme(getScheme())
.content(content)
.send(result ->
{
@ -959,7 +978,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
final CountDownLatch completeLatch = new CountDownLatch(1);
final DeferredContentProvider content = new DeferredContentProvider();
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.scheme(getScheme())
.content(content)
.onRequestBegin(request ->
{
@ -1007,7 +1026,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
final CountDownLatch completeLatch = new CountDownLatch(1);
client.newRequest("0.0.0.1", connector.getLocalPort())
.scheme(scheme)
.scheme(getScheme())
.content(content)
.send(result ->
{
@ -1079,7 +1098,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
final CountDownLatch completeLatch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.scheme(getScheme())
.content(provider)
.onRequestCommit(request -> commit.set(true))
.send(result ->
@ -1110,7 +1129,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
InputStreamResponseListener listener = new InputStreamResponseListener();
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.scheme(getScheme())
.timeout(5, TimeUnit.SECONDS)
.send(listener);