Fixes #1148 - Support HTTP/2 HEADERS trailer.

This commit is contained in:
Simone Bordet 2016-12-08 10:08:56 +01:00
parent 44c84ffb09
commit a721e8b25d
5 changed files with 901 additions and 21 deletions

View File

@ -20,6 +20,8 @@ package org.eclipse.jetty.http2.client;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.IStream;
@ -78,30 +80,23 @@ public class HTTP2ClientSession extends HTTP2Session
int streamId = frame.getStreamId();
IStream stream = getStream(streamId);
if (stream == null)
if (stream != null)
{
if (LOG.isDebugEnabled())
LOG.debug("Ignoring {}, stream #{} not found", frame, streamId);
MetaData metaData = frame.getMetaData();
if (metaData.isRequest())
{
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_response");
}
else
{
stream.process(frame, Callback.NOOP);
notifyHeaders(stream, frame);
}
}
else
{
stream.process(frame, Callback.NOOP);
notifyHeaders(stream, frame);
}
}
private void notifyHeaders(IStream stream, HeadersFrame frame)
{
Stream.Listener listener = stream.getListener();
if (listener == null)
return;
try
{
listener.onHeaders(stream, frame);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
if (LOG.isDebugEnabled())
LOG.debug("Ignoring {}, stream #{} not found", frame, streamId);
}
}

View File

