From 44c84ffb0928f9642b21f846f46fd618c27e6ba8 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Wed, 7 Dec 2016 22:28:01 +0100 Subject: [PATCH 1/2] Fixes #1151 - NPE in ClasspathPattern.match(). --- .../jetty/webapp/ClasspathPattern.java | 27 +++++++------------ 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/jetty-webapp/src/main/java/org/eclipse/jetty/webapp/ClasspathPattern.java b/jetty-webapp/src/main/java/org/eclipse/jetty/webapp/ClasspathPattern.java index 32ed9526aad..9a403210bd5 100644 --- a/jetty-webapp/src/main/java/org/eclipse/jetty/webapp/ClasspathPattern.java +++ b/jetty-webapp/src/main/java/org/eclipse/jetty/webapp/ClasspathPattern.java @@ -19,9 +19,6 @@ package org.eclipse.jetty.webapp; -import static java.lang.Boolean.FALSE; -import static java.lang.Boolean.TRUE; - import java.io.File; import java.net.URL; import java.nio.file.Path; @@ -33,7 +30,6 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.function.Predicate; import org.eclipse.jetty.util.ArrayTernaryTrie; @@ -44,7 +40,6 @@ import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.resource.Resource; -/* ------------------------------------------------------------ */ /** * Classpath classes list performs sequential pattern matching of a class name * against an internal array of classpath pattern entries. @@ -156,7 +151,7 @@ public class ClasspathPattern extends AbstractSet @Override public Iterator iterator() { - return _entries.keySet().stream().map(k->_entries.get(k)).iterator(); + return _entries.keySet().stream().map(_entries::get).iterator(); } @Override @@ -335,8 +330,6 @@ public class ClasspathPattern extends AbstractSet Map _entries = new HashMap<>(); - Set _classes = new HashSet<>(); - IncludeExcludeSet _patterns = new IncludeExcludeSet<>(ByPackageOrName.class); IncludeExcludeSet _locations = new IncludeExcludeSet<>(ByLocation.class); @@ -522,16 +515,16 @@ public class ClasspathPattern extends AbstractSet { try { - Resource resource = TypeUtil.getLoadedFrom(clazz); - Path path = resource.getFile().toPath(); - Boolean byName = _patterns.isIncludedAndNotExcluded(clazz.getName()); - Boolean byLocation = _locations.isIncludedAndNotExcluded(path); - + Resource resource = TypeUtil.getLoadedFrom(clazz); + Boolean byLocation = resource == null || resource.getFile() == null + ? null + : _locations.isIncludedAndNotExcluded(resource.getFile().toPath()); + // Combine the tri-state match of both IncludeExclude Sets - boolean included = byName==TRUE || byLocation==TRUE + boolean included = byName==Boolean.TRUE || byLocation==Boolean.TRUE || (byName==null && !_patterns.hasIncludes() && byLocation==null && !_locations.hasIncludes()); - boolean excluded = byName==FALSE || byLocation==FALSE; + boolean excluded = byName==Boolean.FALSE || byLocation==Boolean.FALSE; return included && !excluded; } catch (Exception e) @@ -566,9 +559,9 @@ public class ClasspathPattern extends AbstractSet } // Combine the tri-state match of both IncludeExclude Sets - boolean included = byName==TRUE || byLocation==TRUE + boolean included = byName==Boolean.TRUE || byLocation==Boolean.TRUE || (byName==null && !_patterns.hasIncludes() && byLocation==null && !_locations.hasIncludes()); - boolean excluded = byName==FALSE || byLocation==FALSE; + boolean excluded = byName==Boolean.FALSE || byLocation==Boolean.FALSE; return included && !excluded; } } From a721e8b25d8c8667c8fc3f51527308d34a1d0474 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Thu, 8 Dec 2016 10:08:56 +0100 Subject: [PATCH 2/2] Fixes #1148 - Support HTTP/2 HEADERS trailer. --- .../http2/client/HTTP2ClientSession.java | 35 +- .../jetty/http2/client/RawHTTP2ProxyTest.java | 709 ++++++++++++++++++ .../jetty/http2/client/TrailersTest.java | 145 ++++ .../org/eclipse/jetty/http2/HTTP2Session.java | 15 + .../http2/server/HTTP2ServerSession.java | 18 +- 5 files changed, 901 insertions(+), 21 deletions(-) create mode 100644 jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/RawHTTP2ProxyTest.java create mode 100644 jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/TrailersTest.java diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientSession.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientSession.java index 295fdf56aaf..d5c509018c2 100644 --- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientSession.java +++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientSession.java @@ -20,6 +20,8 @@ package org.eclipse.jetty.http2.client; import java.util.concurrent.atomic.AtomicLong; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.ErrorCode; import org.eclipse.jetty.http2.FlowControlStrategy; import org.eclipse.jetty.http2.HTTP2Session; import org.eclipse.jetty.http2.IStream; @@ -78,30 +80,23 @@ public class HTTP2ClientSession extends HTTP2Session int streamId = frame.getStreamId(); IStream stream = getStream(streamId); - if (stream == null) + if (stream != null) { - if (LOG.isDebugEnabled()) - LOG.debug("Ignoring {}, stream #{} not found", frame, streamId); + MetaData metaData = frame.getMetaData(); + if (metaData.isRequest()) + { + onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_response"); + } + else + { + stream.process(frame, Callback.NOOP); + notifyHeaders(stream, frame); + } } else { - stream.process(frame, Callback.NOOP); - notifyHeaders(stream, frame); - } - } - - private void notifyHeaders(IStream stream, HeadersFrame frame) - { - Stream.Listener listener = stream.getListener(); - if (listener == null) - return; - try - { - listener.onHeaders(stream, frame); - } - catch (Throwable x) - { - LOG.info("Failure while notifying listener " + listener, x); + if (LOG.isDebugEnabled()) + LOG.debug("Ignoring {}, stream #{} not found", frame, streamId); } } diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/RawHTTP2ProxyTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/RawHTTP2ProxyTest.java new file mode 100644 index 00000000000..9896d7355ef --- /dev/null +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/RawHTTP2ProxyTest.java @@ -0,0 +1,709 @@ +// +// ======================================================================== +// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.http2.client; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Deque; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.HttpURI; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.api.Session; +import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.api.server.ServerSessionListener; +import org.eclipse.jetty.http2.frames.DataFrame; +import org.eclipse.jetty.http2.frames.Frame; +import org.eclipse.jetty.http2.frames.GoAwayFrame; +import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.http2.frames.PushPromiseFrame; +import org.eclipse.jetty.http2.frames.ResetFrame; +import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.FuturePromise; +import org.eclipse.jetty.util.IteratingCallback; +import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +public class RawHTTP2ProxyTest +{ + private static final Logger LOGGER = Log.getLogger(RawHTTP2ProxyTest.class); + + private final List servers = new ArrayList<>(); + private final List clients = new ArrayList<>(); + + private Server startServer(String name, ServerSessionListener listener) throws Exception + { + QueuedThreadPool serverExecutor = new QueuedThreadPool(); + serverExecutor.setName(name); + Server server = new Server(serverExecutor); + RawHTTP2ServerConnectionFactory connectionFactory = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), listener); + ServerConnector connector = new ServerConnector(server, 1, 1, connectionFactory); + server.addConnector(connector); + server.setAttribute("connector", connector); + servers.add(server); + server.start(); + return server; + } + + private HTTP2Client startClient(String name) throws Exception + { + HTTP2Client client = new HTTP2Client(); + QueuedThreadPool clientExecutor = new QueuedThreadPool(); + clientExecutor.setName(name); + client.setExecutor(clientExecutor); + clients.add(client); + client.start(); + return client; + } + + @After + public void dispose() throws Exception + { + for (int i = clients.size() - 1; i >= 0; i--) + { + HTTP2Client client = clients.get(i); + client.stop(); + } + for (int i = servers.size() - 1; i >= 0; i--) + { + Server server = servers.get(i); + server.stop(); + } + } + + + @Test + public void testRawHTTP2Proxy() throws Exception + { + byte[] data1 = new byte[1024]; + new Random().nextBytes(data1); + ByteBuffer buffer1 = ByteBuffer.wrap(data1); + Server server1 = startServer("server1", new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SERVER1 received {}", frame); + return new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SERVER1 received {}", frame); + if (frame.isEndStream()) + { + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + HeadersFrame reply = new HeadersFrame(stream.getId(), response, null, false); + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SERVER1 sending {}", reply); + stream.headers(reply, new Callback() + { + @Override + public void succeeded() + { + DataFrame data = new DataFrame(stream.getId(), buffer1.slice(), true); + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SERVER1 sending {}", data); + stream.data(data, NOOP); + } + }); + } + } + }; + } + }); + ServerConnector connector1 = (ServerConnector)server1.getAttribute("connector"); + Server server2 = startServer("server2", new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SERVER2 received {}", frame); + return new Stream.Listener.Adapter() + { + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SERVER2 received {}", frame); + callback.succeeded(); + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + Callback.Completable completable1 = new Callback.Completable(); + HeadersFrame reply = new HeadersFrame(stream.getId(), response, null, false); + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SERVER2 sending {}", reply); + stream.headers(reply, completable1); + completable1.thenCompose(ignored -> + { + Callback.Completable completable2 = new Callback.Completable(); + DataFrame data = new DataFrame(stream.getId(), buffer1.slice(), false); + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SERVER2 sending {}", data); + stream.data(data, completable2); + return completable2; + }).thenRun(() -> + { + MetaData trailer = new MetaData(HttpVersion.HTTP_2, new HttpFields()); + HeadersFrame end = new HeadersFrame(stream.getId(), trailer, null, true); + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SERVER2 sending {}", end); + stream.headers(end, Callback.NOOP); + }); + } + }; + } + }); + ServerConnector connector2 = (ServerConnector)server2.getAttribute("connector"); + HTTP2Client proxyClient = startClient("proxyClient"); + Server proxyServer = startServer("proxyServer", new ClientToProxySessionListener(proxyClient)); + ServerConnector proxyConnector = (ServerConnector)proxyServer.getAttribute("connector"); + InetSocketAddress proxyAddress = new InetSocketAddress("localhost", proxyConnector.getLocalPort()); + HTTP2Client client = startClient("client"); + + FuturePromise clientPromise = new FuturePromise<>(); + client.connect(proxyAddress, new Session.Listener.Adapter(), clientPromise); + Session clientSession = clientPromise.get(5, TimeUnit.SECONDS); + + // Send a request with trailers for server1. + HttpFields fields1 = new HttpFields(); + fields1.put("X-Target", String.valueOf(connector1.getLocalPort())); + MetaData.Request request1 = new MetaData.Request("GET", new HttpURI("http://localhost/server1"), HttpVersion.HTTP_2, fields1); + FuturePromise streamPromise1 = new FuturePromise<>(); + CountDownLatch latch1 = new CountDownLatch(1); + clientSession.newStream(new HeadersFrame(request1, null, false), streamPromise1, new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("CLIENT received {}", frame); + } + + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("CLIENT received {}", frame); + Assert.assertEquals(buffer1.slice(), frame.getData()); + callback.succeeded(); + latch1.countDown(); + } + }); + Stream stream1 = streamPromise1.get(5, TimeUnit.SECONDS); + stream1.headers(new HeadersFrame(stream1.getId(), new MetaData(HttpVersion.HTTP_2, new HttpFields()), null, true), Callback.NOOP); + + // Send a request for server2. + HttpFields fields2 = new HttpFields(); + fields2.put("X-Target", String.valueOf(connector2.getLocalPort())); + MetaData.Request request2 = new MetaData.Request("GET", new HttpURI("http://localhost/server1"), HttpVersion.HTTP_2, fields2); + FuturePromise streamPromise2 = new FuturePromise<>(); + CountDownLatch latch2 = new CountDownLatch(1); + clientSession.newStream(new HeadersFrame(request2, null, false), streamPromise2, new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("CLIENT received {}", frame); + if (frame.isEndStream()) + latch2.countDown(); + } + + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("CLIENT received {}", frame); + callback.succeeded(); + } + }); + Stream stream2 = streamPromise2.get(5, TimeUnit.SECONDS); + stream2.data(new DataFrame(stream2.getId(), buffer1.slice(), true), Callback.NOOP); + + Assert.assertTrue(latch1.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(latch2.await(5, TimeUnit.SECONDS)); + } + + private static class ClientToProxySessionListener extends ServerSessionListener.Adapter + { + private final Map forwarders = new ConcurrentHashMap<>(); + private final HTTP2Client client; + + private ClientToProxySessionListener(HTTP2Client client) + { + this.client = client; + } + + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("Received {} for {} on {}: {}", frame, stream, stream.getSession(), frame.getMetaData()); + // Forward to the right server. + MetaData metaData = frame.getMetaData(); + HttpFields fields = metaData.getFields(); + int port = Integer.parseInt(fields.get("X-Target")); + ClientToProxyToServer clientToProxyToServer = forwarders.computeIfAbsent(port, p -> new ClientToProxyToServer("localhost", p, client)); + clientToProxyToServer.offer(stream, frame, Callback.NOOP); + return clientToProxyToServer; + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("Received {} on {}", frame, session); + // TODO + } + + @Override + public boolean onIdleTimeout(Session session) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("Idle timeout on {}", session); + // TODO + return true; + } + + @Override + public void onFailure(Session session, Throwable failure) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("Failure on " + session, failure); + // TODO + } + } + + private static class ClientToProxyToServer extends IteratingCallback implements Stream.Listener + { + private final Object lock = this; + private final Map> frames = new HashMap<>(); + private final Map streams = new HashMap<>(); + private final ServerToProxyToClient serverToProxyToClient = new ServerToProxyToClient(); + private final String host; + private final int port; + private final HTTP2Client client; + private Session proxyToServerSession; + private FrameInfo frameInfo; + private Stream clientToProxyStream; + + private ClientToProxyToServer(String host, int port, HTTP2Client client) + { + this.host = host; + this.port = port; + this.client = client; + } + + private void offer(Stream stream, Frame frame, Callback callback) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("CPS queueing {} for {} on {}", frame, stream, stream.getSession()); + boolean connected; + synchronized (lock) + { + Deque deque = frames.computeIfAbsent(stream, s -> new ArrayDeque<>()); + deque.offer(new FrameInfo(frame, callback)); + connected = proxyToServerSession != null; + } + if (connected) + iterate(); + else + connect(); + } + + private void connect() + { + InetSocketAddress address = new InetSocketAddress(host, port); + if (LOGGER.isDebugEnabled()) + LOGGER.debug("CPS connecting to {}", address); + client.connect(address, new ServerToProxySessionListener(), new Promise() + { + @Override + public void succeeded(Session result) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("CPS connected to {} with {}", address, result); + synchronized (lock) + { + proxyToServerSession = result; + } + iterate(); + } + + @Override + public void failed(Throwable x) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("CPS connect failed to {}", address); + // TODO: drain the queue and fail the streams. + } + }); + } + + @Override + protected Action process() throws Throwable + { + Stream proxyToServerStream = null; + Session proxyToServerSession = null; + synchronized (lock) + { + for (Map.Entry> entry : frames.entrySet()) + { + frameInfo = entry.getValue().poll(); + if (frameInfo != null) + { + clientToProxyStream = entry.getKey(); + proxyToServerStream = streams.get(clientToProxyStream); + proxyToServerSession = this.proxyToServerSession; + break; + } + } + } + + if (LOGGER.isDebugEnabled()) + LOGGER.debug("CPS processing {} for {} to {}", frameInfo, clientToProxyStream, proxyToServerStream); + + if (frameInfo == null) + return Action.IDLE; + + if (proxyToServerStream == null) + { + HeadersFrame clientToProxyFrame = (HeadersFrame)frameInfo.frame; + HeadersFrame proxyToServerFrame = new HeadersFrame(clientToProxyFrame.getMetaData(), clientToProxyFrame.getPriority(), clientToProxyFrame.isEndStream()); + proxyToServerSession.newStream(proxyToServerFrame, new Promise() + { + @Override + public void succeeded(Stream result) + { + synchronized (lock) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("CPS created {}", result); + streams.put(clientToProxyStream, result); + } + serverToProxyToClient.link(result, clientToProxyStream); + ClientToProxyToServer.this.succeeded(); + } + + @Override + public void failed(Throwable failure) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("CPS create failed", failure); + // TODO: cannot open stream to server. + ClientToProxyToServer.this.failed(failure); + } + }, serverToProxyToClient); + return Action.SCHEDULED; + } + else + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("CPS forwarding {} from {} to {}", frameInfo, clientToProxyStream, proxyToServerStream); + switch (frameInfo.frame.getType()) + { + case HEADERS: + { + HeadersFrame clientToProxyFrame = (HeadersFrame)frameInfo.frame; + HeadersFrame proxyToServerFrame = new HeadersFrame(proxyToServerStream.getId(), clientToProxyFrame.getMetaData(), clientToProxyFrame.getPriority(), clientToProxyFrame.isEndStream()); + proxyToServerStream.headers(proxyToServerFrame, this); + return Action.SCHEDULED; + } + case DATA: + { + DataFrame clientToProxyFrame = (DataFrame)frameInfo.frame; + DataFrame proxyToServerFrame = new DataFrame(proxyToServerStream.getId(), clientToProxyFrame.getData(), clientToProxyFrame.isEndStream()); + proxyToServerStream.data(proxyToServerFrame, this); + return Action.SCHEDULED; + } + default: + { + throw new IllegalStateException(); + } + } + } + } + + @Override + public void succeeded() + { + frameInfo.callback.succeeded(); + super.succeeded(); + } + + @Override + public void failed(Throwable failure) + { + frameInfo.callback.failed(failure); + super.failed(failure); + } + + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("CPS received {} on {}", frame, stream); + offer(stream, frame, NOOP); + } + + @Override + public Stream.Listener onPush(Stream stream, PushPromiseFrame frame) + { + // Clients don't push. + return null; + } + + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("CPS received {} on {}", frame, stream); + offer(stream, frame, callback); + } + + @Override + public void onReset(Stream stream, ResetFrame frame) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("CPS received {} on {}", frame, stream); + // TODO: drain the queue for that stream, and notify server. + } + + @Override + public boolean onIdleTimeout(Stream stream, Throwable x) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("CPS idle timeout for {}", stream); + // TODO: drain the queue for that stream, reset stream, and notify server. + return true; + } + } + + private static class ServerToProxySessionListener extends Session.Listener.Adapter + { + @Override + public void onClose(Session session, GoAwayFrame frame) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("Received {} on {}", frame, session); + // TODO + } + + @Override + public boolean onIdleTimeout(Session session) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("Idle timeout on {}", session); + // TODO + return true; + } + + @Override + public void onFailure(Session session, Throwable failure) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("Failure on " + session, failure); + // TODO + } + } + + private static class ServerToProxyToClient extends IteratingCallback implements Stream.Listener + { + private final Object lock = this; + private final Map> frames = new HashMap<>(); + private final Map streams = new HashMap<>(); + private FrameInfo frameInfo; + private Stream serverToProxyStream; + + @Override + protected Action process() throws Throwable + { + Stream proxyToClientStream = null; + synchronized (lock) + { + for (Map.Entry> entry : frames.entrySet()) + { + frameInfo = entry.getValue().poll(); + if (frameInfo != null) + { + serverToProxyStream = entry.getKey(); + proxyToClientStream = streams.get(serverToProxyStream); + break; + } + } + } + + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SPC processing {} for {} to {}", frameInfo, serverToProxyStream, proxyToClientStream); + + // It may happen that we received a frame from the server, + // but the proxyToClientStream is not linked yet. + if (proxyToClientStream == null) + return Action.IDLE; + + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SPC forwarding {} for {} to {}", frameInfo, serverToProxyStream, proxyToClientStream); + + switch (frameInfo.frame.getType()) + { + case HEADERS: + { + HeadersFrame serverToProxyFrame = (HeadersFrame)frameInfo.frame; + HeadersFrame proxyToClientFrame = new HeadersFrame(proxyToClientStream.getId(), serverToProxyFrame.getMetaData(), serverToProxyFrame.getPriority(), serverToProxyFrame.isEndStream()); + proxyToClientStream.headers(proxyToClientFrame, this); + return Action.SCHEDULED; + } + case DATA: + { + DataFrame clientToProxyFrame = (DataFrame)frameInfo.frame; + DataFrame proxyToServerFrame = new DataFrame(serverToProxyStream.getId(), clientToProxyFrame.getData(), clientToProxyFrame.isEndStream()); + proxyToClientStream.data(proxyToServerFrame, this); + return Action.SCHEDULED; + } + case PUSH_PROMISE: + { + // TODO + throw new UnsupportedOperationException(); + } + default: + { + throw new IllegalStateException(); + } + } + } + + @Override + public void succeeded() + { + frameInfo.callback.succeeded(); + super.succeeded(); + } + + @Override + public void failed(Throwable failure) + { + frameInfo.callback.failed(failure); + super.failed(failure); + } + + private void offer(Stream stream, Frame frame, Callback callback) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SPC queueing {} for {} on {}", frame, stream, stream.getSession()); + synchronized (lock) + { + Deque deque = frames.computeIfAbsent(stream, s -> new ArrayDeque<>()); + deque.offer(new FrameInfo(frame, callback)); + } + iterate(); + } + + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SPC received {} on {}", frame, stream); + offer(stream, frame, NOOP); + } + + @Override + public Stream.Listener onPush(Stream stream, PushPromiseFrame frame) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SPC received {} on {}", frame, stream); + // TODO + return null; + } + + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SPC received {} on {}", frame, stream); + offer(stream, frame, callback); + } + + @Override + public void onReset(Stream stream, ResetFrame frame) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SPC received {} on {}", frame, stream); + // TODO: drain queue, reset client stream. + } + + @Override + public boolean onIdleTimeout(Stream stream, Throwable x) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("SPC idle timeout for {}", stream); + // TODO: + return false; + } + + private void link(Stream proxyToServerStream, Stream clientToProxyStream) + { + synchronized (lock) + { + streams.put(proxyToServerStream, clientToProxyStream); + } + iterate(); + } + } + + private static class FrameInfo + { + private final Frame frame; + private final Callback callback; + + private FrameInfo(Frame frame, Callback callback) + { + this.frame = frame; + this.callback = callback; + } + + @Override + public String toString() + { + return frame.toString(); + } + } +} diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/TrailersTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/TrailersTest.java new file mode 100644 index 00000000000..de303464183 --- /dev/null +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/TrailersTest.java @@ -0,0 +1,145 @@ +// +// ======================================================================== +// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.http2.client; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.api.Session; +import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.api.server.ServerSessionListener; +import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.FuturePromise; +import org.eclipse.jetty.util.Promise; +import org.junit.Assert; +import org.junit.Test; + +public class TrailersTest extends AbstractTest +{ + @Test + public void testTrailersSentByClient() throws Exception + { + CountDownLatch latch = new CountDownLatch(1); + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + MetaData.Request request = (MetaData.Request)frame.getMetaData(); + Assert.assertFalse(frame.isEndStream()); + Assert.assertTrue(request.getFields().containsKey("X-Request")); + return new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + MetaData trailer = frame.getMetaData(); + Assert.assertTrue(frame.isEndStream()); + Assert.assertTrue(trailer.getFields().containsKey("X-Trailer")); + latch.countDown(); + } + }; + } + }); + + Session session = newClient(new Session.Listener.Adapter()); + + HttpFields requestFields = new HttpFields(); + requestFields.put("X-Request", "true"); + MetaData.Request request = newRequest("GET", requestFields); + HeadersFrame requestFrame = new HeadersFrame(request, null, false); + FuturePromise streamPromise = new FuturePromise<>(); + session.newStream(requestFrame, streamPromise, new Stream.Listener.Adapter()); + Stream stream = streamPromise.get(5, TimeUnit.SECONDS); + + // Send the trailers. + HttpFields trailerFields = new HttpFields(); + trailerFields.put("X-Trailer", "true"); + MetaData trailers = new MetaData(HttpVersion.HTTP_2, trailerFields); + HeadersFrame trailerFrame = new HeadersFrame(stream.getId(), trailers, null, true); + stream.headers(trailerFrame, Callback.NOOP); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testTrailersSentByServer() throws Exception + { + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + HttpFields responseFields = new HttpFields(); + responseFields.put("X-Response", "true"); + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, responseFields); + HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, false); + stream.headers(responseFrame, new Callback() + { + @Override + public void succeeded() + { + HttpFields trailerFields = new HttpFields(); + trailerFields.put("X-Trailer", "true"); + MetaData trailer = new MetaData(HttpVersion.HTTP_2, trailerFields); + HeadersFrame trailerFrame = new HeadersFrame(stream.getId(), trailer, null, true); + stream.headers(trailerFrame, NOOP); + } + }); + return null; + } + }); + + Session session = newClient(new Session.Listener.Adapter()); + MetaData.Request request = newRequest("GET", new HttpFields()); + HeadersFrame requestFrame = new HeadersFrame(request, null, true); + CountDownLatch latch = new CountDownLatch(1); + session.newStream(requestFrame, new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + private boolean responded; + + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + if (!responded) + { + MetaData.Response response = (MetaData.Response)frame.getMetaData(); + Assert.assertEquals(HttpStatus.OK_200, response.getStatus()); + Assert.assertTrue(response.getFields().containsKey("X-Response")); + Assert.assertFalse(frame.isEndStream()); + responded = true; + } + else + { + MetaData trailer = frame.getMetaData(); + Assert.assertTrue(trailer.getFields().containsKey("X-Trailer")); + Assert.assertTrue(frame.isEndStream()); + latch.countDown(); + } + } + }); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + } +} diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index be26e1a5bb9..b65b02ba3dc 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -1097,6 +1097,21 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } } + protected void notifyHeaders(IStream stream, HeadersFrame frame) + { + Stream.Listener listener = stream.getListener(); + if (listener == null) + return; + try + { + listener.onHeaders(stream, frame); + } + catch (Throwable x) + { + LOG.info("Failure while notifying listener " + listener, x); + } + } + @Override public String toString() { diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java index cb5de81d6c2..1ef23a1e0ce 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java @@ -95,9 +95,25 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis stream.setListener(listener); } } + else if (metaData.isResponse()) + { + onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_request"); + } else { - onConnectionFailure(ErrorCode.INTERNAL_ERROR.code, "invalid_request"); + // Trailers. + int streamId = frame.getStreamId(); + IStream stream = getStream(streamId); + if (stream != null) + { + stream.process(frame, Callback.NOOP); + notifyHeaders(stream, frame); + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("Ignoring {}, stream #{} not found", frame, streamId); + } } }