Issue #1375 - Support pushed resources in HTTP client.

Implemented in the non-API HttpRequest class.
This commit is contained in:
Simone Bordet 2017-03-05 19:01:08 +01:00
parent 18d036c244
commit affeb67f1a
7 changed files with 289 additions and 10 deletions

View File

@ -40,6 +40,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.ContentResponse;
@ -81,6 +82,7 @@ public class HttpRequest implements Request
private List<HttpCookie> cookies;
private Map<String, Object> attributes;
private List<RequestListener> requestListeners;
private BiFunction<Request, Request, Response.CompleteListener> pushListener;
protected HttpRequest(HttpClient client, HttpConversation conversation, URI uri)
{
@ -567,6 +569,26 @@ public class HttpRequest implements Request
return this;
}
/**
* <p>Sets a listener for pushed resources.</p>
* <p>When resources are pushed from the server, the given {@code listener}
* is invoked for every pushed resource.
* The parameters to the {@code BiFunction} are this request and the
* synthesized request for the pushed resource.
* The {@code BiFunction} should return a {@code CompleteListener} that
* may also implement other listener interfaces to be notified of various
* response events, or {@code null} to signal that the pushed resource
* should be canceled.</p>
*
* @param listener a listener for pushed resource events
* @return this request object
*/
public Request pushListener(BiFunction<Request, Request, Response.CompleteListener> listener)
{
this.pushListener = listener;
return this;
}
@Override
public ContentProvider getContent()
{
@ -698,6 +720,11 @@ public class HttpRequest implements Request
return responseListeners;
}
public BiFunction<Request, Request, Response.CompleteListener> getPushListener()
{
return pushListener;
}
@Override
public boolean abort(Throwable cause)
{

View File

@ -34,19 +34,26 @@ public class HttpChannelOverHTTP2 extends HttpChannel
{
private final HttpConnectionOverHTTP2 connection;
private final Session session;
private final boolean push;
private final HttpSenderOverHTTP2 sender;
private final HttpReceiverOverHTTP2 receiver;
private Stream stream;
public HttpChannelOverHTTP2(HttpDestination destination, HttpConnectionOverHTTP2 connection, Session session)
public HttpChannelOverHTTP2(HttpDestination destination, HttpConnectionOverHTTP2 connection, Session session, boolean push)
{
super(destination);
this.connection = connection;
this.session = session;
this.push = push;
this.sender = new HttpSenderOverHTTP2(this);
this.receiver = new HttpReceiverOverHTTP2(this);
}
protected HttpConnectionOverHTTP2 getHttpConnection()
{
return connection;
}
public Session getSession()
{
return session;
@ -110,6 +117,7 @@ public class HttpChannelOverHTTP2 extends HttpChannel
public void exchangeTerminated(HttpExchange exchange, Result result)
{
super.exchangeTerminated(exchange, result);
release();
if (!push)
release();
}
}

View File

@ -58,15 +58,15 @@ public class HttpConnectionOverHTTP2 extends HttpConnection
normalizeRequest(exchange.getRequest());
// One connection maps to N channels, so for each exchange we create a new channel.
HttpChannel channel = newHttpChannel();
HttpChannel channel = newHttpChannel(false);
channels.add(channel);
return send(channel, exchange);
}
protected HttpChannelOverHTTP2 newHttpChannel()
protected HttpChannelOverHTTP2 newHttpChannel(boolean push)
{
return new HttpChannelOverHTTP2(getHttpDestination(), this, getSession());
return new HttpChannelOverHTTP2(getHttpDestination(), this, getSession(), push);
}
protected void release(HttpChannel channel)

View File

@ -21,13 +21,19 @@ package org.eclipse.jetty.http2.client.http;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Queue;
import java.util.function.BiFunction;
import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpReceiver;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpStatus;
@ -91,7 +97,32 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen
@Override
public Stream.Listener onPush(Stream stream, PushPromiseFrame frame)
{
// Not supported.
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return null;
HttpRequest request = exchange.getRequest();
MetaData.Request metaData = (MetaData.Request)frame.getMetaData();
HttpRequest pushRequest = (HttpRequest)getHttpDestination().getHttpClient().newRequest(metaData.getURIString());
BiFunction<Request, Request, Response.CompleteListener> pushListener = request.getPushListener();
if (pushListener != null)
{
Response.CompleteListener listener = pushListener.apply(request, pushRequest);
if (listener != null)
{
HttpChannelOverHTTP2 pushChannel = getHttpChannel().getHttpConnection().newHttpChannel(true);
List<Response.ResponseListener> listeners = Collections.singletonList(listener);
HttpExchange pushExchange = new HttpExchange(getHttpDestination(), pushRequest, listeners);
pushChannel.associate(pushExchange);
pushChannel.setStream(stream);
// TODO: idle timeout ?
pushExchange.requestComplete(null);
pushExchange.terminateRequest();
return pushChannel.getStreamListener();
}
}
stream.reset(new ResetFrame(stream.getId(), ErrorCode.REFUSED_STREAM_ERROR.code), Callback.NOOP);
return null;
}

View File

@ -247,9 +247,9 @@ public class HttpClientTransportOverHTTP2Test extends AbstractTest
return new HttpConnectionOverHTTP2(destination, session)
{
@Override
protected HttpChannelOverHTTP2 newHttpChannel()
protected HttpChannelOverHTTP2 newHttpChannel(boolean push)
{
return new HttpChannelOverHTTP2(getHttpDestination(), this, getSession())
return new HttpChannelOverHTTP2(getHttpDestination(), this, getSession(), push)
{
@Override
public void setStream(Stream stream)

View File

@ -0,0 +1,213 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client.http;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.junit.Assert;
import org.junit.Test;
public class PushedResourcesTest extends AbstractTest
{
@Test
public void testPushedResourceCancelled() throws Exception
{
String pushPath = "/secondary";
CountDownLatch latch = new CountDownLatch(1);
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
HttpURI pushURI = new HttpURI("http://localhost:" + connector.getLocalPort() + pushPath);
MetaData.Request pushRequest = new MetaData.Request(HttpMethod.GET.asString(), pushURI, HttpVersion.HTTP_2, new HttpFields());
stream.push(new PushPromiseFrame(stream.getId(), 0, pushRequest), new Promise.Adapter<Stream>()
{
@Override
public void succeeded(Stream pushStream)
{
// Just send the normal response and wait for the reset.
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
}
}, new Stream.Listener.Adapter()
{
@Override
public void onReset(Stream stream, ResetFrame frame)
{
latch.countDown();
}
});
return null;
}
});
HttpRequest request = (HttpRequest)client.newRequest("localhost", connector.getLocalPort());
ContentResponse response = request
.pushListener((mainRequest, pushedRequest) -> null)
.timeout(5, TimeUnit.SECONDS)
.send();
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testPushedResources() throws Exception
{
Random random = new Random();
byte[] bytes = new byte[512];
random.nextBytes(bytes);
byte[] pushBytes1 = new byte[1024];
random.nextBytes(pushBytes1);
byte[] pushBytes2 = new byte[2048];
random.nextBytes(pushBytes2);
String path1 = "/secondary1";
String path2 = "/secondary2";
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
if (target.equals(path1))
{
response.getOutputStream().write(pushBytes1);
}
else if (target.equals(path2))
{
response.getOutputStream().write(pushBytes2);
}
else
{
baseRequest.getPushBuilder()
.path(path1)
.push();
baseRequest.getPushBuilder()
.path(path2)
.push();
response.getOutputStream().write(bytes);
}
}
});
CountDownLatch latch1 = new CountDownLatch(1);
CountDownLatch latch2 = new CountDownLatch(1);
HttpRequest request = (HttpRequest)client.newRequest("localhost", connector.getLocalPort());
ContentResponse response = request
.pushListener((mainRequest, pushedRequest) -> new BufferingResponseListener()
{
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isSucceeded());
if (pushedRequest.getPath().equals(path1))
{
Assert.assertArrayEquals(pushBytes1, getContent());
latch1.countDown();
}
else if (pushedRequest.getPath().equals(path2))
{
Assert.assertArrayEquals(pushBytes2, getContent());
latch2.countDown();
}
}
})
.timeout(5, TimeUnit.SECONDS)
.send();
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
Assert.assertArrayEquals(bytes, response.getContent());
Assert.assertTrue(latch1.await(5, TimeUnit.SECONDS));
Assert.assertTrue(latch2.await(5, TimeUnit.SECONDS));
}
@Test
public void testPushedResourceRedirect() throws Exception
{
Random random = new Random();
byte[] pushBytes = new byte[512];
random.nextBytes(pushBytes);
String oldPath = "/old";
String newPath = "/new";
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
if (target.equals(oldPath))
response.sendRedirect(newPath);
else if (target.equals(newPath))
response.getOutputStream().write(pushBytes);
else
baseRequest.getPushBuilder().path(oldPath).push();
}
});
CountDownLatch latch = new CountDownLatch(1);
HttpRequest request = (HttpRequest)client.newRequest("localhost", connector.getLocalPort());
ContentResponse response = request
.pushListener((mainRequest, pushedRequest) -> new BufferingResponseListener()
{
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isSucceeded());
Assert.assertEquals(oldPath, pushedRequest.getPath());
Assert.assertEquals(newPath, result.getRequest().getPath());
Assert.assertArrayEquals(pushBytes, getContent());
latch.countDown();
}
})
.timeout(5, TimeUnit.SECONDS)
.send();
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -146,9 +146,9 @@ public class HttpChannelAssociationTest extends AbstractTest
return new HttpConnectionOverHTTP2(destination, session)
{
@Override
protected HttpChannelOverHTTP2 newHttpChannel()
protected HttpChannelOverHTTP2 newHttpChannel(boolean push)
{
return new HttpChannelOverHTTP2(getHttpDestination(), this, getSession())
return new HttpChannelOverHTTP2(getHttpDestination(), this, getSession(), push)
{
@Override
public boolean associate(HttpExchange exchange)