@ -0,0 +1,709 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
public class RawHTTP2ProxyTest
{
private static final Logger LOGGER = Log.getLogger(RawHTTP2ProxyTest.class);
private final List<Server> servers = new ArrayList<>();
private final List<HTTP2Client> clients = new ArrayList<>();
private Server startServer(String name, ServerSessionListener listener) throws Exception
{
QueuedThreadPool serverExecutor = new QueuedThreadPool();
serverExecutor.setName(name);
Server server = new Server(serverExecutor);
RawHTTP2ServerConnectionFactory connectionFactory = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), listener);
ServerConnector connector = new ServerConnector(server, 1, 1, connectionFactory);
server.addConnector(connector);
server.setAttribute("connector", connector);
servers.add(server);
server.start();
return server;
}
private HTTP2Client startClient(String name) throws Exception
{
HTTP2Client client = new HTTP2Client();
QueuedThreadPool clientExecutor = new QueuedThreadPool();
clientExecutor.setName(name);
client.setExecutor(clientExecutor);
clients.add(client);
client.start();
return client;
}
@After
public void dispose() throws Exception
{
for (int i = clients.size() - 1; i >= 0; i--)
{
HTTP2Client client = clients.get(i);
client.stop();
}
for (int i = servers.size() - 1; i >= 0; i--)
{
Server server = servers.get(i);
server.stop();
}
}
@Test
public void testRawHTTP2Proxy() throws Exception
{
byte[] data1 = new byte[1024];
new Random().nextBytes(data1);
ByteBuffer buffer1 = ByteBuffer.wrap(data1);
Server server1 = startServer("server1", new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("SERVER1 received {}", frame);
return new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("SERVER1 received {}", frame);
if (frame.isEndStream())
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
HeadersFrame reply = new HeadersFrame(stream.getId(), response, null, false);
if (LOGGER.isDebugEnabled())
LOGGER.debug("SERVER1 sending {}", reply);
stream.headers(reply, new Callback()
{
@Override
public void succeeded()
{
DataFrame data = new DataFrame(stream.getId(), buffer1.slice(), true);
if (LOGGER.isDebugEnabled())
LOGGER.debug("SERVER1 sending {}", data);
stream.data(data, NOOP);
}
});
}
}
};
}
});
ServerConnector connector1 = (ServerConnector)server1.getAttribute("connector");
Server server2 = startServer("server2", new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("SERVER2 received {}", frame);
return new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("SERVER2 received {}", frame);
callback.succeeded();
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
Callback.Completable completable1 = new Callback.Completable();
HeadersFrame reply = new HeadersFrame(stream.getId(), response, null, false);
if (LOGGER.isDebugEnabled())
LOGGER.debug("SERVER2 sending {}", reply);
stream.headers(reply, completable1);
completable1.thenCompose(ignored ->
{
Callback.Completable completable2 = new Callback.Completable();
DataFrame data = new DataFrame(stream.getId(), buffer1.slice(), false);
if (LOGGER.isDebugEnabled())
LOGGER.debug("SERVER2 sending {}", data);
stream.data(data, completable2);
return completable2;
}).thenRun(() ->
{
MetaData trailer = new MetaData(HttpVersion.HTTP_2, new HttpFields());
HeadersFrame end = new HeadersFrame(stream.getId(), trailer, null, true);
if (LOGGER.isDebugEnabled())
LOGGER.debug("SERVER2 sending {}", end);
stream.headers(end, Callback.NOOP);
});
}
};
}
});
ServerConnector connector2 = (ServerConnector)server2.getAttribute("connector");
HTTP2Client proxyClient = startClient("proxyClient");
Server proxyServer = startServer("proxyServer", new ClientToProxySessionListener(proxyClient));
ServerConnector proxyConnector = (ServerConnector)proxyServer.getAttribute("connector");
InetSocketAddress proxyAddress = new InetSocketAddress("localhost", proxyConnector.getLocalPort());
HTTP2Client client = startClient("client");
FuturePromise<Session> clientPromise = new FuturePromise<>();
client.connect(proxyAddress, new Session.Listener.Adapter(), clientPromise);
Session clientSession = clientPromise.get(5, TimeUnit.SECONDS);
// Send a request with trailers for server1.
HttpFields fields1 = new HttpFields();
fields1.put("X-Target", String.valueOf(connector1.getLocalPort()));
MetaData.Request request1 = new MetaData.Request("GET", new HttpURI("http://localhost/server1"), HttpVersion.HTTP_2, fields1);
FuturePromise<Stream> streamPromise1 = new FuturePromise<>();
CountDownLatch latch1 = new CountDownLatch(1);
clientSession.newStream(new HeadersFrame(request1, null, false), streamPromise1, new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("CLIENT received {}", frame);
}
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("CLIENT received {}", frame);
Assert.assertEquals(buffer1.slice(), frame.getData());
callback.succeeded();
latch1.countDown();
}
});
Stream stream1 = streamPromise1.get(5, TimeUnit.SECONDS);
stream1.headers(new HeadersFrame(stream1.getId(), new MetaData(HttpVersion.HTTP_2, new HttpFields()), null, true), Callback.NOOP);
// Send a request for server2.
HttpFields fields2 = new HttpFields();
fields2.put("X-Target", String.valueOf(connector2.getLocalPort()));
MetaData.Request request2 = new MetaData.Request("GET", new HttpURI("http://localhost/server1"), HttpVersion.HTTP_2, fields2);
FuturePromise<Stream> streamPromise2 = new FuturePromise<>();
CountDownLatch latch2 = new CountDownLatch(1);
clientSession.newStream(new HeadersFrame(request2, null, false), streamPromise2, new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("CLIENT received {}", frame);
if (frame.isEndStream())
latch2.countDown();
}
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("CLIENT received {}", frame);
callback.succeeded();
}
});
Stream stream2 = streamPromise2.get(5, TimeUnit.SECONDS);
stream2.data(new DataFrame(stream2.getId(), buffer1.slice(), true), Callback.NOOP);
Assert.assertTrue(latch1.await(5, TimeUnit.SECONDS));
Assert.assertTrue(latch2.await(5, TimeUnit.SECONDS));
}
private static class ClientToProxySessionListener extends ServerSessionListener.Adapter
{
private final Map<Integer, ClientToProxyToServer> forwarders = new ConcurrentHashMap<>();
private final HTTP2Client client;
private ClientToProxySessionListener(HTTP2Client client)
{
this.client = client;
}
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("Received {} for {} on {}: {}", frame, stream, stream.getSession(), frame.getMetaData());
// Forward to the right server.
MetaData metaData = frame.getMetaData();
HttpFields fields = metaData.getFields();
int port = Integer.parseInt(fields.get("X-Target"));
ClientToProxyToServer clientToProxyToServer = forwarders.computeIfAbsent(port, p -> new ClientToProxyToServer("localhost", p, client));
clientToProxyToServer.offer(stream, frame, Callback.NOOP);
return clientToProxyToServer;
}
@Override
public void onClose(Session session, GoAwayFrame frame)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("Received {} on {}", frame, session);
// TODO
}
@Override
public boolean onIdleTimeout(Session session)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("Idle timeout on {}", session);
// TODO
return true;
}
@Override
public void onFailure(Session session, Throwable failure)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("Failure on " + session, failure);
// TODO
}
}
private static class ClientToProxyToServer extends IteratingCallback implements Stream.Listener
{
private final Object lock = this;
private final Map<Stream, Deque<FrameInfo>> frames = new HashMap<>();
private final Map<Stream, Stream> streams = new HashMap<>();
private final ServerToProxyToClient serverToProxyToClient = new ServerToProxyToClient();
private final String host;
private final int port;
private final HTTP2Client client;
private Session proxyToServerSession;
private FrameInfo frameInfo;
private Stream clientToProxyStream;
private ClientToProxyToServer(String host, int port, HTTP2Client client)
{
this.host = host;
this.port = port;
this.client = client;
}
private void offer(Stream stream, Frame frame, Callback callback)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("CPS queueing {} for {} on {}", frame, stream, stream.getSession());
boolean connected;
synchronized (lock)
{
Deque<FrameInfo> deque = frames.computeIfAbsent(stream, s -> new ArrayDeque<>());
deque.offer(new FrameInfo(frame, callback));
connected = proxyToServerSession != null;
}
if (connected)
iterate();
else
connect();
}
private void connect()
{
InetSocketAddress address = new InetSocketAddress(host, port);
if (LOGGER.isDebugEnabled())
LOGGER.debug("CPS connecting to {}", address);
client.connect(address, new ServerToProxySessionListener(), new Promise<Session>()
{
@Override
public void succeeded(Session result)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("CPS connected to {} with {}", address, result);
synchronized (lock)
{
proxyToServerSession = result;
}
iterate();
}
@Override
public void failed(Throwable x)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("CPS connect failed to {}", address);
// TODO: drain the queue and fail the streams.
}
});
}
@Override
protected Action process() throws Throwable
{
Stream proxyToServerStream = null;
Session proxyToServerSession = null;
synchronized (lock)
{
for (Map.Entry<Stream, Deque<FrameInfo>> entry : frames.entrySet())
{
frameInfo = entry.getValue().poll();
if (frameInfo != null)
{
clientToProxyStream = entry.getKey();
proxyToServerStream = streams.get(clientToProxyStream);
proxyToServerSession = this.proxyToServerSession;
break;
}
}
}
if (LOGGER.isDebugEnabled())
LOGGER.debug("CPS processing {} for {} to {}", frameInfo, clientToProxyStream, proxyToServerStream);
if (frameInfo == null)
return Action.IDLE;
if (proxyToServerStream == null)
{
HeadersFrame clientToProxyFrame = (HeadersFrame)frameInfo.frame;
HeadersFrame proxyToServerFrame = new HeadersFrame(clientToProxyFrame.getMetaData(), clientToProxyFrame.getPriority(), clientToProxyFrame.isEndStream());
proxyToServerSession.newStream(proxyToServerFrame, new Promise<Stream>()
{
@Override
public void succeeded(Stream result)
{
synchronized (lock)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("CPS created {}", result);
streams.put(clientToProxyStream, result);
}
serverToProxyToClient.link(result, clientToProxyStream);
ClientToProxyToServer.this.succeeded();
}
@Override
public void failed(Throwable failure)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("CPS create failed", failure);
// TODO: cannot open stream to server.
ClientToProxyToServer.this.failed(failure);
}
}, serverToProxyToClient);
return Action.SCHEDULED;
}
else
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("CPS forwarding {} from {} to {}", frameInfo, clientToProxyStream, proxyToServerStream);
switch (frameInfo.frame.getType())
{
case HEADERS:
{
HeadersFrame clientToProxyFrame = (HeadersFrame)frameInfo.frame;
HeadersFrame proxyToServerFrame = new HeadersFrame(proxyToServerStream.getId(), clientToProxyFrame.getMetaData(), clientToProxyFrame.getPriority(), clientToProxyFrame.isEndStream());
proxyToServerStream.headers(proxyToServerFrame, this);
return Action.SCHEDULED;
}
case DATA:
{
DataFrame clientToProxyFrame = (DataFrame)frameInfo.frame;
DataFrame proxyToServerFrame = new DataFrame(proxyToServerStream.getId(), clientToProxyFrame.getData(), clientToProxyFrame.isEndStream());
proxyToServerStream.data(proxyToServerFrame, this);
return Action.SCHEDULED;
}
default:
{
throw new IllegalStateException();
}
}
}
}
@Override
public void succeeded()
{
frameInfo.callback.succeeded();
super.succeeded();
}
@Override
public void failed(Throwable failure)
{
frameInfo.callback.failed(failure);
super.failed(failure);
}
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("CPS received {} on {}", frame, stream);
offer(stream, frame, NOOP);
}
@Override
public Stream.Listener onPush(Stream stream, PushPromiseFrame frame)
{
// Clients don't push.
return null;
}
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("CPS received {} on {}", frame, stream);
offer(stream, frame, callback);
}
@Override
public void onReset(Stream stream, ResetFrame frame)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("CPS received {} on {}", frame, stream);
// TODO: drain the queue for that stream, and notify server.
}
@Override
public boolean onIdleTimeout(Stream stream, Throwable x)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("CPS idle timeout for {}", stream);
// TODO: drain the queue for that stream, reset stream, and notify server.
return true;
}
}
private static class ServerToProxySessionListener extends Session.Listener.Adapter
{
@Override
public void onClose(Session session, GoAwayFrame frame)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("Received {} on {}", frame, session);
// TODO
}
@Override
public boolean onIdleTimeout(Session session)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("Idle timeout on {}", session);
// TODO
return true;
}
@Override
public void onFailure(Session session, Throwable failure)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("Failure on " + session, failure);
// TODO
}
}
private static class ServerToProxyToClient extends IteratingCallback implements Stream.Listener
{
private final Object lock = this;
private final Map<Stream, Deque<FrameInfo>> frames = new HashMap<>();
private final Map<Stream, Stream> streams = new HashMap<>();
private FrameInfo frameInfo;
private Stream serverToProxyStream;
@Override
protected Action process() throws Throwable
{
Stream proxyToClientStream = null;
synchronized (lock)
{
for (Map.Entry<Stream, Deque<FrameInfo>> entry : frames.entrySet())
{
frameInfo = entry.getValue().poll();
if (frameInfo != null)
{
serverToProxyStream = entry.getKey();
proxyToClientStream = streams.get(serverToProxyStream);
break;
}
}
}
if (LOGGER.isDebugEnabled())
LOGGER.debug("SPC processing {} for {} to {}", frameInfo, serverToProxyStream, proxyToClientStream);
// It may happen that we received a frame from the server,
// but the proxyToClientStream is not linked yet.
if (proxyToClientStream == null)
return Action.IDLE;
if (LOGGER.isDebugEnabled())
LOGGER.debug("SPC forwarding {} for {} to {}", frameInfo, serverToProxyStream, proxyToClientStream);
switch (frameInfo.frame.getType())
{
case HEADERS:
{
HeadersFrame serverToProxyFrame = (HeadersFrame)frameInfo.frame;
HeadersFrame proxyToClientFrame = new HeadersFrame(proxyToClientStream.getId(), serverToProxyFrame.getMetaData(), serverToProxyFrame.getPriority(), serverToProxyFrame.isEndStream());
proxyToClientStream.headers(proxyToClientFrame, this);
return Action.SCHEDULED;
}
case DATA:
{
DataFrame clientToProxyFrame = (DataFrame)frameInfo.frame;
DataFrame proxyToServerFrame = new DataFrame(serverToProxyStream.getId(), clientToProxyFrame.getData(), clientToProxyFrame.isEndStream());
proxyToClientStream.data(proxyToServerFrame, this);
return Action.SCHEDULED;
}
case PUSH_PROMISE:
{
// TODO
throw new UnsupportedOperationException();
}
default:
{
throw new IllegalStateException();
}
}
}
@Override
public void succeeded()
{
frameInfo.callback.succeeded();
super.succeeded();
}
@Override
public void failed(Throwable failure)
{
frameInfo.callback.failed(failure);
super.failed(failure);
}
private void offer(Stream stream, Frame frame, Callback callback)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("SPC queueing {} for {} on {}", frame, stream, stream.getSession());
synchronized (lock)
{
Deque<FrameInfo> deque = frames.computeIfAbsent(stream, s -> new ArrayDeque<>());
deque.offer(new FrameInfo(frame, callback));
}
iterate();
}
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("SPC received {} on {}", frame, stream);
offer(stream, frame, NOOP);
}
@Override
public Stream.Listener onPush(Stream stream, PushPromiseFrame frame)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("SPC received {} on {}", frame, stream);
// TODO
return null;
}
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("SPC received {} on {}", frame, stream);
offer(stream, frame, callback);
}
@Override
public void onReset(Stream stream, ResetFrame frame)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("SPC received {} on {}", frame, stream);
// TODO: drain queue, reset client stream.
}
@Override
public boolean onIdleTimeout(Stream stream, Throwable x)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("SPC idle timeout for {}", stream);
// TODO:
return false;
}
private void link(Stream proxyToServerStream, Stream clientToProxyStream)
{
synchronized (lock)
{
streams.put(proxyToServerStream, clientToProxyStream);
}
iterate();
}
}
private static class FrameInfo
{
private final Frame frame;
private final Callback callback;
private FrameInfo(Frame frame, Callback callback)
{
this.frame = frame;
this.callback = callback;
}
@Override
public String toString()
{
return frame.toString();
}
}
}

