Merge remote-tracking branch 'origin/jetty-9.4.x'

This commit is contained in:
Jan Bartel 2016-12-08 20:17:20 +11:00
commit 8ea15f4214
15 changed files with 1042 additions and 92 deletions

View File

@ -22,9 +22,9 @@ This document provides information about configuring Java Server Pages (JSP) for
[[which-jsp-implementation]]
==== Which JSP Implementation
As of Jetty 9.2, Jetty is using Jasper from http://tomcat.apache.org/tomcat-8.0-doc/jasper-howto.html[Apache] as the default JSP container implementation.
Jetty uses Jasper from http://tomcat.apache.org/tomcat-8.0-doc/jasper-howto.html[Apache] as the default JSP container implementation.
By default the Jetty distribution enables the JSP link:#startup-modules[module], and by default, this link:#startup-modules[module] is set to Apache Jasper.
By default the Jetty distribution enables the JSP link:#startup-modules[module], and by default, this module is set to Apache Jasper.
[source, plain, subs="{sub-order}"]
----
@ -213,7 +213,7 @@ If you are using the Jetty distribution, and you want to change the JSP settings
If you want to change the JSP settings for all webapps, edit the `{$jetty.home}/etc/webdefaults.xml` file directly instead.
[[configuring-jsp-servlet-in-web.xml]]
===== Configuring the JSP Servlet in `web.xml`
===== Configuring the JSP Servlet in web.xml
Another option is to add an entry for the JSPServlet to the `WEB-INF/web.xml` file of your webapp and change or add init-params.
You may also add (but not remove) servlet-mappings.
@ -259,11 +259,25 @@ You can use the entry in link:#webdefault-xml[{$jetty.home}/etc/webdefault.xml]
...
----
[[jsp-async-support]]
===== Configuring Async Support
By default, Jetty does not enable async support for the JSP servlet.
Configuring the JSP servlet for async is relatively easy - simply define the `async-supported` parameter as `true` in either your `webdefault.xml` or the `web.xml` for a specific context.
[source, xml, subs="{sub-order}"]
----
<servlet id="jsp">
<servlet-name>jsp</servlet-name>
<async-supported>true</async-supported>
</servlet>
----
[[using-jstl-taglibs-for-jetty7-jetty8]]
==== Using JSTL Taglibs
The JavaServer Pages Standlard Tag Library (JSTL) is part of the Jetty distribution and is automatically put on the classpath when you link:#which-jsp-implementation[select your flavour of JSP].
It is also automatically on the classpath for the Jetty Maven plugin, which uses the Apache JSP engine as of Jetty 9.2.
It is also automatically on the classpath for the Jetty Maven plugin, which uses the Apache JSP engine.
===== Embedding

View File

@ -20,6 +20,8 @@ package org.eclipse.jetty.http2.client;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.IStream;
@ -78,30 +80,23 @@ public class HTTP2ClientSession extends HTTP2Session
int streamId = frame.getStreamId();
IStream stream = getStream(streamId);
if (stream == null)
if (stream != null)
{
if (LOG.isDebugEnabled())
LOG.debug("Ignoring {}, stream #{} not found", frame, streamId);
MetaData metaData = frame.getMetaData();
if (metaData.isRequest())
{
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_response");
}
else
{
stream.process(frame, Callback.NOOP);
notifyHeaders(stream, frame);
}
}
else
{
stream.process(frame, Callback.NOOP);
notifyHeaders(stream, frame);
}
}
private void notifyHeaders(IStream stream, HeadersFrame frame)
{
Stream.Listener listener = stream.getListener();
if (listener == null)
return;
try
{
listener.onHeaders(stream, frame);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
if (LOG.isDebugEnabled())
LOG.debug("Ignoring {}, stream #{} not found", frame, streamId);
}
}

View File