View File

@ -0,0 +1,145 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise;
import org.junit.Assert;
import org.junit.Test;
public class TrailersTest extends AbstractTest
{
@Test
public void testTrailersSentByClient() throws Exception
{
CountDownLatch latch = new CountDownLatch(1);
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData.Request request = (MetaData.Request)frame.getMetaData();
Assert.assertFalse(frame.isEndStream());
Assert.assertTrue(request.getFields().containsKey("X-Request"));
return new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
MetaData trailer = frame.getMetaData();
Assert.assertTrue(frame.isEndStream());
Assert.assertTrue(trailer.getFields().containsKey("X-Trailer"));
latch.countDown();
}
};
}
});
Session session = newClient(new Session.Listener.Adapter());
HttpFields requestFields = new HttpFields();
requestFields.put("X-Request", "true");
MetaData.Request request = newRequest("GET", requestFields);
HeadersFrame requestFrame = new HeadersFrame(request, null, false);
FuturePromise<Stream> streamPromise = new FuturePromise<>();
session.newStream(requestFrame, streamPromise, new Stream.Listener.Adapter());
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
// Send the trailers.
HttpFields trailerFields = new HttpFields();
trailerFields.put("X-Trailer", "true");
MetaData trailers = new MetaData(HttpVersion.HTTP_2, trailerFields);
HeadersFrame trailerFrame = new HeadersFrame(stream.getId(), trailers, null, true);
stream.headers(trailerFrame, Callback.NOOP);
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testTrailersSentByServer() throws Exception
{
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
HttpFields responseFields = new HttpFields();
responseFields.put("X-Response", "true");
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, responseFields);
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, false);
stream.headers(responseFrame, new Callback()
{
@Override
public void succeeded()
{
HttpFields trailerFields = new HttpFields();
trailerFields.put("X-Trailer", "true");
MetaData trailer = new MetaData(HttpVersion.HTTP_2, trailerFields);
HeadersFrame trailerFrame = new HeadersFrame(stream.getId(), trailer, null, true);
stream.headers(trailerFrame, NOOP);
}
});
return null;
}
});
Session session = newClient(new Session.Listener.Adapter());
MetaData.Request request = newRequest("GET", new HttpFields());
HeadersFrame requestFrame = new HeadersFrame(request, null, true);
CountDownLatch latch = new CountDownLatch(1);
session.newStream(requestFrame, new Promise.Adapter<>(), new Stream.Listener.Adapter()
{
private boolean responded;
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
if (!responded)
{
MetaData.Response response = (MetaData.Response)frame.getMetaData();
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
Assert.assertTrue(response.getFields().containsKey("X-Response"));
Assert.assertFalse(frame.isEndStream());
responded = true;
}
else
{
MetaData trailer = frame.getMetaData();
Assert.assertTrue(trailer.getFields().containsKey("X-Trailer"));
Assert.assertTrue(frame.isEndStream());
latch.countDown();
}
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -1097,6 +1097,21 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
}
}
protected void notifyHeaders(IStream stream, HeadersFrame frame)
{
Stream.Listener listener = stream.getListener();
if (listener == null)
return;
try
{
listener.onHeaders(stream, frame);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
}
}
@Override
public String toString()
{

View File

@ -95,9 +95,25 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
stream.setListener(listener);
}
}
else if (metaData.isResponse())
{
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_request");
}
else
{
onConnectionFailure(ErrorCode.INTERNAL_ERROR.code, "invalid_request");
// Trailers.
int streamId = frame.getStreamId();
IStream stream = getStream(streamId);
if (stream != null)
{
stream.process(frame, Callback.NOOP);
notifyHeaders(stream, frame);
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Ignoring {}, stream #{} not found", frame, streamId);
}
}
}