@ -0,0 +1,709 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
public class RawHTTP2ProxyTest
{
private static final Logger LOGGER = Log.getLogger(RawHTTP2ProxyTest.class);
private final List<Server> servers = new ArrayList<>();
private final List<HTTP2Client> clients = new ArrayList<>();
private Server startServer(String name, ServerSessionListener listener) throws Exception
{
QueuedThreadPool serverExecutor = new QueuedThreadPool();
serverExecutor.setName(name);
Server server = new Server(serverExecutor);
RawHTTP2ServerConnectionFactory connectionFactory = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), listener);
ServerConnector connector = new ServerConnector(server, 1, 1, connectionFactory);
server.addConnector(connector);
server.setAttribute("connector", connector);
servers.add(server);
server.start();
return server;
}
private HTTP2Client startClient(String name) throws Exception
{
HTTP2Client client = new HTTP2Client();
QueuedThreadPool clientExecutor = new QueuedThreadPool();
clientExecutor.setName(name);
client.setExecutor(clientExecutor);
clients.add(client);
client.start();
return client;
}
@After
public void dispose() throws Exception
{
for (int i = clients.size() - 1; i >= 0; i--)
{
HTTP2Client client = clients.get(i);
client.stop();
}
for (int i = servers.size() - 1; i >= 0; i--)
{
Server server = servers.get(i);
server.stop();
}
}
@Test
public void testRawHTTP2Proxy() throws Exception
{
byte[] data1 = new byte[1024];
new Random().nextBytes(data1);
ByteBuffer buffer1 = ByteBuffer.wrap(data1);
Server server1 = startServer("server1", new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("SERVER1 received {}", frame);
return new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("SERVER1 received {}", frame);
if (frame.isEndStream())
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
HeadersFrame reply = new HeadersFrame(stream.getId(), response, null, false);
if (LOGGER.isDebugEnabled())
LOGGER.debug("SERVER1 sending {}", reply);
stream.headers(reply, new Callback()
{
@Override
public void succeeded()
{
DataFrame data = new DataFrame(stream.getId(), buffer1.slice(), true);
if (LOGGER.isDebugEnabled())
LOGGER.debug("SERVER1 sending {}", data);
stream.data(data, NOOP);
}
});
}
}
};
}
});
ServerConnector connector1 = (ServerConnector)server1.getAttribute("connector");
Server server2 = startServer("server2", new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("SERVER2 received {}", frame);
return new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("SERVER2 received {}", frame);
callback.succeeded();
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
Callback.Completable completable1 = new Callback.Completable();
HeadersFrame reply = new HeadersFrame(stream.getId(), response, null, false);
if (LOGGER.isDebugEnabled())
LOGGER.debug("SERVER2 sending {}", reply);
stream.headers(reply, completable1);
completable1.thenCompose(ignored ->
{
Callback.Completable completable2 = new Callback.Completable();
DataFrame data = new DataFrame(stream.getId(), buffer1.slice(), false);
if (LOGGER.isDebugEnabled())
LOGGER.debug("SERVER2 sending {}", data);
stream.data(data, completable2);
return completable2;
}).thenRun(() ->
{
MetaData trailer = new MetaData(HttpVersion.HTTP_2, new HttpFields());
HeadersFrame end = new HeadersFrame(stream.getId(), trailer, null, true);
if (LOGGER.isDebugEnabled())
LOGGER.debug("SERVER2 sending {}", end);
stream.headers(end, Callback.NOOP);
});
}
};
}
});
ServerConnector connector2 = (ServerConnector)server2.getAttribute("connector");
HTTP2Client proxyClient = startClient("proxyClient");
Server proxyServer = startServer("proxyServer", new ClientToProxySessionListener(proxyClient));
ServerConnector proxyConnector = (ServerConnector)proxyServer.getAttribute("connector");
InetSocketAddress proxyAddress = new InetSocketAddress("localhost", proxyConnector.getLocalPort());
HTTP2Client client = startClient("client");
FuturePromise<Session> clientPromise = new FuturePromise<>();
client.connect(proxyAddress, new Session.Listener.Adapter(), clientPromise);
Session clientSession = clientPromise.get(5, TimeUnit.SECONDS);
// Send a request with trailers for server1.
HttpFields fields1 = new HttpFields();
fields1.put("X-Target", String.valueOf(connector1.getLocalPort()));
MetaData.Request request1 = new MetaData.Request("GET", new HttpURI("http://localhost/server1"), HttpVersion.HTTP_2, fields1);
FuturePromise<Stream> streamPromise1 = new FuturePromise<>();
CountDownLatch latch1 = new CountDownLatch(1);
clientSession.newStream(new HeadersFrame(request1, null, false), streamPromise1, new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("CLIENT received {}", frame);
}
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("CLIENT received {}", frame);
Assert.assertEquals(buffer1.slice(), frame.getData());
callback.succeeded();
latch1.countDown();
}
});
Stream stream1 = streamPromise1.get(5, TimeUnit.SECONDS);
stream1.headers(new HeadersFrame(stream1.getId(), new MetaData(HttpVersion.HTTP_2, new HttpFields()), null, true), Callback.NOOP);
// Send a request for server2.
HttpFields fields2 = new HttpFields();
fields2.put("X-Target", String.valueOf(connector2.getLocalPort()));
MetaData.Request request2 = new MetaData.Request("GET", new HttpURI("http://localhost/server1"), HttpVersion.HTTP_2, fields2);
FuturePromise<Stream> streamPromise2 = new FuturePromise<>();
CountDownLatch latch2 = new CountDownLatch(1);
clientSession.newStream(new HeadersFrame(request2, null, false), streamPromise2, new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("CLIENT received {}", frame);
if (frame.isEndStream())
latch2.countDown();
}
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("CLIENT received {}", frame);
callback.succeeded();
}
});
Stream stream2 = streamPromise2.get(5, TimeUnit.SECONDS);
stream2.data(new DataFrame(stream2.getId(), buffer1.slice(), true), Callback.NOOP);
Assert.assertTrue(latch1.await(5, TimeUnit.SECONDS));
Assert.assertTrue(latch2.await(5, TimeUnit.SECONDS));
}
private static class ClientToProxySessionListener extends ServerSessionListener.Adapter
{
private final Map<Integer, ClientToProxyToServer> forwarders = new ConcurrentHashMap<>();
private final HTTP2Client client;
private ClientToProxySessionListener(HTTP2Client client)
{
this.client = client;
}
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("Received {} for {} on {}: {}", frame, stream, stream.getSession(), frame.getMetaData());
// Forward to the right server.
MetaData metaData = frame.getMetaData();
HttpFields fields = metaData.getFields();
int port = Integer.parseInt(fields.get("X-Target"));
ClientToProxyToServer clientToProxyToServer = forwarders.computeIfAbsent(port, p -> new ClientToProxyToServer("localhost", p, client));
clientToProxyToServer.offer(stream, frame, Callback.NOOP);
return clientToProxyToServer;
}
@Override
public void onClose(Session session, GoAwayFrame frame)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("Received {} on {}", frame, session);
// TODO
}
@Override
public boolean onIdleTimeout(Session session)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("Idle timeout on {}", session);
// TODO
return true;
}
@Override
public void onFailure(Session session, Throwable failure)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("Failure on " + session, failure);
// TODO
}
}
private static class ClientToProxyToServer extends IteratingCallback implements Stream.Listener
{
private final Object lock = this;
private final Map<Stream, Deque<FrameInfo>> frames = new HashMap<>();
private final Map<Stream, Stream> streams = new HashMap<>();
private final ServerToProxyToClient serverToProxyToClient = new ServerToProxyToClient();
private final String host;
private final int port;
private final HTTP2Client client;
private Session proxyToServerSession;
private FrameInfo frameInfo;
private Stream clientToProxyStream;
private ClientToProxyToServer(String host, int port, HTTP2Client client)
{
this.host = host;
this.port = port;
this.client = client;
}
private void offer(Stream stream, Frame frame, Callback callback)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("CPS queueing {} for {} on {}", frame, stream, stream.getSession());
boolean connected;
synchronized (lock)
{
Deque<FrameInfo> deque = frames.computeIfAbsent(stream, s -> new ArrayDeque<>());
deque.offer(new FrameInfo(frame, callback));
connected = proxyToServerSession != null;
}
if (connected)
iterate();
else
connect();
}
private void connect()
{
InetSocketAddress address = new InetSocketAddress(host, port);
if (LOGGER.isDebugEnabled())
LOGGER.debug("CPS connecting to {}", address);
client.connect(address, new ServerToProxySessionListener(), new Promise<Session>()
{
@Override
public void succeeded(Session result)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("CPS connected to {} with {}", address, result);
synchronized (lock)
{
proxyToServerSession = result;
}
iterate();
}
@Override
public void failed(Throwable x)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("CPS connect failed to {}", address);
// TODO: drain the queue and fail the streams.
}
});
}
@Override
protected Action process() throws Throwable
{
Stream proxyToServerStream = null;
Session proxyToServerSession = null;
synchronized (lock)
{
for (Map.Entry<Stream, Deque<FrameInfo>> entry : frames.entrySet())
{
frameInfo = entry.getValue().poll();
if (frameInfo != null)
{
clientToProxyStream = entry.getKey();
proxyToServerStream = streams.get(clientToProxyStream);
proxyToServerSession = this.proxyToServerSession;
break;
}
}
}
if (LOGGER.isDebugEnabled())
LOGGER.debug("CPS processing {} for {} to {}", frameInfo, clientToProxyStream, proxyToServerStream);
if (frameInfo == null)
return Action.IDLE;
if (proxyToServerStream == null)
{
HeadersFrame clientToProxyFrame = (HeadersFrame)frameInfo.frame;
HeadersFrame proxyToServerFrame = new HeadersFrame(clientToProxyFrame.getMetaData(), clientToProxyFrame.getPriority(), clientToProxyFrame.isEndStream());
proxyToServerSession.newStream(proxyToServerFrame, new Promise<Stream>()
{
@Override
public void succeeded(Stream result)
{
synchronized (lock)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("CPS created {}", result);
streams.put(clientToProxyStream, result);
}
serverToProxyToClient.link(result, clientToProxyStream);
ClientToProxyToServer.this.succeeded();
}
@Override
public void failed(Throwable failure)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("CPS create failed", failure);
// TODO: cannot open stream to server.
ClientToProxyToServer.this.failed(failure);
}
}, serverToProxyToClient);
return Action.SCHEDULED;
}
else
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("CPS forwarding {} from {} to {}", frameInfo, clientToProxyStream, proxyToServerStream);
switch (frameInfo.frame.getType())
{
case HEADERS:
{
HeadersFrame clientToProxyFrame = (HeadersFrame)frameInfo.frame;
HeadersFrame proxyToServerFrame = new HeadersFrame(proxyToServerStream.getId(), clientToProxyFrame.getMetaData(), clientToProxyFrame.getPriority(), clientToProxyFrame.isEndStream());
proxyToServerStream.headers(proxyToServerFrame, this);
return Action.SCHEDULED;
}
case DATA:
{
DataFrame clientToProxyFrame = (DataFrame)frameInfo.frame;
DataFrame proxyToServerFrame = new DataFrame(proxyToServerStream.getId(), clientToProxyFrame.getData(), clientToProxyFrame.isEndStream());
proxyToServerStream.data(proxyToServerFrame, this);
return Action.SCHEDULED;
}
default:
{
throw new IllegalStateException();
}
}
}
}
@Override
public void succeeded()
{
frameInfo.callback.succeeded();
super.succeeded();
}
@Override
public void failed(Throwable failure)
{
frameInfo.callback.failed(failure);
super.failed(failure);
}
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("CPS received {} on {}", frame, stream);
offer(stream, frame, NOOP);
}
@Override
public Stream.Listener onPush(Stream stream, PushPromiseFrame frame)
{
// Clients don't push.
return null;
}
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("CPS received {} on {}", frame, stream);
offer(stream, frame, callback);
}
@Override
public void onReset(Stream stream, ResetFrame frame)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("CPS received {} on {}", frame, stream);
// TODO: drain the queue for that stream, and notify server.
}
@Override
public boolean onIdleTimeout(Stream stream, Throwable x)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("CPS idle timeout for {}", stream);
// TODO: drain the queue for that stream, reset stream, and notify server.
return true;
}
}
private static class ServerToProxySessionListener extends Session.Listener.Adapter
{
@Override
public void onClose(Session session, GoAwayFrame frame)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("Received {} on {}", frame, session);
// TODO
}
@Override
public boolean onIdleTimeout(Session session)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("Idle timeout on {}", session);
// TODO
return true;
}
@Override
public void onFailure(Session session, Throwable failure)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("Failure on " + session, failure);
// TODO
}
}
private static class ServerToProxyToClient extends IteratingCallback implements Stream.Listener
{
private final Object lock = this;
private final Map<Stream, Deque<FrameInfo>> frames = new HashMap<>();
private final Map<Stream, Stream> streams = new HashMap<>();
private FrameInfo frameInfo;
private Stream serverToProxyStream;
@Override
protected Action process() throws Throwable
{
Stream proxyToClientStream = null;
synchronized (lock)
{
for (Map.Entry<Stream, Deque<FrameInfo>> entry : frames.entrySet())
{
frameInfo = entry.getValue().poll();
if (frameInfo != null)
{
serverToProxyStream = entry.getKey();
proxyToClientStream = streams.get(serverToProxyStream);
break;
}
}
}
if (LOGGER.isDebugEnabled())
LOGGER.debug("SPC processing {} for {} to {}", frameInfo, serverToProxyStream, proxyToClientStream);
// It may happen that we received a frame from the server,
// but the proxyToClientStream is not linked yet.
if (proxyToClientStream == null)
return Action.IDLE;
if (LOGGER.isDebugEnabled())
LOGGER.debug("SPC forwarding {} for {} to {}", frameInfo, serverToProxyStream, proxyToClientStream);
switch (frameInfo.frame.getType())
{
case HEADERS:
{
HeadersFrame serverToProxyFrame = (HeadersFrame)frameInfo.frame;
HeadersFrame proxyToClientFrame = new HeadersFrame(proxyToClientStream.getId(), serverToProxyFrame.getMetaData(), serverToProxyFrame.getPriority(), serverToProxyFrame.isEndStream());
proxyToClientStream.headers(proxyToClientFrame, this);
return Action.SCHEDULED;
}
case DATA:
{
DataFrame clientToProxyFrame = (DataFrame)frameInfo.frame;
DataFrame proxyToServerFrame = new DataFrame(serverToProxyStream.getId(), clientToProxyFrame.getData(), clientToProxyFrame.isEndStream());
proxyToClientStream.data(proxyToServerFrame, this);
return Action.SCHEDULED;
}
case PUSH_PROMISE:
{
// TODO
throw new UnsupportedOperationException();
}
default:
{
throw new IllegalStateException();
}
}
}
@Override
public void succeeded()
{
frameInfo.callback.succeeded();
super.succeeded();
}
@Override
public void failed(Throwable failure)
{
frameInfo.callback.failed(failure);
super.failed(failure);
}
private void offer(Stream stream, Frame frame, Callback callback)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("SPC queueing {} for {} on {}", frame, stream, stream.getSession());
synchronized (lock)
{
Deque<FrameInfo> deque = frames.computeIfAbsent(stream, s -> new ArrayDeque<>());
deque.offer(new FrameInfo(frame, callback));
}
iterate();
}
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("SPC received {} on {}", frame, stream);
offer(stream, frame, NOOP);
}
@Override
public Stream.Listener onPush(Stream stream, PushPromiseFrame frame)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("SPC received {} on {}", frame, stream);
// TODO
return null;
}
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("SPC received {} on {}", frame, stream);
offer(stream, frame, callback);
}
@Override
public void onReset(Stream stream, ResetFrame frame)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("SPC received {} on {}", frame, stream);
// TODO: drain queue, reset client stream.
}
@Override
public boolean onIdleTimeout(Stream stream, Throwable x)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("SPC idle timeout for {}", stream);
// TODO:
return false;
}
private void link(Stream proxyToServerStream, Stream clientToProxyStream)
{
synchronized (lock)
{
streams.put(proxyToServerStream, clientToProxyStream);
}
iterate();
}
}
private static class FrameInfo
{
private final Frame frame;
private final Callback callback;
private FrameInfo(Frame frame, Callback callback)
{
this.frame = frame;
this.callback = callback;
}
@Override
public String toString()
{
return frame.toString();
}
}
}

View File

@ -0,0 +1,145 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise;
import org.junit.Assert;
import org.junit.Test;
public class TrailersTest extends AbstractTest
{
@Test
public void testTrailersSentByClient() throws Exception
{
CountDownLatch latch = new CountDownLatch(1);
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData.Request request = (MetaData.Request)frame.getMetaData();
Assert.assertFalse(frame.isEndStream());
Assert.assertTrue(request.getFields().containsKey("X-Request"));
return new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
MetaData trailer = frame.getMetaData();
Assert.assertTrue(frame.isEndStream());
Assert.assertTrue(trailer.getFields().containsKey("X-Trailer"));
latch.countDown();
}
};
}
});
Session session = newClient(new Session.Listener.Adapter());
HttpFields requestFields = new HttpFields();
requestFields.put("X-Request", "true");
MetaData.Request request = newRequest("GET", requestFields);
HeadersFrame requestFrame = new HeadersFrame(request, null, false);
FuturePromise<Stream> streamPromise = new FuturePromise<>();
session.newStream(requestFrame, streamPromise, new Stream.Listener.Adapter());
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
// Send the trailers.
HttpFields trailerFields = new HttpFields();
trailerFields.put("X-Trailer", "true");
MetaData trailers = new MetaData(HttpVersion.HTTP_2, trailerFields);
HeadersFrame trailerFrame = new HeadersFrame(stream.getId(), trailers, null, true);
stream.headers(trailerFrame, Callback.NOOP);
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testTrailersSentByServer() throws Exception
{
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
HttpFields responseFields = new HttpFields();
responseFields.put("X-Response", "true");
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, responseFields);
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, false);
stream.headers(responseFrame, new Callback()
{
@Override
public void succeeded()
{
HttpFields trailerFields = new HttpFields();
trailerFields.put("X-Trailer", "true");
MetaData trailer = new MetaData(HttpVersion.HTTP_2, trailerFields);
HeadersFrame trailerFrame = new HeadersFrame(stream.getId(), trailer, null, true);
stream.headers(trailerFrame, NOOP);
}
});
return null;
}
});
Session session = newClient(new Session.Listener.Adapter());
MetaData.Request request = newRequest("GET", new HttpFields());
HeadersFrame requestFrame = new HeadersFrame(request, null, true);
CountDownLatch latch = new CountDownLatch(1);
session.newStream(requestFrame, new Promise.Adapter<>(), new Stream.Listener.Adapter()
{
private boolean responded;
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
if (!responded)
{
MetaData.Response response = (MetaData.Response)frame.getMetaData();
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
Assert.assertTrue(response.getFields().containsKey("X-Response"));
Assert.assertFalse(frame.isEndStream());
responded = true;
}
else
{
MetaData trailer = frame.getMetaData();
Assert.assertTrue(trailer.getFields().containsKey("X-Trailer"));
Assert.assertTrue(frame.isEndStream());
latch.countDown();
}
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -1097,6 +1097,21 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
}
}
protected void notifyHeaders(IStream stream, HeadersFrame frame)
{
Stream.Listener listener = stream.getListener();
if (listener == null)
return;
try
{
listener.onHeaders(stream, frame);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
}
}
@Override
public String toString()
{

View File

@ -95,9 +95,25 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
stream.setListener(listener);
}
}
else if (metaData.isResponse())
{
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_request");
}
else
{
onConnectionFailure(ErrorCode.INTERNAL_ERROR.code, "invalid_request");
// Trailers.
int streamId = frame.getStreamId();
IStream stream = getStream(streamId);
if (stream != null)
{
stream.process(frame, Callback.NOOP);
notifyHeaders(stream, frame);
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Ignoring {}, stream #{} not found", frame, streamId);
}
}
}

View File

@ -134,6 +134,16 @@ public abstract class AbstractConnection implements Connection
getEndPoint().fillInterested(_readCallback);
}
public void tryFillInterested()
{
tryFillInterested(_readCallback);
}
public void tryFillInterested(Callback callback)
{
getEndPoint().tryFillInterested(callback);
}
public boolean isFillInterested()
{
return getEndPoint().isFillInterested();

View File

@ -347,12 +347,19 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
}
@Override
public void fillInterested(Callback callback) throws IllegalStateException
public void fillInterested(Callback callback)
{
notIdle();
_fillInterest.register(callback);
}
@Override
public boolean tryFillInterested(Callback callback)
{
notIdle();
return _fillInterest.tryRegister(callback);
}
@Override
public boolean isFillInterested()
{

View File

@ -205,6 +205,14 @@ public interface EndPoint extends Closeable
*/
void fillInterested(Callback callback) throws ReadPendingException;
/**
* <p>Requests callback methods to be invoked when a call to {@link #fill(ByteBuffer)} would return data or EOF.</p>
*
* @param callback the callback to call when an error occurs or we are readable.
* @return true if set
*/
boolean tryFillInterested(Callback callback);
/**
* @return whether {@link #fillInterested(Callback)} has been called, but {@link #fill(ByteBuffer)} has not yet
* been called

View File

@ -55,24 +55,37 @@ public abstract class FillInterest
*/
public void register(Callback callback) throws ReadPendingException
{
if (callback == null)
throw new IllegalArgumentException();
if (_interested.compareAndSet(null, callback))
{
if (LOG.isDebugEnabled())
{
LOG.debug("{} register {}",this,callback);
_lastSet=new Throwable(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()) + ":" + Thread.currentThread().getName());
}
}
else
if (!tryRegister(callback))
{
LOG.warn("Read pending for {} prevented {}", _interested, callback);
if (LOG.isDebugEnabled())
LOG.warn("callback set at ",_lastSet);
throw new ReadPendingException();
}
}
/**
* Call to register interest in a callback when a read is possible.
* The callback will be called either immediately if {@link #needsFillInterest()}
* returns true or eventually once {@link #fillable()} is called.
*
* @param callback the callback to register
* @return true if the register succeeded
*/
public boolean tryRegister(Callback callback)
{
if (callback == null)
throw new IllegalArgumentException();
if (!_interested.compareAndSet(null, callback))
return false;
if (LOG.isDebugEnabled())
{
LOG.debug("{} register {}",this,callback);
_lastSet=new Throwable(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()) + ":" + Thread.currentThread().getName());
}
try
{
if (LOG.isDebugEnabled())
@ -83,6 +96,8 @@ public abstract class FillInterest
{
onFail(e);
}
return true;
}
/**

View File

@ -261,14 +261,17 @@ public class SslConnection extends AbstractConnection
_decryptedEndPoint.getFillInterest().fillable();
// If we are handshaking, then wake up any waiting write as well as it may have been blocked on the read
boolean runComplete = false;
synchronized(_decryptedEndPoint)
{
if (_decryptedEndPoint._flushRequiresFillToProgress)
{
_decryptedEndPoint._flushRequiresFillToProgress = false;
_runCompleteWrite.run();
runComplete = true;
}
}
if (runComplete)
_runCompleteWrite.run();
if (LOG.isDebugEnabled())
LOG.debug("onFillable exit {}", _decryptedEndPoint);
@ -455,6 +458,8 @@ public class SslConnection extends AbstractConnection
// OR if we are handshaking we need to read some encrypted data OR
// if neither then we should just try the flush again.
boolean try_again = false;
boolean write = false;
boolean need_fill_interest = false;
synchronized (DecryptedEndPoint.this)
{
if (LOG.isDebugEnabled())
@ -464,15 +469,14 @@ public class SslConnection extends AbstractConnection
{
// write it
_cannotAcceptMoreAppDataToFlush = true;
getEndPoint().write(_writeCallback, _encryptedOutput);
write = true;
}
// If we are handshaking and need to read,
else if (_sslEngine.getHandshakeStatus() == HandshakeStatus.NEED_UNWRAP)
{
// check if we are actually read blocked in order to write
_flushRequiresFillToProgress = true;
ensureFillInterested();
_flushRequiresFillToProgress = true;
need_fill_interest = !SslConnection.this.isFillInterested();
}
else
{
@ -485,8 +489,11 @@ public class SslConnection extends AbstractConnection
}
}
if (try_again)
if (write)
getEndPoint().write(_writeCallback, _encryptedOutput);
else if (need_fill_interest)
ensureFillInterested();
else if (try_again)
{
// If the output is closed,
if (isOutputShutdown())
@ -502,6 +509,7 @@ public class SslConnection extends AbstractConnection
getExecutor().execute(_runCompleteWrite);
}
}
}
@Override
@ -511,11 +519,12 @@ public class SslConnection extends AbstractConnection
// method on the DecryptedEndPoint, so we have to work out if there is
// decrypted data to be filled or what callbacks to setup to be told when there
// might be more encrypted data available to attempt another call to fill
boolean fillable;
boolean write = false;
synchronized (DecryptedEndPoint.this)
{
// Do we already have some app data, then app can fill now so return true
boolean fillable = (BufferUtil.hasContent(_decryptedInput))
fillable = (BufferUtil.hasContent(_decryptedInput))
// or if we have encryptedInput and have not underflowed yet, the it is worth trying a fill
|| BufferUtil.hasContent(_encryptedInput) && !_underFlown;
@ -534,7 +543,7 @@ public class SslConnection extends AbstractConnection
{
// write it
_cannotAcceptMoreAppDataToFlush = true;
getEndPoint().write(_writeCallback, _encryptedOutput);
write = true;
}
else
{
@ -545,12 +554,13 @@ public class SslConnection extends AbstractConnection
}
}
}
if (fillable)
getExecutor().execute(_runFillable);
else
ensureFillInterested();
}
if (write)
getEndPoint().write(_writeCallback, _encryptedOutput);
else if (fillable)
getExecutor().execute(_runFillable);
else
ensureFillInterested();
}
@Override
@ -957,7 +967,7 @@ public class SslConnection extends AbstractConnection
_flushRequiresFillToProgress = true;
fill(__FLUSH_CALLED_FILL);
// Check if after the fill() we need to wrap again
if (handshakeStatus == HandshakeStatus.NEED_WRAP)
if (_sslEngine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP)
continue;
}
return allConsumed && BufferUtil.isEmpty(_encryptedOutput);
@ -1002,27 +1012,37 @@ public class SslConnection extends AbstractConnection
{
try
{
synchronized (this)
boolean flush = false;
boolean close = false;
synchronized (_decryptedEndPoint)
{
boolean ishut = isInputShutdown();
boolean oshut = isOutputShutdown();
if (LOG.isDebugEnabled())
LOG.debug("{} shutdownOutput: oshut={}, ishut={}", SslConnection.this, oshut, ishut);
if (!oshut)
if (oshut)
return;
if (!_closedOutbound)
{
if (!_closedOutbound)
{
_closedOutbound=true; // Only attempt this once
_sslEngine.closeOutbound();
flush(BufferUtil.EMPTY_BUFFER); // Send the TLS close message.
}
if (ishut)
getEndPoint().close();
else
ensureFillInterested();
_closedOutbound=true; // Only attempt this once
_sslEngine.closeOutbound();
flush = true;
}
// TODO review close logic here
if (ishut)
close = true;
}
if (flush)
flush(BufferUtil.EMPTY_BUFFER); // Send the TLS close message.
if (close)
getEndPoint().close();
else
ensureFillInterested();
}
catch (Throwable x)
{
@ -1033,12 +1053,9 @@ public class SslConnection extends AbstractConnection
private void ensureFillInterested()
{
if (!SslConnection.this.isFillInterested())
{
if (LOG.isDebugEnabled())
LOG.debug("fillInterested SSL NB {}",SslConnection.this);
SslConnection.this.getEndPoint().fillInterested(_sslReadCallback);
}
if (LOG.isDebugEnabled())
LOG.debug("fillInterested SSL NB {}",SslConnection.this);
SslConnection.this.tryFillInterested(_sslReadCallback);
}
@Override
@ -1110,4 +1127,5 @@ public class SslConnection extends AbstractConnection
return super.toString()+"->"+getEndPoint().toString();
}
}
}

View File

@ -662,6 +662,11 @@ public class ProxyConnectionFactory extends AbstractConnectionFactory
_endp.fillInterested(callback);
}
public boolean tryFillInterested(Callback callback)
{
return _endp.tryFillInterested(callback);
}
@Override
public boolean isFillInterested()
{

View File

@ -253,7 +253,7 @@ public class SessionData implements Serializable
public long calcExpiry ()
{
return (getMaxInactiveMs() <= 0 ? 0 : (System.currentTimeMillis() + getMaxInactiveMs()));
return calcExpiry(System.currentTimeMillis());
}
public long calcExpiry (long time)

View File

@ -19,9 +19,6 @@
package org.eclipse.jetty.webapp;
import static java.lang.Boolean.FALSE;
import static java.lang.Boolean.TRUE;
import java.io.File;
import java.net.URL;
import java.nio.file.Path;
@ -33,7 +30,6 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import org.eclipse.jetty.util.ArrayTernaryTrie;
@ -44,7 +40,6 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.resource.Resource;
/* ------------------------------------------------------------ */
/**
* Classpath classes list performs sequential pattern matching of a class name
* against an internal array of classpath pattern entries.
@ -156,7 +151,7 @@ public class ClasspathPattern extends AbstractSet<String>
@Override
public Iterator<Entry> iterator()
{
return _entries.keySet().stream().map(k->_entries.get(k)).iterator();
return _entries.keySet().stream().map(_entries::get).iterator();
}
@Override
@ -335,8 +330,6 @@ public class ClasspathPattern extends AbstractSet<String>
Map<String,Entry> _entries = new HashMap<>();
Set<String> _classes = new HashSet<>();
IncludeExcludeSet<Entry,String> _patterns = new IncludeExcludeSet<>(ByPackageOrName.class);
IncludeExcludeSet<File,Path> _locations = new IncludeExcludeSet<>(ByLocation.class);
@ -544,16 +537,16 @@ public class ClasspathPattern extends AbstractSet<String>
{
try
{
Resource resource = TypeUtil.getLoadedFrom(clazz);
Path path = resource.getFile().toPath();
Boolean byName = _patterns.isIncludedAndNotExcluded(clazz.getName());
Boolean byLocation = _locations.isIncludedAndNotExcluded(path);
Resource resource = TypeUtil.getLoadedFrom(clazz);
Boolean byLocation = resource == null || resource.getFile() == null
? null
: _locations.isIncludedAndNotExcluded(resource.getFile().toPath());
// Combine the tri-state match of both IncludeExclude Sets
boolean included = byName==TRUE || byLocation==TRUE
boolean included = byName==Boolean.TRUE || byLocation==Boolean.TRUE
|| (byName==null && !_patterns.hasIncludes() && byLocation==null && !_locations.hasIncludes());
boolean excluded = byName==FALSE || byLocation==FALSE;
boolean excluded = byName==Boolean.FALSE || byLocation==Boolean.FALSE;
return included && !excluded;
}
catch (Exception e)
@ -588,9 +581,9 @@ public class ClasspathPattern extends AbstractSet<String>
}
// Combine the tri-state match of both IncludeExclude Sets
boolean included = byName==TRUE || byLocation==TRUE
boolean included = byName==Boolean.TRUE || byLocation==Boolean.TRUE
|| (byName==null && !_patterns.hasIncludes() && byLocation==null && !_locations.hasIncludes());
boolean excluded = byName==FALSE || byLocation==FALSE;
boolean excluded = byName==Boolean.FALSE || byLocation==Boolean.FALSE;
return included && !excluded;
}

View File

@ -28,7 +28,7 @@ import org.eclipse.jetty.http.pathmap.UriTemplatePathSpec;
import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
/**
* Wrapper for a {@link ServerEndpointConfig} where there PathParm information from the incoming request.
* Wrapper for a {@link ServerEndpointConfig} where there is PathParam information from the incoming request.
*/
public class PathParamServerEndpointConfig extends BasicServerEndpointConfig implements ServerEndpointConfig
{