[Jetty 12] Made WebSocket over HTTP/2 work. (#8685)

* Made WebSocket over HTTP/2 work.

Re-enabled tests, and restored HTTP2StreamEndPoint,
as well as implemented getTunnelSupport() for HTTP/2.

Removed from HttpStream methods for upgrade that are
not necessary anymore.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2022-10-07 12:50:42 +02:00 committed by GitHub
parent 9acc24212e
commit c785f08b17
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
38 changed files with 1309 additions and 1926 deletions

View File

@ -29,7 +29,6 @@ import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpStream;
@ -324,25 +323,6 @@ public class HttpStreamOverFCGI implements HttpStream
_connection.onCompleted(x);
}
@Override
public boolean isComplete()
{
// TODO
return false;
}
@Override
public void setUpgradeConnection(Connection connection)
{
throw new UnsupportedOperationException();
}
@Override
public Connection upgrade()
{
return null;
}
public boolean onIdleTimeout(Throwable timeout)
{
Runnable task = _httpChannel.onFailure(timeout);

View File

@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.NanoTime;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.resource.Resource;
/**
* HttpContent.ContentFactory implementation that wraps any other HttpContent.ContentFactory instance
@ -184,8 +185,6 @@ public class CachingContentFactory implements HttpContent.ContentFactory
long len = httpContent.getContentLengthValue();
if (len <= 0)
return false;
if (isUseFileMappedBuffer())
return true;
return ((len <= _maxCachedFileSize) && (len + getCachedSize() <= _maxCacheSize));
}
@ -258,7 +257,7 @@ public class CachingContentFactory implements HttpContent.ContentFactory
_contentLengthValue = resourceSize;
// map the content into memory if possible
ByteBuffer byteBuffer = _useFileMappedBuffer ? BufferUtil.toMappedBuffer(_delegate.getResource(), 0, _contentLengthValue) : null;
ByteBuffer byteBuffer = _useFileMappedBuffer ? toMappedBuffer(_delegate.getResource(), _contentLengthValue) : null;
if (byteBuffer == null)
{
@ -299,6 +298,18 @@ public class CachingContentFactory implements HttpContent.ContentFactory
_lastAccessed = NanoTime.now();
}
private ByteBuffer toMappedBuffer(Resource resource, long length)
{
try
{
return BufferUtil.toMappedBuffer(resource, 0, length);
}
catch (Throwable x)
{
return null;
}
}
long calculateSize()
{
long totalSize = _contentLengthValue;

View File

@ -46,7 +46,7 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
private final AtomicBoolean eof = new AtomicBoolean();
private final AtomicBoolean closed = new AtomicBoolean();
private final AtomicReference<Throwable> failure = new AtomicReference<>();
private Stream.Data data;
private final AtomicReference<Stream.Data> data = new AtomicReference<>();
private Connection connection;
public HTTP2StreamEndPoint(HTTP2Stream stream)
@ -186,8 +186,9 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
@Override
public int fill(ByteBuffer sink) throws IOException
{
Stream.Data data = this.data.get();
if (data != null)
return fillFromData(sink);
return fillFromData(data, sink);
Throwable failure = this.failure.get();
if (failure != null)
@ -196,7 +197,8 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
if (eof.get())
return -1;
Stream.Data data = this.data = stream.readData();
data = stream.readData();
this.data.set(data);
if (LOG.isDebugEnabled())
LOG.debug("filled {} on {}", data, this);
@ -204,29 +206,36 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
if (data == null)
return 0;
return fillFromData(sink);
return fillFromData(data, sink);
}
private int fillFromData(ByteBuffer sink)
private int fillFromData(Stream.Data data, ByteBuffer sink)
{
int sinkPosition = BufferUtil.flipToFill(sink);
int length = 0;
ByteBuffer source = data.frame().getData();
int sourceLength = source.remaining();
int length = Math.min(sourceLength, sink.remaining());
int sourceLimit = source.limit();
source.limit(source.position() + length);
sink.put(source);
source.limit(sourceLimit);
BufferUtil.flipToFlush(sink, sinkPosition);
boolean hasContent = source.hasRemaining();
if (hasContent)
{
int sinkPosition = BufferUtil.flipToFill(sink);
int sourceLength = source.remaining();
length = Math.min(sourceLength, sink.remaining());
int sourceLimit = source.limit();
source.limit(source.position() + length);
sink.put(source);
source.limit(sourceLimit);
BufferUtil.flipToFlush(sink, sinkPosition);
}
if (!source.hasRemaining())
{
boolean endStream = data.frame().isEndStream();
eof.set(endStream);
data.release();
data = null;
this.data.set(null);
if (!endStream)
stream.demand();
if (!hasContent)
length = endStream ? -1 : 0;
}
return length;
@ -356,7 +365,12 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
{
boolean result = readCallback.compareAndSet(null, callback);
if (result)
process();
{
if (data.get() != null)
process();
else
stream.demand();
}
return result;
}

View File

@ -18,7 +18,6 @@ module org.eclipse.jetty.http2.hpack
requires transitive org.eclipse.jetty.http;
exports org.eclipse.jetty.http2.hpack;
exports org.eclipse.jetty.http2.hpack.internal;
provides org.eclipse.jetty.http.HttpFieldPreEncoder with
org.eclipse.jetty.http2.hpack.HpackFieldPreEncoder;

View File

@ -1,760 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http2.server.internal;
public class HttpChannelOverHTTP2
{
//
// extends HttpChannel implements Closeable, WriteFlusher.Listener, HTTP2Channel.Server
//{
// private static final Logger LOG = LoggerFactory.getLogger(HttpChannelOverHTTP2.class);
// private static final HttpField SERVER_VERSION = new PreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION);
// private static final HttpField POWERED_BY = new PreEncodedHttpField(HttpHeader.X_POWERED_BY, HttpConfiguration.SERVER_VERSION);
//
// private boolean _expect100Continue;
// private boolean _delayedUntilContent;
// private boolean _useOutputDirectByteBuffers;
// private final ContentDemander _contentDemander;
//
// public HttpChannelOverHTTP2(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransportOverHTTP2 transport)
// {
// super(connector, configuration, endPoint, transport);
// _contentDemander = new ContentDemander();
// }
//
// protected IStream getStream()
// {
// return getHttpTransport().getStream();
// }
//
// @Override
// public boolean isUseOutputDirectByteBuffers()
// {
// return _useOutputDirectByteBuffers;
// }
//
// public void setUseOutputDirectByteBuffers(boolean useOutputDirectByteBuffers)
// {
// _useOutputDirectByteBuffers = useOutputDirectByteBuffers;
// }
//
// @Override
// public boolean isExpecting100Continue()
// {
// return _expect100Continue;
// }
//
// @Override
// public void setIdleTimeout(long timeoutMs)
// {
// getStream().setIdleTimeout(timeoutMs);
// }
//
// @Override
// public long getIdleTimeout()
// {
// return getStream().getIdleTimeout();
// }
//
// @Override
// public void onFlushed(long bytes) throws IOException
// {
// getResponse().getHttpOutput().onFlushed(bytes);
// }
//
// public Runnable onRequest(HeadersFrame frame)
// {
// try
// {
// MetaData.Request request = (MetaData.Request)frame.getMetaData();
// HttpFields fields = request.getFields();
//
// _expect100Continue = fields.contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString());
//
// HttpFields.Mutable response = getResponse().getHttpFields();
// if (getHttpConfiguration().getSendServerVersion())
// response.add(SERVER_VERSION);
// if (getHttpConfiguration().getSendXPoweredBy())
// response.add(POWERED_BY);
//
// onRequest(request);
//
// boolean endStream = frame.isEndStream();
// if (endStream)
// {
// onContentComplete();
// onRequestComplete();
// }
//
// boolean connect = request instanceof MetaData.ConnectRequest;
// _delayedUntilContent = getHttpConfiguration().isDelayDispatchUntilContent() &&
// !endStream && !_expect100Continue && !connect;
//
// // Delay the demand of DATA frames for CONNECT with :protocol
// // or for normal requests expecting 100 continue.
// if (connect)
// {
// if (request.getProtocol() == null)
// _contentDemander.demand(false);
// }
// else
// {
// if (_delayedUntilContent)
// _contentDemander.demand(false);
// }
//
// if (LOG.isDebugEnabled())
// {
// Stream stream = getStream();
// LOG.debug("HTTP2 Request #{}/{}, delayed={}:{}{} {} {}{}{}",
// stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
// _delayedUntilContent, System.lineSeparator(),
// request.getMethod(), request.getURI(), request.getHttpVersion(),
// System.lineSeparator(), fields);
// }
//
// return _delayedUntilContent ? null : this;
// }
// catch (BadMessageException x)
// {
// if (LOG.isDebugEnabled())
// LOG.debug("onRequest", x);
// onBadMessage(x);
// return null;
// }
// catch (Throwable x)
// {
// onBadMessage(new BadMessageException(HttpStatus.INTERNAL_SERVER_ERROR_500, null, x));
// return null;
// }
// }
//
// public Runnable onPushRequest(MetaData.Request request)
// {
// try
// {
// onRequest(request);
// getRequest().setAttribute("org.eclipse.jetty.pushed", Boolean.TRUE);
// onContentComplete();
// onRequestComplete();
//
// if (LOG.isDebugEnabled())
// {
// Stream stream = getStream();
// LOG.debug("HTTP2 PUSH Request #{}/{}:{}{} {} {}{}{}",
// stream.getId(), Integer.toHexString(stream.getSession().hashCode()), System.lineSeparator(),
// request.getMethod(), request.getURI(), request.getHttpVersion(),
// System.lineSeparator(), request.getFields());
// }
//
// return this;
// }
// catch (BadMessageException x)
// {
// onBadMessage(x);
// return null;
// }
// catch (Throwable x)
// {
// onBadMessage(new BadMessageException(HttpStatus.INTERNAL_SERVER_ERROR_500, null, x));
// return null;
// }
// }
//
// @Override
// public HttpTransportOverHTTP2 getHttpTransport()
// {
// return (HttpTransportOverHTTP2)super.getHttpTransport();
// }
//
// @Override
// public void recycle()
// {
// super.recycle();
// getHttpTransport().recycle();
// _expect100Continue = false;
// _delayedUntilContent = false;
// // The content demander must be the very last thing to be recycled
// // to make sure any pending demanding content gets cleared off.
// _contentDemander.recycle();
// }
//
// @Override
// protected void commit(MetaData.Response info)
// {
// super.commit(info);
// if (LOG.isDebugEnabled())
// {
// Stream stream = getStream();
// LOG.debug("HTTP2 Commit Response #{}/{}:{}{} {} {}{}{}",
// stream.getId(), Integer.toHexString(stream.getSession().hashCode()), System.lineSeparator(), info.getHttpVersion(), info.getStatus(), info.getReason(),
// System.lineSeparator(), info.getFields());
// }
// }
//
// @Override
// public Runnable onData(DataFrame frame, Callback callback)
// {
// ByteBuffer buffer = frame.getData();
// int length = buffer.remaining();
// HttpInput.Content content = new HttpInput.Content(buffer)
// {
// @Override
// public boolean isEof()
// {
// return frame.isEndStream();
// }
//
// @Override
// public void succeeded()
// {
// callback.succeeded();
// }
//
// @Override
// public void failed(Throwable x)
// {
// callback.failed(x);
// }
//
// @Override
// public InvocationType getInvocationType()
// {
// return callback.getInvocationType();
// }
// };
// boolean needed = _contentDemander.onContent(content);
// boolean handle = onContent(content);
//
// boolean endStream = frame.isEndStream();
// if (endStream)
// {
// boolean handleContent = onContentComplete();
// // This will generate EOF -> must happen before onContentProducible.
// boolean handleRequest = onRequestComplete();
// handle |= handleContent | handleRequest;
// }
//
// boolean woken = needed && getRequest().getHttpInput().onContentProducible();
// handle |= woken;
// if (LOG.isDebugEnabled())
// {
// Stream stream = getStream();
// LOG.debug("HTTP2 Request #{}/{}: {} bytes of {} content, woken: {}, needed: {}, handle: {}",
// stream.getId(),
// Integer.toHexString(stream.getSession().hashCode()),
// length,
// endStream ? "last" : "some",
// woken,
// needed,
// handle);
// }
//
// boolean wasDelayed = _delayedUntilContent;
// _delayedUntilContent = false;
// return handle || wasDelayed ? this : null;
// }
//
// /**
// * Demanding content is a marker content that is used to remember that a demand was
// * registered into the stream. The {@code needed} flag indicates if the demand originated
// * from a call to {@link #produceContent()} when false or {@link #needContent()}
// * when true, as {@link HttpInput#onContentProducible()} must only be called
// * only when {@link #needContent()} was called.
// * Instances of this class must never escape the scope of this channel impl,
// * so {@link #produceContent()} must never return one.
// */
// private static final class DemandingContent extends HttpInput.SpecialContent
// {
// private final boolean needed;
//
// private DemandingContent(boolean needed)
// {
// this.needed = needed;
// }
// }
//
// private static final HttpInput.Content EOF = new HttpInput.EofContent();
// private static final HttpInput.Content DEMANDING_NEEDED = new DemandingContent(true);
// private static final HttpInput.Content DEMANDING_NOT_NEEDED = new DemandingContent(false);
//
// private class ContentDemander
// {
// private final AtomicReference<HttpInput.Content> _content = new AtomicReference<>();
//
// public void recycle()
// {
// if (LOG.isDebugEnabled())
// LOG.debug("recycle {}", this);
// HttpInput.Content c = _content.getAndSet(null);
// if (c != null && !c.isSpecial())
// throw new AssertionError("unconsumed content: " + c);
// }
//
// public HttpInput.Content poll()
// {
// while (true)
// {
// HttpInput.Content c = _content.get();
// if (LOG.isDebugEnabled())
// LOG.debug("poll, content = {}", c);
// if (c == null || c.isSpecial() || _content.compareAndSet(c, c.isEof() ? EOF : null))
// {
// if (LOG.isDebugEnabled())
// LOG.debug("returning current content");
// return c;
// }
// }
// }
//
// public boolean demand(boolean needed)
// {
// while (true)
// {
// HttpInput.Content c = _content.get();
// if (LOG.isDebugEnabled())
// LOG.debug("demand({}), content = {}", needed, c);
// if (c instanceof DemandingContent)
// {
// if (needed && !((DemandingContent)c).needed)
// {
// if (!_content.compareAndSet(c, DEMANDING_NEEDED))
// {
// if (LOG.isDebugEnabled())
// LOG.debug("already demanding but switched needed flag to true");
// continue;
// }
// }
// if (LOG.isDebugEnabled())
// LOG.debug("already demanding, returning false");
// return false;
// }
// if (c != null)
// {
// if (LOG.isDebugEnabled())
// LOG.debug("content available, returning true");
// return true;
// }
// if (_content.compareAndSet(null, needed ? DEMANDING_NEEDED : DEMANDING_NOT_NEEDED))
// {
// IStream stream = getStream();
// if (stream == null)
// {
// _content.set(null);
// if (LOG.isDebugEnabled())
// LOG.debug("no content available, switched to demanding but stream is now null");
// return false;
// }
// if (LOG.isDebugEnabled())
// LOG.debug("no content available, demanding stream {}", stream);
// stream.demand(1);
// c = _content.get();
// boolean hasContent = !(c instanceof DemandingContent) && c != null;
// if (LOG.isDebugEnabled())
// LOG.debug("has content now? {}", hasContent);
// return hasContent;
// }
// }
// }
//
// public boolean onContent(HttpInput.Content content)
// {
// while (true)
// {
// HttpInput.Content c = _content.get();
// if (LOG.isDebugEnabled())
// LOG.debug("content delivered by stream: {}, current content: {}", content, c);
// if (c instanceof DemandingContent)
// {
// if (_content.compareAndSet(c, content))
// {
// boolean needed = ((DemandingContent)c).needed;
// if (LOG.isDebugEnabled())
// LOG.debug("replacing demand content with {} succeeded; returning {}", content, needed);
// return needed;
// }
// }
// else if (c == null)
// {
// if (!content.isSpecial())
// {
// // This should never happen, consider as a bug.
// content.failed(new IllegalStateException("Non special content without demand : " + content));
// return false;
// }
// if (_content.compareAndSet(null, content))
// {
// if (LOG.isDebugEnabled())
// LOG.debug("replacing null content with {} succeeded", content);
// return false;
// }
// }
// else if (c.isEof() && content.isEof() && content.isEmpty())
// {
// content.succeeded();
// return true;
// }
// else if (content.getError() != null)
// {
// if (c.getError() != null)
// {
// if (c.getError() != content.getError())
// c.getError().addSuppressed(content.getError());
// return true;
// }
// if (_content.compareAndSet(c, content))
// {
// c.failed(content.getError());
// if (LOG.isDebugEnabled())
// LOG.debug("replacing current content with {} succeeded", content);
// return true;
// }
// }
// else if (c.getError() != null && content.remaining() == 0)
// {
// content.succeeded();
// return true;
// }
// else
// {
// // This should never happen, consider as a bug.
// content.failed(new IllegalStateException("Cannot overwrite exiting content " + c + " with " + content));
// return false;
// }
// }
// }
//
// public boolean onTimeout(Throwable failure)
// {
// while (true)
// {
// HttpInput.Content c = _content.get();
// if (LOG.isDebugEnabled())
// LOG.debug("onTimeout with current content: {} and failure = {}", c, failure);
// if (!(c instanceof DemandingContent))
// return false;
// if (_content.compareAndSet(c, new HttpInput.ErrorContent(failure)))
// {
// if (LOG.isDebugEnabled())
// LOG.debug("replacing current content with error succeeded");
// return true;
// }
// }
// }
//
// public void eof()
// {
// while (true)
// {
// HttpInput.Content c = _content.get();
// if (LOG.isDebugEnabled())
// LOG.debug("eof with current content: {}", c);
// if (c instanceof DemandingContent)
// {
// if (_content.compareAndSet(c, EOF))
// {
// if (LOG.isDebugEnabled())
// LOG.debug("replacing current content with special EOF succeeded");
// return;
// }
// }
// else if (c == null)
// {
// if (_content.compareAndSet(null, EOF))
// {
// if (LOG.isDebugEnabled())
// LOG.debug("replacing null content with special EOF succeeded");
// return;
// }
// }
// else if (c.isEof())
// {
// if (LOG.isDebugEnabled())
// LOG.debug("current content already is EOF");
// return;
// }
// else if (c.remaining() == 0)
// {
// if (_content.compareAndSet(c, EOF))
// {
// if (LOG.isDebugEnabled())
// LOG.debug("replacing current content with special EOF succeeded");
// return;
// }
// }
// else
// {
// // EOF may arrive with HEADERS frame (e.g. a trailer) that is not flow controlled, so we need to wrap the existing content.
// // Covered by HttpTrailersTest.testRequestTrailersWithContent.
// HttpInput.Content content = new HttpInput.WrappingContent(c, true);
// if (_content.compareAndSet(c, content))
// {
// if (LOG.isDebugEnabled())
// LOG.debug("replacing current content with {} succeeded", content);
// return;
// }
// }
// }
// }
//
// public boolean failContent(Throwable failure)
// {
// while (true)
// {
// HttpInput.Content c = _content.get();
// if (LOG.isDebugEnabled())
// LOG.debug("failing current content {} with {} {}", c, failure, this);
// if (c == null)
// return false;
// if (c.isSpecial())
// return c.isEof();
// if (_content.compareAndSet(c, null))
// {
// c.failed(failure);
// if (LOG.isDebugEnabled())
// LOG.debug("replacing current content with null succeeded");
// return false;
// }
// }
// }
//
// @Override
// public String toString()
// {
// return getClass().getSimpleName() + "@" + hashCode() + " _content=" + _content;
// }
// }
//
// @Override
// public boolean needContent()
// {
// boolean hasContent = _contentDemander.demand(true);
// if (LOG.isDebugEnabled())
// LOG.debug("needContent has content? {}", hasContent);
// return hasContent;
// }
//
// @Override
// public HttpInput.Content produceContent()
// {
// HttpInput.Content content = null;
// if (_contentDemander.demand(false))
// content = _contentDemander.poll();
// if (LOG.isDebugEnabled())
// LOG.debug("produceContent produced {}", content);
// return content;
// }
//
// @Override
// public boolean failAllContent(Throwable failure)
// {
// if (LOG.isDebugEnabled())
// LOG.debug("failing all content with {} {}", failure, this);
// IStream stream = getStream();
// boolean atEof = stream == null || stream.failAllData(failure);
// atEof |= _contentDemander.failContent(failure);
// if (LOG.isDebugEnabled())
// LOG.debug("failed all content, reached EOF? {}", atEof);
// return atEof;
// }
//
// @Override
// public boolean failed(Throwable x)
// {
// if (LOG.isDebugEnabled())
// LOG.debug("failed " + x);
//
// _contentDemander.onContent(new HttpInput.ErrorContent(x));
//
// return getRequest().getHttpInput().onContentProducible();
// }
//
// @Override
// protected boolean eof()
// {
// _contentDemander.eof();
// return false;
// }
//
// @Override
// public Runnable onTrailer(HeadersFrame frame)
// {
// HttpFields trailers = frame.getMetaData().getFields();
// if (trailers.size() > 0)
// onTrailers(trailers);
//
// if (LOG.isDebugEnabled())
// {
// Stream stream = getStream();
// LOG.debug("HTTP2 Request #{}/{}, trailers:{}{}",
// stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
// System.lineSeparator(), trailers);
// }
//
// // This will generate EOF -> need to call onContentProducible.
// boolean handle = onRequestComplete();
// boolean woken = getRequest().getHttpInput().onContentProducible();
// handle |= woken;
//
// boolean wasDelayed = _delayedUntilContent;
// _delayedUntilContent = false;
// return handle || wasDelayed ? this : null;
// }
//
// @Override
// public boolean isIdle()
// {
// return getState().isIdle();
// }
//
// @Override
// public boolean onTimeout(Throwable failure, Consumer<Runnable> consumer)
// {
// final boolean delayed = _delayedUntilContent;
// _delayedUntilContent = false;
//
// boolean reset = isIdle();
// if (reset)
// consumeInput();
//
// getHttpTransport().onStreamTimeout(failure);
//
// failure.addSuppressed(new Throwable("HttpInput idle timeout"));
// _contentDemander.onTimeout(failure);
// boolean needed = getRequest().getHttpInput().onContentProducible();
//
// if (needed || delayed)
// {
// consumer.accept(this::handleWithContext);
// reset = false;
// }
//
// return reset;
// }
//
// @Override
// public Runnable onFailure(Throwable failure, Callback callback)
// {
// getHttpTransport().onStreamFailure(failure);
// boolean handle = failed(failure);
// consumeInput();
// return new FailureTask(failure, callback, handle);
// }
//
// protected void consumeInput()
// {
// getRequest().getHttpInput().consumeAll();
// }
//
// private void handleWithContext()
// {
// ContextHandler context = getState().getContextHandler();
// if (context != null)
// context.handle(getRequest(), this);
// else
// handle();
// }
//
// /**
// * If the associated response has the Expect header set to 100 Continue,
// * then accessing the input stream indicates that the handler/servlet
// * is ready for the request body and thus a 100 Continue response is sent.
// *
// * @throws IOException if the InputStream cannot be created
// */
// @Override
// public void continue100(int available) throws IOException
// {
// // If the client is expecting 100 CONTINUE, then send it now.
// // TODO: consider using an AtomicBoolean ?
// if (isExpecting100Continue())
// {
// _expect100Continue = false;
//
// // is content missing?
// if (available == 0)
// {
// if (getResponse().isCommitted())
// throw new IOException("Committed before 100 Continues");
//
// boolean committed = sendResponse(HttpGenerator.CONTINUE_100_INFO, null, false);
// if (!committed)
// throw new IOException("Concurrent commit while trying to send 100-Continue");
// }
// }
// }
//
// @Override
// public boolean isTunnellingSupported()
// {
// return true;
// }
//
// @Override
// public EndPoint getTunnellingEndPoint()
// {
// return new ServerHTTP2StreamEndPoint(getStream());
// }
//
// @Override
// public void close()
// {
// abort(new IOException("Unexpected close"));
// }
//
// @Override
// public String toString()
// {
// IStream stream = getStream();
// long streamId = stream == null ? -1 : stream.getId();
// return String.format("%s#%d", super.toString(), streamId);
// }
//
// private class FailureTask implements Runnable
// {
// private final Throwable failure;
// private final Callback callback;
// private final boolean handle;
//
// public FailureTask(Throwable failure, Callback callback, boolean handle)
// {
// this.failure = failure;
// this.callback = callback;
// this.handle = handle;
// }
//
// @Override
// public void run()
// {
// try
// {
// if (handle)
// handleWithContext();
// else if (getHttpConfiguration().isNotifyRemoteAsyncErrors())
// getState().asyncError(failure);
// callback.succeeded();
// }
// catch (Throwable x)
// {
// callback.failed(x);
// }
// }
//
// @Override
// public String toString()
// {
// return String.format("%s@%x[%s]", getClass().getName(), hashCode(), failure);
// }
// }
}

View File

@ -37,8 +37,10 @@ import org.eclipse.jetty.http2.internal.HTTP2Channel;
import org.eclipse.jetty.http2.internal.HTTP2Stream;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpStream;
import org.eclipse.jetty.server.TunnelSupport;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.NanoTime;
@ -58,6 +60,7 @@ public class HttpStreamOverHTTP2 implements HttpStream, HTTP2Channel.Server
private final long _nanoTime;
private MetaData.Request _requestMetaData;
private MetaData.Response _responseMetaData;
private TunnelSupport tunnelSupport;
private Content.Chunk _chunk;
private boolean committed;
private boolean _demand;
@ -103,6 +106,9 @@ public class HttpStreamOverHTTP2 implements HttpStream, HTTP2Channel.Server
_expects100Continue = fields.contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString());
if (_requestMetaData instanceof MetaData.ConnectRequest)
tunnelSupport = new TunnelSupportOverHTTP2(_requestMetaData.getProtocol());
if (LOG.isDebugEnabled())
{
LOG.debug("HTTP2 request #{}/{}, {} {} {}{}{}",
@ -517,19 +523,6 @@ public class HttpStreamOverHTTP2 implements HttpStream, HTTP2Channel.Server
return committed;
}
@Override
public boolean isComplete()
{
// TODO
return false;
}
@Override
public void setUpgradeConnection(Connection connection)
{
throw new UnsupportedOperationException();
}
@Override
public boolean isIdle()
{
@ -538,10 +531,17 @@ public class HttpStreamOverHTTP2 implements HttpStream, HTTP2Channel.Server
}
@Override
public Connection upgrade()
public TunnelSupport getTunnelSupport()
{
// TODO
throw new UnsupportedOperationException();
return tunnelSupport;
}
@Override
public Throwable consumeAvailable()
{
if (HttpMethod.CONNECT.is(_requestMetaData.getMethod()))
return null;
return HttpStream.super.consumeAvailable();
}
@Override
@ -568,16 +568,34 @@ public class HttpStreamOverHTTP2 implements HttpStream, HTTP2Channel.Server
@Override
public void succeeded()
{
_httpChannel.recycle();
// If the stream is not closed, it is still reading the request content.
// Send a reset to the other end so that it stops sending data.
if (!_stream.isClosed())
{
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 response #{}/{}: unconsumed request content, resetting stream", _stream.getId(), Integer.toHexString(_stream.getSession().hashCode()));
_stream.reset(new ResetFrame(_stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
if (isTunnel(_requestMetaData, _responseMetaData))
{
Connection connection = (Connection)_httpChannel.getRequest().getAttribute(HttpStream.UPGRADE_CONNECTION_ATTRIBUTE);
if (connection == null)
{
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 response #{}/{}: no upgrade connection, resetting stream", _stream.getId(), Integer.toHexString(_stream.getSession().hashCode()));
_stream.reset(new ResetFrame(_stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
}
else
{
EndPoint endPoint = tunnelSupport.getEndPoint();
_stream.setAttachment(endPoint);
endPoint.upgrade(connection);
}
}
else
{
// If the stream is not closed, it is still reading the request content.
// Send a reset to the other end so that it stops sending data.
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 response #{}/{}: unconsumed request content, resetting stream", _stream.getId(), Integer.toHexString(_stream.getSession().hashCode()));
_stream.reset(new ResetFrame(_stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
}
}
_httpChannel.recycle();
}
@Override
@ -604,4 +622,27 @@ public class HttpStreamOverHTTP2 implements HttpStream, HTTP2Channel.Server
sendTrailersFrame(new MetaData(HttpVersion.HTTP_2, trailers), getCallback());
}
}
private class TunnelSupportOverHTTP2 implements TunnelSupport
{
private final String protocol;
private final EndPoint endPoint = new ServerHTTP2StreamEndPoint(_stream);
private TunnelSupportOverHTTP2(String protocol)
{
this.protocol = protocol;
}
@Override
public String getProtocol()
{
return protocol;
}
@Override
public EndPoint getEndPoint()
{
return endPoint;
}
}
}

View File

@ -1,560 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http2.server.internal;
public class HttpTransportOverHTTP2
{
// implements HttpTransport
//{
// private static final Logger LOG = LoggerFactory.getLogger(HttpTransportOverHTTP2.class);
//
// private final AtomicBoolean commit = new AtomicBoolean();
// private final TransportCallback transportCallback = new TransportCallback();
// private final Connector connector;
// private final HTTP2ServerConnection connection;
// private IStream stream;
// private MetaData.Response metaData;
//
// public HttpTransportOverHTTP2(Connector connector, HTTP2ServerConnection connection)
// {
// this.connector = connector;
// this.connection = connection;
// }
//
// public IStream getStream()
// {
// return stream;
// }
//
// public void setStream(IStream stream)
// {
// if (LOG.isDebugEnabled())
// LOG.debug("{} setStream {}", this, stream.getId());
// this.stream = stream;
// }
//
// public void recycle()
// {
// this.stream = null;
// commit.set(false);
// }
//
// @Override
// public void send(MetaData.Request request, MetaData.Response response, ByteBuffer content, boolean lastContent, Callback callback)
// {
// if (response != null)
// sendHeaders(request, response, content, lastContent, callback);
// else
// sendContent(request, content, lastContent, callback);
// }
//
// public void sendHeaders(MetaData.Request request, MetaData.Response response, ByteBuffer content, boolean lastContent, Callback callback)
// {
// metaData = response;
//
// HeadersFrame headersFrame;
// DataFrame dataFrame = null;
// HeadersFrame trailersFrame = null;
//
// boolean isHeadRequest = HttpMethod.HEAD.is(request.getMethod());
// boolean hasContent = BufferUtil.hasContent(content) && !isHeadRequest;
// int status = response.getStatus();
// boolean interimResponse = status == HttpStatus.CONTINUE_100 || status == HttpStatus.PROCESSING_102;
// if (interimResponse)
// {
// // Must not commit interim responses.
// if (hasContent)
// {
// callback.failed(new IllegalStateException("Interim response cannot have content"));
// return;
// }
// headersFrame = new HeadersFrame(stream.getId(), metaData, null, false);
// }
// else
// {
// if (commit.compareAndSet(false, true))
// {
// if (lastContent)
// {
// long realContentLength = BufferUtil.length(content);
// long contentLength = response.getContentLength();
// if (contentLength < 0)
// {
// metaData = new MetaData.Response(
// response.getHttpVersion(),
// response.getStatus(),
// response.getReason(),
// response.getFields(),
// realContentLength,
// response.getTrailerSupplier()
// );
// }
// else if (hasContent && contentLength != realContentLength)
// {
// callback.failed(new BadMessageException(HttpStatus.INTERNAL_SERVER_ERROR_500, String.format("Incorrect Content-Length %d!=%d", contentLength, realContentLength)));
// return;
// }
// }
//
// if (hasContent)
// {
// headersFrame = new HeadersFrame(stream.getId(), metaData, null, false);
// if (lastContent)
// {
// HttpFields trailers = retrieveTrailers();
// if (trailers == null)
// {
// dataFrame = new DataFrame(stream.getId(), content, true);
// }
// else
// {
// dataFrame = new DataFrame(stream.getId(), content, false);
// trailersFrame = new HeadersFrame(stream.getId(), new MetaData(HttpVersion.HTTP_2, trailers), null, true);
// }
// }
// else
// {
// dataFrame = new DataFrame(stream.getId(), content, false);
// }
// }
// else
// {
// if (lastContent)
// {
// if (isTunnel(request, metaData))
// {
// headersFrame = new HeadersFrame(stream.getId(), metaData, null, false);
// }
// else
// {
// HttpFields trailers = retrieveTrailers();
// if (trailers == null)
// {
// headersFrame = new HeadersFrame(stream.getId(), metaData, null, true);
// }
// else
// {
// headersFrame = new HeadersFrame(stream.getId(), metaData, null, false);
// trailersFrame = new HeadersFrame(stream.getId(), new MetaData(HttpVersion.HTTP_2, trailers), null, true);
// }
// }
// }
// else
// {
// headersFrame = new HeadersFrame(stream.getId(), metaData, null, false);
// }
// }
// }
// else
// {
// callback.failed(new IllegalStateException("committed"));
// return;
// }
// }
//
// HeadersFrame hf = headersFrame;
// DataFrame df = dataFrame;
// HeadersFrame tf = trailersFrame;
//
// transportCallback.send(callback, true, c ->
// {
// if (LOG.isDebugEnabled())
// {
// LOG.debug("HTTP2 Response #{}/{}:{}{} {}{}{}",
// stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
// System.lineSeparator(), HttpVersion.HTTP_2, metaData.getStatus(),
// System.lineSeparator(), metaData.getFields());
// }
// stream.send(new IStream.FrameList(hf, df, tf), c);
// });
// }
//
// public void sendContent(MetaData.Request request, ByteBuffer content, boolean lastContent, Callback callback)
// {
// boolean isHeadRequest = HttpMethod.HEAD.is(request.getMethod());
// boolean hasContent = BufferUtil.hasContent(content) && !isHeadRequest;
// if (hasContent || (lastContent && !isTunnel(request, metaData)))
// {
// if (lastContent)
// {
// HttpFields trailers = retrieveTrailers();
// if (trailers == null)
// {
// transportCallback.send(callback, false, c ->
// sendDataFrame(content, true, true, c));
// }
// else
// {
// SendTrailers sendTrailers = new SendTrailers(callback, trailers);
// if (hasContent)
// {
// transportCallback.send(sendTrailers, false, c ->
// sendDataFrame(content, true, false, c));
// }
// else
// {
// sendTrailers.succeeded();
// }
// }
// }
// else
// {
// transportCallback.send(callback, false, c ->
// sendDataFrame(content, false, false, c));
// }
// }
// else
// {
// callback.succeeded();
// }
// }
//
// private HttpFields retrieveTrailers()
// {
// Supplier<HttpFields> supplier = metaData.getTrailerSupplier();
// if (supplier == null)
// return null;
// HttpFields trailers = supplier.get();
// if (trailers == null)
// return null;
// return trailers.size() == 0 ? null : trailers;
// }
//
// private boolean isTunnel(MetaData.Request request, MetaData.Response response)
// {
// return MetaData.isTunnel(request.getMethod(), response.getStatus());
// }
//
// @Override
// public boolean isPushSupported()
// {
// return stream.getSession().isPushEnabled();
// }
//
// @Override
// public void push(final MetaData.Request request)
// {
// if (!stream.getSession().isPushEnabled())
// {
// if (LOG.isDebugEnabled())
// LOG.debug("HTTP/2 Push disabled for {}", request);
// return;
// }
//
// if (LOG.isDebugEnabled())
// LOG.debug("HTTP/2 Push {}", request);
//
// stream.push(new PushPromiseFrame(stream.getId(), request), new Promise<>()
// {
// @Override
// public void succeeded(Stream pushStream)
// {
// connection.push(connector, (IStream)pushStream, request);
// }
//
// @Override
// public void failed(Throwable x)
// {
// if (LOG.isDebugEnabled())
// LOG.debug("Could not push {}", request, x);
// }
// }, null); // TODO: handle reset from the client ?
// }
//
// private void sendDataFrame(ByteBuffer content, boolean lastContent, boolean endStream, Callback callback)
// {
// if (LOG.isDebugEnabled())
// {
// LOG.debug("HTTP2 Response #{}/{}: {} content bytes{}",
// stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
// content.remaining(), lastContent ? " (last chunk)" : "");
// }
// DataFrame frame = new DataFrame(stream.getId(), content, endStream);
// stream.data(frame, callback);
// }
//
// private void sendTrailersFrame(MetaData metaData, Callback callback)
// {
// if (LOG.isDebugEnabled())
// {
// LOG.debug("HTTP2 Response #{}/{}: trailers",
// stream.getId(), Integer.toHexString(stream.getSession().hashCode()));
// }
//
// HeadersFrame frame = new HeadersFrame(stream.getId(), metaData, null, true);
// stream.headers(frame, callback);
// }
//
// public void onStreamFailure(Throwable failure)
// {
// transportCallback.abort(failure);
// }
//
// public boolean onStreamTimeout(Throwable failure)
// {
// return transportCallback.idleTimeout(failure);
// }
//
// /**
// * @return true if error sent, false if upgraded or aborted.
// */
// boolean prepareUpgrade()
// {
// HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttachment();
// Request request = channel.getRequest();
// if (request.getHttpInput().hasContent())
// return channel.sendErrorOrAbort("Unexpected content in CONNECT request");
//
// Connection connection = (Connection)request.getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
// if (connection == null)
// return channel.sendErrorOrAbort("No UPGRADE_CONNECTION_ATTRIBUTE available");
//
// EndPoint endPoint = connection.getEndPoint();
// endPoint.upgrade(connection);
// stream.setAttachment(endPoint);
//
// // Only now that we have switched the attachment, we can demand DATA frames to process them.
// stream.demand(1);
//
// if (LOG.isDebugEnabled())
// LOG.debug("Upgrading to {}", connection);
//
// return false;
// }
//
// @Override
// public void onCompleted()
// {
// Object attachment = stream.getAttachment();
// if (attachment instanceof HttpChannelOverHTTP2)
// {
// // If the stream is not closed, it is still reading the request content.
// // Send a reset to the other end so that it stops sending data.
// if (!stream.isClosed())
// {
// if (LOG.isDebugEnabled())
// LOG.debug("HTTP2 Response #{}: unconsumed request content, resetting stream", stream.getId());
// stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
// }
//
// // Consume the existing queued data frames to
// // avoid stalling the session flow control.
// HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)attachment;
// channel.consumeInput();
// }
// }
//
// @Override
// public void abort(Throwable failure)
// {
// IStream stream = this.stream;
// if (LOG.isDebugEnabled())
// LOG.debug("HTTP2 Response #{}/{} aborted", stream == null ? -1 : stream.getId(),
// stream == null ? -1 : Integer.toHexString(stream.getSession().hashCode()));
// if (stream != null)
// stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
// }
//
// /**
// * <p>Callback that controls sends initiated by the transport, by eventually
// * notifying a nested callback.</p>
// * <p>There are 3 sources of concurrency after a send is initiated:</p>
// * <ul>
// * <li>the completion of the send operation, either success or failure</li>
// * <li>an asynchronous failure coming from the read side such as a stream
// * being reset, or the connection being closed</li>
// * <li>an asynchronous idle timeout</li>
// * </ul>
// *
// * @see State
// */
// private class TransportCallback implements Callback
// {
// private final AutoLock _lock = new AutoLock();
// private State _state = State.IDLE;
// private Callback _callback;
// private boolean _commit;
// private Throwable _failure;
//
// private void reset(Throwable failure)
// {
// assert _lock.isHeldByCurrentThread();
// _state = failure != null ? State.FAILED : State.IDLE;
// _callback = null;
// _commit = false;
// _failure = failure;
// }
//
// private void send(Callback callback, boolean commit, Consumer<Callback> sendFrame)
// {
// Throwable failure = sending(callback, commit);
// if (failure == null)
// sendFrame.accept(this);
// else
// callback.failed(failure);
// }
//
// private void abort(Throwable failure)
// {
// failed(failure);
// }
//
// private Throwable sending(Callback callback, boolean commit)
// {
// try (AutoLock l = _lock.lock())
// {
// switch (_state)
// {
// case IDLE:
// {
// _state = State.SENDING;
// _callback = callback;
// _commit = commit;
// return null;
// }
// case FAILED:
// {
// return _failure;
// }
// default:
// {
// return new IllegalStateException("Invalid transport state: " + _state);
// }
// }
// }
// }
//
// @Override
// public void succeeded()
// {
// Callback callback;
// boolean commit;
// try (AutoLock l = _lock.lock())
// {
// if (_state != State.SENDING)
// {
// // This thread lost the race to succeed the current
// // send, as other threads likely already failed it.
// return;
// }
// callback = _callback;
// commit = _commit;
// reset(null);
// }
// if (LOG.isDebugEnabled())
// LOG.debug("HTTP2 Response #{}/{} {} success",
// stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
// commit ? "commit" : "flush");
// callback.succeeded();
// }
//
// @Override
// public void failed(Throwable failure)
// {
// Callback callback;
// boolean commit;
// try (AutoLock l = _lock.lock())
// {
// if (_state != State.SENDING)
// {
// reset(failure);
// return;
// }
// callback = _callback;
// commit = _commit;
// reset(failure);
// }
// if (LOG.isDebugEnabled())
// LOG.debug("HTTP2 Response #{}/{} {} failure",
// stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
// commit ? "commit" : "flush",
// failure);
// callback.failed(failure);
// }
//
// private boolean idleTimeout(Throwable failure)
// {
// Callback callback = null;
// try (AutoLock l = _lock.lock())
// {
// // Ignore idle timeouts if not writing,
// // as the application may be suspended.
// if (_state == State.SENDING)
// {
// callback = _callback;
// reset(failure);
// }
// }
// boolean timeout = callback != null;
// if (LOG.isDebugEnabled())
// LOG.debug("HTTP2 Response #{}/{} idle timeout {}",
// stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
// timeout ? "expired" : "ignored",
// failure);
// if (timeout)
// callback.failed(failure);
// return timeout;
// }
//
// @Override
// public InvocationType getInvocationType()
// {
// Callback callback;
// try (AutoLock l = _lock.lock())
// {
// callback = _callback;
// }
// return callback != null ? callback.getInvocationType() : Callback.super.getInvocationType();
// }
// }
//
// /**
// * <p>Send states for {@link TransportCallback}.</p>
// *
// * @see TransportCallback
// */
// private enum State
// {
// /**
// * <p>No send initiated or in progress.</p>
// */
// IDLE,
// /**
// * <p>A send is initiated and possibly in progress.</p>
// */
// SENDING,
// /**
// * <p>The terminal state indicating failure of the send.</p>
// */
// FAILED
// }
//
// private class SendTrailers extends Callback.Nested
// {
// private final HttpFields trailers;
//
// private SendTrailers(Callback callback, HttpFields trailers)
// {
// super(callback);
// this.trailers = trailers;
// }
//
// @Override
// public void succeeded()
// {
// transportCallback.send(getCallback(), false, c ->
// sendTrailersFrame(new MetaData(HttpVersion.HTTP_2, trailers), c));
// }
// }
}

View File

@ -80,6 +80,11 @@
<artifactId>jetty-client</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-proxy</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-slf4j-impl</artifactId>

View File

@ -1,190 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http2.tests;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.fail;
// TODO: move to ee9 with ProxyServlet support
@Disabled("move to ee9")
public class ProxyTest
{
@Test
public void test()
{
fail();
}
// private HTTP2Client client;
// private Server proxy;
// private ServerConnector proxyConnector;
// private Server server;
// private ServerConnector serverConnector;
//
// private void startServer(HttpServlet servlet) throws Exception
// {
// QueuedThreadPool serverPool = new QueuedThreadPool();
// serverPool.setName("server");
// server = new Server(serverPool);
// serverConnector = new ServerConnector(server);
// server.addConnector(serverConnector);
//
// ServletContextHandler appCtx = new ServletContextHandler(server, "/", true, false);
// ServletHolder appServletHolder = new ServletHolder(servlet);
// appCtx.addServlet(appServletHolder, "/*");
//
// server.start();
// }
//
// private void startProxy(HttpServlet proxyServlet, Map<String, String> initParams) throws Exception
// {
// QueuedThreadPool proxyPool = new QueuedThreadPool();
// proxyPool.setName("proxy");
// proxy = new Server(proxyPool);
//
// HttpConfiguration configuration = new HttpConfiguration();
// configuration.setSendDateHeader(false);
// configuration.setSendServerVersion(false);
// String value = initParams.get("outputBufferSize");
// if (value != null)
// configuration.setOutputBufferSize(Integer.parseInt(value));
// proxyConnector = new ServerConnector(proxy, new HTTP2ServerConnectionFactory(configuration));
// proxy.addConnector(proxyConnector);
//
// ServletContextHandler proxyContext = new ServletContextHandler(proxy, "/", true, false);
// ServletHolder proxyServletHolder = new ServletHolder(proxyServlet);
// proxyServletHolder.setInitParameters(initParams);
// proxyContext.addServlet(proxyServletHolder, "/*");
//
// proxy.start();
// }
//
// private void startClient() throws Exception
// {
// QueuedThreadPool clientExecutor = new QueuedThreadPool();
// clientExecutor.setName("client");
// client = new HTTP2Client();
// client.setExecutor(clientExecutor);
// client.start();
// }
//
// private Session newClient(Session.Listener listener) throws Exception
// {
// String host = "localhost";
// int port = proxyConnector.getLocalPort();
// InetSocketAddress address = new InetSocketAddress(host, port);
// FuturePromise<Session> promise = new FuturePromise<>();
// client.connect(address, listener, promise);
// return promise.get(5, TimeUnit.SECONDS);
// }
//
// private MetaData.Request newRequest(String method, String path, HttpFields fields)
// {
// String host = "localhost";
// int port = proxyConnector.getLocalPort();
// String authority = host + ":" + port;
// return new MetaData.Request(method, HttpScheme.HTTP.asString(), new HostPortHttpField(authority), path, HttpVersion.HTTP_2, fields, -1);
// }
//
// @AfterEach
// public void dispose() throws Exception
// {
// client.stop();
// proxy.stop();
// server.stop();
// }
//
// @Test
// public void testHTTPVersion() throws Exception
// {
// startServer(new HttpServlet()
// {
// @Override
// protected void service(HttpServletRequest request, HttpServletResponse response)
// {
// assertEquals(HttpVersion.HTTP_1_1.asString(), request.getProtocol());
// }
// });
// Map<String, String> params = new HashMap<>();
// params.put("proxyTo", "http://localhost:" + serverConnector.getLocalPort());
// startProxy(new AsyncProxyServlet.Transparent(), params);
// startClient();
//
// CountDownLatch clientLatch = new CountDownLatch(1);
// Session session = newClient(new Session.Listener() {});
// MetaData.Request metaData = newRequest("GET", "/", HttpFields.EMPTY);
// HeadersFrame frame = new HeadersFrame(metaData, null, true);
// session.newStream(frame, new Promise.Adapter<>(), new Stream.Listener()
// {
// @Override
// public void onHeaders(Stream stream, HeadersFrame frame)
// {
// assertTrue(frame.isEndStream());
// MetaData.Response response = (MetaData.Response)frame.getMetaData();
// assertEquals(HttpStatus.OK_200, response.getStatus());
// clientLatch.countDown();
// }
// });
//
// assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
// }
//
// @Test
// public void testServerBigDownloadSlowClient() throws Exception
// {
// CountDownLatch serverLatch = new CountDownLatch(1);
// byte[] content = new byte[1024 * 1024];
// startServer(new HttpServlet()
// {
// @Override
// protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
// {
// response.write(true, callback, ByteBuffer.wrap(content));
// serverLatch.countDown();
// }
// });
// Map<String, String> params = new HashMap<>();
// params.put("proxyTo", "http://localhost:" + serverConnector.getLocalPort());
// startProxy(new AsyncProxyServlet.Transparent(), params);
// startClient();
//
// CountDownLatch clientLatch = new CountDownLatch(1);
// Session session = newClient(new Session.Listener() {});
// MetaData.Request metaData = newRequest("GET", "/", HttpFields.EMPTY);
// HeadersFrame frame = new HeadersFrame(metaData, null, true);
// session.newStream(frame, new Promise.Adapter<>(), new Stream.Listener()
// {
// @Override
// public void onData(Stream stream, DataFrame frame, Callback callback)
// {
// try
// {
// TimeUnit.MILLISECONDS.sleep(1);
// callback.succeeded();
// if (frame.isEndStream())
// clientLatch.countDown();
// }
// catch (InterruptedException x)
// {
// callback.failed(x);
// }
// }
// });
//
// assertTrue(serverLatch.await(15, TimeUnit.SECONDS));
// assertTrue(clientLatch.await(15, TimeUnit.SECONDS));
// }
}

View File

@ -0,0 +1,202 @@
//
// ========================================================================
// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http2.tests;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http.HostPortHttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.proxy.ProxyHandler;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ReverseProxyTest
{
private HTTP2Client client;
private Server proxy;
private ServerConnector proxyConnector;
private Server server;
private ServerConnector serverConnector;
private void startServer(Handler handler) throws Exception
{
QueuedThreadPool serverPool = new QueuedThreadPool();
serverPool.setName("server");
server = new Server(serverPool);
serverConnector = new ServerConnector(server);
server.addConnector(serverConnector);
server.setHandler(handler);
server.start();
}
private void startProxy(ProxyHandler handler) throws Exception
{
QueuedThreadPool proxyPool = new QueuedThreadPool();
proxyPool.setName("proxy");
proxy = new Server(proxyPool);
HttpConfiguration configuration = new HttpConfiguration();
configuration.setSendDateHeader(false);
configuration.setSendServerVersion(false);
proxyConnector = new ServerConnector(proxy, new HTTP2ServerConnectionFactory(configuration));
proxy.addConnector(proxyConnector);
proxy.setHandler(handler);
proxy.start();
}
private void startClient() throws Exception
{
QueuedThreadPool clientExecutor = new QueuedThreadPool();
clientExecutor.setName("client");
client = new HTTP2Client();
client.setExecutor(clientExecutor);
client.start();
}
private Session newClient(Session.Listener listener) throws Exception
{
String host = "localhost";
int port = proxyConnector.getLocalPort();
InetSocketAddress address = new InetSocketAddress(host, port);
FuturePromise<Session> promise = new FuturePromise<>();
client.connect(address, listener, promise);
return promise.get(5, TimeUnit.SECONDS);
}
private MetaData.Request newRequest(String method, String path, HttpFields fields)
{
String host = "localhost";
int port = proxyConnector.getLocalPort();
String authority = host + ":" + port;
return new MetaData.Request(method, HttpScheme.HTTP.asString(), new HostPortHttpField(authority), path, HttpVersion.HTTP_2, fields, -1);
}
@AfterEach
public void dispose() throws Exception
{
client.stop();
proxy.stop();
server.stop();
}
@Test
public void testHTTPVersion() throws Exception
{
startServer(new Handler.Processor()
{
@Override
public void process(Request request, Response response, Callback callback)
{
assertEquals(HttpVersion.HTTP_1_1.asString(), request.getConnectionMetaData().getProtocol());
callback.succeeded();
}
});
startProxy(new ProxyHandler.Reverse(clientToProxyRequest ->
HttpURI.build(clientToProxyRequest.getHttpURI()).port(serverConnector.getLocalPort())));
startClient();
CountDownLatch clientLatch = new CountDownLatch(1);
Session session = newClient(new Session.Listener() {});
MetaData.Request metaData = newRequest("GET", "/", HttpFields.EMPTY);
HeadersFrame frame = new HeadersFrame(metaData, null, true);
session.newStream(frame, new Promise.Adapter<>(), new Stream.Listener()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
assertTrue(frame.isEndStream());
MetaData.Response response = (MetaData.Response)frame.getMetaData();
assertEquals(HttpStatus.OK_200, response.getStatus());
clientLatch.countDown();
}
});
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testServerBigDownloadSlowClient() throws Exception
{
CountDownLatch serverLatch = new CountDownLatch(1);
byte[] content = new byte[1024 * 1024];
startServer(new Handler.Processor()
{
@Override
public void process(Request request, Response response, Callback callback)
{
response.write(true, ByteBuffer.wrap(content), Callback.from(() ->
{
callback.succeeded();
serverLatch.countDown();
}, callback::failed));
}
});
startProxy(new ProxyHandler.Reverse(clientToProxyRequest ->
HttpURI.build(clientToProxyRequest.getHttpURI()).port(serverConnector.getLocalPort())));
startClient();
CountDownLatch clientLatch = new CountDownLatch(1);
Session session = newClient(new Session.Listener() {});
MetaData.Request metaData = newRequest("GET", "/", HttpFields.EMPTY);
HeadersFrame frame = new HeadersFrame(metaData, null, true);
session.newStream(frame, new Promise.Adapter<>(), new Stream.Listener()
{
@Override
public void onDataAvailable(Stream stream)
{
try
{
TimeUnit.MILLISECONDS.sleep(1);
Stream.Data data = stream.readData();
data.release();
stream.demand();
if (data.frame().isEndStream())
clientLatch.countDown();
}
catch (InterruptedException x)
{
x.printStackTrace();
}
}
});
assertTrue(serverLatch.await(15, TimeUnit.SECONDS));
assertTrue(clientLatch.await(15, TimeUnit.SECONDS));
}
}

View File

@ -33,7 +33,6 @@ import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.HTTP3ErrorCode;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpStream;
@ -475,31 +474,12 @@ public class HttpStreamOverHTTP3 implements HttpStream
return committed;
}
@Override
public boolean isComplete()
{
// TODO
return false;
}
public boolean isIdle()
{
// TODO: is this necessary?
return true;
}
@Override
public void setUpgradeConnection(Connection connection)
{
throw new UnsupportedOperationException();
}
@Override
public Connection upgrade()
{
return null;
}
@Override
public void succeeded()
{

View File

@ -10,6 +10,7 @@
<artifactId>jetty-proxy</artifactId>
<packaging>jar</packaging>
<name>Jetty Core :: Proxy</name>
<description>Jetty Core Proxy</description>
<properties>
<bundle-symbolic-name>${project.groupId}.proxy</bundle-symbolic-name>

View File

@ -20,6 +20,7 @@ import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
@ -518,7 +519,7 @@ public abstract class ProxyHandler extends Handler.Processor
public Reverse(Function<Request, HttpURI> httpURIRewriter)
{
this.httpURIRewriter = httpURIRewriter;
this.httpURIRewriter = Objects.requireNonNull(httpURIRewriter);
}
public Function<Request, HttpURI> getHttpURIRewriter()

View File

@ -0,0 +1,584 @@
//
// ========================================================================
// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.proxy;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.alpn.server.ALPNServerConnectionFactory;
import org.eclipse.jetty.client.AbstractConnectionPool;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpProxy;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.dynamic.HttpClientTransportDynamic;
import org.eclipse.jetty.client.http.HttpClientConnectionFactory;
import org.eclipse.jetty.http.HostPortHttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.HTTP2Cipher;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.client.transport.ClientConnectionFactoryOverHTTP2;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.internal.ErrorCode;
import org.eclipse.jetty.http2.internal.HTTP2Connection;
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.server.handler.ConnectHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ForwardProxyWithDynamicTransportTest
{
private static final Logger LOG = LoggerFactory.getLogger(ForwardProxyWithDynamicTransportTest.class);
private Server server;
private ServerConnector serverConnector;
private ServerConnector serverTLSConnector;
private Server proxy;
private ServerConnector proxyConnector;
private ServerConnector proxyTLSConnector;
private HTTP2Client http2Client;
private HttpClient client;
private void start(Handler handler) throws Exception
{
startServer(handler);
startProxy(new ConnectHandler());
startClient();
}
private void startServer(Handler handler) throws Exception
{
SslContextFactory.Server sslContextFactory = new SslContextFactory.Server();
sslContextFactory.setKeyStorePath("src/test/resources/keystore.p12");
sslContextFactory.setKeyStorePassword("storepwd");
sslContextFactory.setUseCipherSuitesOrder(true);
sslContextFactory.setCipherComparator(HTTP2Cipher.COMPARATOR);
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
HttpConfiguration httpConfig = new HttpConfiguration();
HttpConnectionFactory h1c = new HttpConnectionFactory(httpConfig);
HTTP2CServerConnectionFactory h2c = new HTTP2CServerConnectionFactory(httpConfig);
serverConnector = new ServerConnector(server, 1, 1, h1c, h2c);
server.addConnector(serverConnector);
HttpConfiguration httpsConfig = new HttpConfiguration(httpConfig);
httpsConfig.addCustomizer(new SecureRequestCustomizer());
HttpConnectionFactory h1 = new HttpConnectionFactory(httpsConfig);
HTTP2ServerConnectionFactory h2 = new HTTP2ServerConnectionFactory(httpsConfig);
ALPNServerConnectionFactory alpn = new ALPNServerConnectionFactory();
alpn.setDefaultProtocol(h1.getProtocol());
SslConnectionFactory ssl = new SslConnectionFactory(sslContextFactory, alpn.getProtocol());
serverTLSConnector = new ServerConnector(server, 1, 1, ssl, alpn, h2, h1, h2c);
server.addConnector(serverTLSConnector);
server.setHandler(handler);
server.start();
LOG.info("Started server on :{} and :{}", serverConnector.getLocalPort(), serverTLSConnector.getLocalPort());
}
private void startProxy(ConnectHandler connectHandler) throws Exception
{
SslContextFactory.Server sslContextFactory = new SslContextFactory.Server();
sslContextFactory.setKeyStorePath("src/test/resources/keystore.p12");
sslContextFactory.setKeyStorePassword("storepwd");
sslContextFactory.setUseCipherSuitesOrder(true);
sslContextFactory.setCipherComparator(HTTP2Cipher.COMPARATOR);
QueuedThreadPool proxyThreads = new QueuedThreadPool();
proxyThreads.setName("proxy");
proxy = new Server(proxyThreads);
HttpConfiguration httpConfig = new HttpConfiguration();
ConnectionFactory h1c = new HttpConnectionFactory(httpConfig);
ConnectionFactory h2c = new HTTP2CServerConnectionFactory(httpConfig);
proxyConnector = new ServerConnector(proxy, 1, 1, h1c, h2c);
proxy.addConnector(proxyConnector);
HttpConfiguration httpsConfig = new HttpConfiguration(httpConfig);
httpsConfig.addCustomizer(new SecureRequestCustomizer());
HttpConnectionFactory h1 = new HttpConnectionFactory(httpsConfig);
HTTP2ServerConnectionFactory h2 = new HTTP2ServerConnectionFactory(httpsConfig);
ALPNServerConnectionFactory alpn = new ALPNServerConnectionFactory();
alpn.setDefaultProtocol(h1.getProtocol());
SslConnectionFactory ssl = new SslConnectionFactory(sslContextFactory, alpn.getProtocol());
proxyTLSConnector = new ServerConnector(proxy, 1, 1, ssl, alpn, h2, h1, h2c);
proxy.addConnector(proxyTLSConnector);
proxy.setHandler(connectHandler);
connectHandler.setHandler(new ProxyHandler.Forward()
{
@Override
protected HttpClient newHttpClient()
{
QueuedThreadPool proxyClientThreads = new QueuedThreadPool();
proxyClientThreads.setName("proxy-client");
ClientConnector proxyClientConnector = new ClientConnector();
proxyClientConnector.setSelectors(1);
proxyClientConnector.setExecutor(proxyClientThreads);
proxyClientConnector.setSslContextFactory(new SslContextFactory.Client(true));
HTTP2Client proxyHTTP2Client = new HTTP2Client(proxyClientConnector);
ClientConnectionFactory.Info h1 = HttpClientConnectionFactory.HTTP11;
ClientConnectionFactory.Info http2 = new ClientConnectionFactoryOverHTTP2.HTTP2(proxyHTTP2Client);
return new HttpClient(new HttpClientTransportDynamic(proxyClientConnector, h1, http2));
}
});
proxy.start();
LOG.info("Started proxy on :{} and :{}", proxyConnector.getLocalPort(), proxyTLSConnector.getLocalPort());
}
private void startClient() throws Exception
{
QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName("client");
ClientConnector clientConnector = new ClientConnector();
clientConnector.setSelectors(1);
clientConnector.setExecutor(clientThreads);
clientConnector.setSslContextFactory(new SslContextFactory.Client(true));
http2Client = new HTTP2Client(clientConnector);
ClientConnectionFactory.Info h1 = HttpClientConnectionFactory.HTTP11;
ClientConnectionFactory.Info http2 = new ClientConnectionFactoryOverHTTP2.HTTP2(http2Client);
client = new HttpClient(new HttpClientTransportDynamic(clientConnector, h1, http2));
client.start();
}
@AfterEach
public void dispose()
{
LifeCycle.stop(client);
LifeCycle.stop(proxy);
LifeCycle.stop(server);
}
public static java.util.stream.Stream<Arguments> testParams()
{
var h1 = List.of("http/1.1");
var h2c = List.of("h2c");
var h2 = List.of("h2");
return java.util.stream.Stream.of(
// HTTP/1.1 Proxy with HTTP/1.1 Server.
Arguments.of(new Origin.Protocol(h1, false), false, HttpVersion.HTTP_1_1, false),
Arguments.of(new Origin.Protocol(h1, false), false, HttpVersion.HTTP_1_1, true),
Arguments.of(new Origin.Protocol(h1, false), true, HttpVersion.HTTP_1_1, false),
Arguments.of(new Origin.Protocol(h1, false), true, HttpVersion.HTTP_1_1, true),
// HTTP/1.1 Proxy with HTTP/2 Server.
Arguments.of(new Origin.Protocol(h1, false), false, HttpVersion.HTTP_2, false),
Arguments.of(new Origin.Protocol(h1, false), false, HttpVersion.HTTP_2, true),
Arguments.of(new Origin.Protocol(h1, false), true, HttpVersion.HTTP_2, false),
Arguments.of(new Origin.Protocol(h1, false), true, HttpVersion.HTTP_2, true),
// HTTP/2 Proxy with HTTP/1.1 Server.
Arguments.of(new Origin.Protocol(h2c, false), false, HttpVersion.HTTP_1_1, false),
Arguments.of(new Origin.Protocol(h2c, false), false, HttpVersion.HTTP_1_1, true),
Arguments.of(new Origin.Protocol(h2, false), true, HttpVersion.HTTP_1_1, false),
Arguments.of(new Origin.Protocol(h2, false), true, HttpVersion.HTTP_1_1, true),
Arguments.of(new Origin.Protocol(h2, true), true, HttpVersion.HTTP_1_1, false),
Arguments.of(new Origin.Protocol(h2, true), true, HttpVersion.HTTP_1_1, true),
// HTTP/2 Proxy with HTTP/2 Server.
Arguments.of(new Origin.Protocol(h2c, false), false, HttpVersion.HTTP_2, false),
Arguments.of(new Origin.Protocol(h2c, false), false, HttpVersion.HTTP_2, true),
Arguments.of(new Origin.Protocol(h2, false), true, HttpVersion.HTTP_2, false),
Arguments.of(new Origin.Protocol(h2, false), true, HttpVersion.HTTP_2, true),
Arguments.of(new Origin.Protocol(h2, true), true, HttpVersion.HTTP_2, false),
Arguments.of(new Origin.Protocol(h2, true), true, HttpVersion.HTTP_2, true)
);
}
@ParameterizedTest(name = "proxyProtocol={0}, proxySecure={1}, serverProtocol={2}, serverSecure={3}")
@MethodSource("testParams")
public void testProxy(Origin.Protocol proxyProtocol, boolean proxySecure, HttpVersion serverProtocol, boolean serverSecure) throws Exception
{
int status = HttpStatus.NO_CONTENT_204;
start(new Handler.Processor()
{
@Override
public void process(Request request, Response response, Callback callback)
{
response.setStatus(status);
callback.succeeded();
}
});
int proxyPort = proxySecure ? proxyTLSConnector.getLocalPort() : proxyConnector.getLocalPort();
Origin.Address proxyAddress = new Origin.Address("localhost", proxyPort);
HttpProxy proxy = new HttpProxy(proxyAddress, proxySecure, proxyProtocol);
client.getProxyConfiguration().getProxies().add(proxy);
String scheme = serverSecure ? "https" : "http";
int serverPort = serverSecure ? serverTLSConnector.getLocalPort() : serverConnector.getLocalPort();
ContentResponse response1 = client.newRequest("localhost", serverPort)
.scheme(scheme)
.version(serverProtocol)
.timeout(5, TimeUnit.SECONDS)
.send();
assertEquals(status, response1.getStatus());
// Make a second request to be sure it went through the same connection.
ContentResponse response2 = client.newRequest("localhost", serverPort)
.scheme(scheme)
.version(serverProtocol)
.timeout(5, TimeUnit.SECONDS)
.send();
assertEquals(status, response2.getStatus());
List<Destination> destinations = client.getDestinations().stream()
.filter(d -> d.getPort() == serverPort)
.toList();
assertEquals(1, destinations.size());
HttpDestination destination = (HttpDestination)destinations.get(0);
AbstractConnectionPool connectionPool = (AbstractConnectionPool)destination.getConnectionPool();
assertEquals(1, connectionPool.getConnectionCount());
}
@Test
public void testHTTP2TunnelClosedByClient() throws Exception
{
start(new EmptyServerHandler());
int proxyPort = proxyConnector.getLocalPort();
Origin.Address proxyAddress = new Origin.Address("localhost", proxyPort);
HttpProxy proxy = new HttpProxy(proxyAddress, false, new Origin.Protocol(List.of("h2c"), false));
client.getProxyConfiguration().getProxies().add(proxy);
long idleTimeout = 1000;
http2Client.setStreamIdleTimeout(idleTimeout);
String serverScheme = "http";
int serverPort = serverConnector.getLocalPort();
ContentResponse response = client.newRequest("localhost", serverPort)
.scheme(serverScheme)
.version(HttpVersion.HTTP_1_1)
.timeout(5, TimeUnit.SECONDS)
.send();
assertEquals(HttpStatus.OK_200, response.getStatus());
// Client will close the HTTP2StreamEndPoint.
Thread.sleep(2 * idleTimeout);
List<Destination> destinations = client.getDestinations().stream()
.filter(d -> d.getPort() == serverPort)
.toList();
assertEquals(1, destinations.size());
HttpDestination destination = (HttpDestination)destinations.get(0);
AbstractConnectionPool connectionPool = (AbstractConnectionPool)destination.getConnectionPool();
assertEquals(0, connectionPool.getConnectionCount());
List<HTTP2Connection> serverConnections = proxyConnector.getConnectedEndPoints().stream()
.map(EndPoint::getConnection)
.map(HTTP2Connection.class::cast)
.toList();
assertEquals(1, serverConnections.size());
assertTrue(serverConnections.get(0).getSession().getStreams().isEmpty());
}
@Test
public void testProxyDown() throws Exception
{
start(new EmptyServerHandler());
int proxyPort = proxyConnector.getLocalPort();
Origin.Address proxyAddress = new Origin.Address("localhost", proxyPort);
HttpProxy httpProxy = new HttpProxy(proxyAddress, false, new Origin.Protocol(List.of("h2c"), false));
client.getProxyConfiguration().getProxies().add(httpProxy);
proxy.stop();
CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", serverConnector.getLocalPort())
.version(HttpVersion.HTTP_1_1)
.timeout(5, TimeUnit.SECONDS)
.send(result ->
{
assertTrue(result.isFailed());
assertThat(result.getFailure(), Matchers.instanceOf(ConnectException.class));
latch.countDown();
});
assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testHTTP2TunnelHardClosedByProxy() throws Exception
{
startServer(new EmptyServerHandler());
CountDownLatch closeLatch = new CountDownLatch(1);
startProxy(new ConnectHandler()
{
@Override
protected void handleConnect(Request request, Response response, Callback callback, String serverAddress)
{
request.getConnectionMetaData().getConnection().getEndPoint().close();
closeLatch.countDown();
}
});
startClient();
int proxyPort = proxyConnector.getLocalPort();
Origin.Address proxyAddress = new Origin.Address("localhost", proxyPort);
HttpProxy httpProxy = new HttpProxy(proxyAddress, false, new Origin.Protocol(List.of("h2c"), false));
client.getProxyConfiguration().getProxies().add(httpProxy);
CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", serverConnector.getLocalPort())
.version(HttpVersion.HTTP_1_1)
.timeout(5, TimeUnit.SECONDS)
.send(result ->
{
assertTrue(result.isFailed());
assertThat(result.getFailure(), Matchers.instanceOf(ClosedChannelException.class));
latch.countDown();
});
assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
assertTrue(latch.await(5, TimeUnit.SECONDS));
List<Destination> destinations = client.getDestinations().stream()
.filter(d -> d.getPort() == proxyPort)
.toList();
assertEquals(1, destinations.size());
HttpDestination destination = (HttpDestination)destinations.get(0);
AbstractConnectionPool connectionPool = (AbstractConnectionPool)destination.getConnectionPool();
assertEquals(0, connectionPool.getConnectionCount());
}
@Test
public void testHTTP2TunnelResetByClient() throws Exception
{
startServer(new EmptyServerHandler());
CountDownLatch closeLatch = new CountDownLatch(2);
startProxy(new ConnectHandler()
{
@Override
protected DownstreamConnection newDownstreamConnection(EndPoint endPoint, ConcurrentMap<String, Object> context)
{
return new DownstreamConnection(endPoint, getExecutor(), getByteBufferPool(), context)
{
@Override
protected void close(Throwable failure)
{
super.close(failure);
closeLatch.countDown();
}
};
}
@Override
protected UpstreamConnection newUpstreamConnection(EndPoint endPoint, ConnectContext connectContext)
{
return new UpstreamConnection(endPoint, getExecutor(), getByteBufferPool(), connectContext)
{
@Override
protected void close(Throwable failure)
{
super.close(failure);
closeLatch.countDown();
}
};
}
});
startClient();
FuturePromise<Session> sessionPromise = new FuturePromise<>();
http2Client.connect(new InetSocketAddress("localhost", proxyConnector.getLocalPort()), new Session.Listener() {}, sessionPromise);
Session session = sessionPromise.get(5, TimeUnit.SECONDS);
String serverAddress = "localhost:" + serverConnector.getLocalPort();
MetaData.ConnectRequest connect = new MetaData.ConnectRequest(HttpScheme.HTTP, new HostPortHttpField(serverAddress), null, HttpFields.EMPTY, null);
HeadersFrame frame = new HeadersFrame(connect, null, false);
FuturePromise<Stream> streamPromise = new FuturePromise<>();
CountDownLatch tunnelLatch = new CountDownLatch(1);
CountDownLatch responseLatch = new CountDownLatch(1);
session.newStream(frame, streamPromise, new Stream.Listener()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
MetaData.Response response = (MetaData.Response)frame.getMetaData();
if (response.getStatus() == HttpStatus.OK_200)
tunnelLatch.countDown();
stream.demand();
}
@Override
public void onDataAvailable(Stream stream)
{
Stream.Data data = stream.readData();
String response = BufferUtil.toString(data.frame().getData(), StandardCharsets.UTF_8);
data.release();
if (response.startsWith("HTTP/1.1 200"))
responseLatch.countDown();
}
});
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
assertTrue(tunnelLatch.await(5, TimeUnit.SECONDS));
// Tunnel is established, send a HTTP/1.1 request.
String h1 = "GET / HTTP/1.1\r\n" +
"Host: " + serverAddress + "\r\n" +
"\r\n";
stream.data(new DataFrame(stream.getId(), ByteBuffer.wrap(h1.getBytes(StandardCharsets.UTF_8)), false), Callback.NOOP);
assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
// Now reset the stream, tunnel must be closed.
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testHTTP2TunnelProxyStreamTimeout() throws Exception
{
startServer(new EmptyServerHandler());
CountDownLatch closeLatch = new CountDownLatch(2);
startProxy(new ConnectHandler()
{
@Override
protected DownstreamConnection newDownstreamConnection(EndPoint endPoint, ConcurrentMap<String, Object> context)
{
return new DownstreamConnection(endPoint, getExecutor(), getByteBufferPool(), context)
{
@Override
protected void close(Throwable failure)
{
super.close(failure);
closeLatch.countDown();
}
};
}
@Override
protected UpstreamConnection newUpstreamConnection(EndPoint endPoint, ConnectContext connectContext)
{
return new UpstreamConnection(endPoint, getExecutor(), getByteBufferPool(), connectContext)
{
@Override
protected void close(Throwable failure)
{
super.close(failure);
closeLatch.countDown();
}
};
}
});
startClient();
long streamIdleTimeout = 1000;
ConnectionFactory h2c = proxyConnector.getConnectionFactory("h2c");
((HTTP2CServerConnectionFactory)h2c).setStreamIdleTimeout(streamIdleTimeout);
FuturePromise<Session> sessionPromise = new FuturePromise<>();
http2Client.connect(new InetSocketAddress("localhost", proxyConnector.getLocalPort()), new Session.Listener() {}, sessionPromise);
Session session = sessionPromise.get(5, TimeUnit.SECONDS);
String serverAddress = "localhost:" + serverConnector.getLocalPort();
MetaData.ConnectRequest connect = new MetaData.ConnectRequest(HttpScheme.HTTP, new HostPortHttpField(serverAddress), null, HttpFields.EMPTY, null);
HeadersFrame frame = new HeadersFrame(connect, null, false);
FuturePromise<Stream> streamPromise = new FuturePromise<>();
CountDownLatch tunnelLatch = new CountDownLatch(1);
CountDownLatch responseLatch = new CountDownLatch(1);
CountDownLatch resetLatch = new CountDownLatch(1);
session.newStream(frame, streamPromise, new Stream.Listener()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
MetaData.Response response = (MetaData.Response)frame.getMetaData();
if (response.getStatus() == HttpStatus.OK_200)
tunnelLatch.countDown();
stream.demand();
}
@Override
public void onDataAvailable(Stream stream)
{
Stream.Data data = stream.readData();
String response = BufferUtil.toString(data.frame().getData(), StandardCharsets.UTF_8);
data.release();
if (response.startsWith("HTTP/1.1 200"))
responseLatch.countDown();
}
@Override
public void onReset(Stream stream, ResetFrame frame, Callback callback)
{
resetLatch.countDown();
callback.succeeded();
}
});
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
assertTrue(tunnelLatch.await(5, TimeUnit.SECONDS));
// Tunnel is established, send a HTTP/1.1 request.
String h1 = "GET / HTTP/1.1\r\n" +
"Host: " + serverAddress + "\r\n" +
"\r\n";
stream.data(new DataFrame(stream.getId(), ByteBuffer.wrap(h1.getBytes(StandardCharsets.UTF_8)), false), Callback.NOOP);
assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
// Wait until the proxy stream idle times out.
Thread.sleep(2 * streamIdleTimeout);
// Client should see a RST_STREAM.
assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
// Tunnel must be closed.
assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
}
private static class EmptyServerHandler extends Handler.Processor
{
@Override
public void process(Request request, Response response, Callback callback)
{
callback.succeeded();
}
}
}

View File

@ -26,10 +26,11 @@ import org.eclipse.jetty.util.Callback;
public interface HttpStream extends Callback
{
/**
* Attribute used to get the {@link Connection} from the request attributes. This should not be used to set the
* connection as a request attribute, instead use {@link HttpStream#setUpgradeConnection(Connection)}.
* <p>Attribute name to be used as a {@link Request} attribute to store/retrieve
* the {@link Connection} created during the HTTP/1.1 upgrade mechanism or the
* HTTP/2 tunnel mechanism.</p>
*/
String UPGRADE_CONNECTION_ATTRIBUTE = HttpStream.class.getName() + ".UPGRADE";
String UPGRADE_CONNECTION_ATTRIBUTE = HttpStream.class.getName() + ".upgradeConnection";
/**
* @return an ID unique within the lifetime scope of the associated protocol connection.
@ -66,13 +67,6 @@ public interface HttpStream extends Callback
boolean isCommitted();
// TODO: remove this method? Only used in tests.
boolean isComplete();
void setUpgradeConnection(Connection connection);
Connection upgrade();
default TunnelSupport getTunnelSupport()
{
return null;
@ -169,24 +163,6 @@ public interface HttpStream extends Callback
return getWrapped().isCommitted();
}
@Override
public final boolean isComplete()
{
return getWrapped().isComplete();
}
@Override
public void setUpgradeConnection(Connection connection)
{
getWrapped().setUpgradeConnection(connection);
}
@Override
public Connection upgrade()
{
return getWrapped().upgrade();
}
@Override
public TunnelSupport getTunnelSupport()
{

View File

@ -205,6 +205,10 @@ public class ConnectHandler extends Handler.Wrapper
};
}
}
else
{
return (req, res, cbk) -> Response.writeError(req, res, cbk, HttpStatus.NOT_IMPLEMENTED_501);
}
}
return super.handle(request);
}

View File

@ -1370,7 +1370,7 @@ public class HttpChannelState implements HttpChannel, Components
// is the request fully consumed?
Throwable unconsumed = stream.consumeAvailable();
if (LOG.isDebugEnabled())
LOG.debug("consumeAll: {} {} ", unconsumed == null, httpChannelState);
LOG.debug("consumeAvailable: {} {} ", unconsumed == null, httpChannelState);
if (unconsumed != null && httpChannelState.getConnectionMetaData().isPersistent())
{
@ -1536,6 +1536,7 @@ public class HttpChannelState implements HttpChannel, Components
}
if (needLastWrite)
{
_stream.send(_request._metaData, responseMetaData, true, null,
Callback.from(() -> httpChannel._handlerInvoker.failed(_failure),
x ->
@ -1544,8 +1545,11 @@ public class HttpChannelState implements HttpChannel, Components
_failure.addSuppressed(x);
httpChannel._handlerInvoker.failed(_failure);
}));
}
else
{
httpChannel._handlerInvoker.failed(_failure);
}
}
@Override

View File

@ -598,31 +598,19 @@ public class HttpConnection extends AbstractConnection implements Runnable, Writ
return handle;
}
private boolean upgrade(HttpStream stream)
private boolean upgrade(HttpStreamOverHTTP1 stream)
{
Connection connection = stream.upgrade();
if (connection == null)
return false;
if (LOG.isDebugEnabled())
LOG.debug("Upgrade from {} to {}", this, connection);
getEndPoint().upgrade(connection);
//_channel.recycle(); // TODO should something be done to the channel?
_parser.reset();
_generator.reset();
if (_retainableByteBuffer != null)
if (stream.upgrade())
{
if (!_retainableByteBuffer.isRetained())
{
releaseRequestBuffer();
}
else
{
LOG.warn("{} lingering content references?!?!", this);
_retainableByteBuffer = null; // Not returned to pool!
}
_httpChannel.recycle();
_parser.close();
_generator.reset();
return true;
}
else
{
return false;
}
return true;
}
@Override
@ -647,22 +635,10 @@ public class HttpConnection extends AbstractConnection implements Runnable, Writ
{
// TODO: do we really need to do this?
// This event is fired really late, sendCallback should already be failed at this point.
try
{
if (cause == null)
_sendCallback.close();
else
_sendCallback.failed(cause);
}
finally
{
if (cause != null)
{
Runnable todo = _httpChannel.onFailure(cause);
if (todo != null)
todo.run();
}
}
if (cause == null)
_sendCallback.close();
else
_sendCallback.failed(cause);
super.onClose(cause);
}
@ -970,10 +946,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Writ
{
private Throwable _failure;
protected RequestHandler()
{
}
@Override
public void startRequest(String method, String uri, HttpVersion version)
{
@ -1068,9 +1040,9 @@ public class HttpConnection extends AbstractConnection implements Runnable, Writ
_httpChannel.onRequest(new MetaData.Request(stream._method, uri, stream._version, HttpFields.EMPTY));
}
Runnable todo = _httpChannel.onFailure(failure);
if (todo != null)
getServer().getThreadPool().execute(todo);
Runnable task = _httpChannel.onFailure(failure);
if (task != null)
getServer().getThreadPool().execute(task);
}
@Override
@ -1156,7 +1128,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Writ
private HostPortHttpField _hostField;
private MetaData.Request _request;
private HttpField _upgrade = null;
private Connection _upgradeConnection;
private Content.Chunk _chunk;
private boolean _connectionClose = false;
private boolean _connectionKeepAlive = false;
@ -1250,7 +1221,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Writ
if (_uri.isAbsolute())
{
if (!_hostField.getValue().equals(_uri.getAuthority()))
throw new BadMessageException("Authority!=Host ");
throw new BadMessageException("Authority!=Host");
}
else
{
@ -1306,8 +1277,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Writ
!_connectionClose ||
HttpMethod.CONNECT.is(_method);
// Since persistent status is now exposed in the application API, we need to be more definitive earlier
// if we are persistent or not.
_generator.setPersistent(persistent);
if (!persistent)
_connectionKeepAlive = false;
@ -1320,33 +1289,37 @@ public class HttpConnection extends AbstractConnection implements Runnable, Writ
if (_unknownExpectation)
{
_requestHandler.badMessage(new BadMessageException(HttpStatus.EXPECTATION_FAILED_417));
return null; // TODO Is this enough ???
return null;
}
persistent = getHttpConfiguration().isPersistentConnectionsEnabled() &&
!_connectionClose ||
HttpMethod.CONNECT.is(_method);
// Since persistent status is now exposed in the application API, we need to be more definitive earlier
// if we are persistent or not.
_generator.setPersistent(persistent);
// Try to upgrade before calling the application.
// In case of WebSocket, it is the application that performs the upgrade
// so upgrade(stream) will return false, and the upgrade will be finished
// in HttpStreamOverHTTP1.succeeded().
// In case of HTTP/2, the application is not called and the upgrade
// is finished here by upgrade(stream) which will return true.
if (_upgrade != null && HttpConnection.this.upgrade(_stream.get()))
return null; // TODO do we need to return a runnable to complete the upgrade ???
return null;
break;
}
case HTTP_2:
{
// Allow direct "upgrade" to HTTP_2_0 only if the connector supports h2c.
// Allow prior knowledge "upgrade" to HTTP/2 only if the connector supports h2c.
_upgrade = PREAMBLE_UPGRADE_H2C;
if (HttpMethod.PRI.is(_method) &&
"*".equals(_uri.getPath()) &&
_headerBuilder.size() == 0 &&
HttpConnection.this.upgrade(_stream.get()))
return null; // TODO do we need to return a runnable to complete the upgrade ???
return null;
// TODO is this sufficient?
_parser.close();
@ -1486,40 +1459,21 @@ public class HttpConnection extends AbstractConnection implements Runnable, Writ
return _stream.get() != this || _generator.isCommitted();
}
@Override
public boolean isComplete()
{
return _stream.get() != this;
}
@Override
public void setUpgradeConnection(Connection connection)
{
_upgradeConnection = connection;
if (_httpChannel.getRequest() != null)
_httpChannel.getRequest().setAttribute(HttpStream.UPGRADE_CONNECTION_ATTRIBUTE, connection);
}
@Override
public Connection upgrade()
private boolean upgrade()
{
if (LOG.isDebugEnabled())
LOG.debug("upgrade {} {}", this, _upgrade);
// If Upgrade attribute already set then we don't need to do anything here.
if (_upgradeConnection != null)
return _upgradeConnection;
// If no upgrade headers there is nothing to do.
if (!_connectionUpgrade && (_upgrade == null))
return null;
// If no upgrade headers then there is nothing to do.
if (!_connectionUpgrade && _upgrade == null)
return false;
@SuppressWarnings("ReferenceEquality")
boolean isUpgradedH2C = (_upgrade == PREAMBLE_UPGRADE_H2C);
if (!isUpgradedH2C && !_connectionUpgrade)
boolean isPriorKnowledgeH2C = _upgrade == PREAMBLE_UPGRADE_H2C;
if (!isPriorKnowledgeH2C && !_connectionUpgrade)
throw new BadMessageException(HttpStatus.BAD_REQUEST_400);
// Find the upgrade factory
// Find the upgrade factory.
ConnectionFactory.Upgrading factory = getConnector().getConnectionFactories().stream()
.filter(f -> f instanceof ConnectionFactory.Upgrading)
.map(ConnectionFactory.Upgrading.class::cast)
@ -1531,27 +1485,28 @@ public class HttpConnection extends AbstractConnection implements Runnable, Writ
{
if (LOG.isDebugEnabled())
LOG.debug("No factory for {} in {}", _upgrade, getConnector());
return null;
return false;
}
// Create new connection
HttpFields.Mutable response101 = HttpFields.build();
Connection upgradeConnection = factory.upgradeConnection(getConnector(), getEndPoint(), _request, response101);
if (upgradeConnection == null)
{
if (LOG.isDebugEnabled())
LOG.debug("Upgrade ignored for {} by {}", _upgrade, factory);
return null;
return false;
}
// Send 101 if needed
if (!isUpgradedH2C)
// Prior knowledge HTTP/2 does not need a 101 response (it will directly be
// in HTTP/2 format) while HTTP/1.1 to HTTP/2 upgrade needs a 101 response.
if (!isPriorKnowledgeH2C)
send(_request, new MetaData.Response(HttpVersion.HTTP_1_1, HttpStatus.SWITCHING_PROTOCOLS_101, response101, 0), false, null, Callback.NOOP);
if (LOG.isDebugEnabled())
LOG.debug("Upgrade from {} to {}", getEndPoint().getConnection(), upgradeConnection);
//getHttpTransport().onCompleted(); // TODO: succeed callback instead?
return upgradeConnection;
LOG.debug("Upgrading from {} to {}", getEndPoint().getConnection(), upgradeConnection);
getEndPoint().upgrade(upgradeConnection);
return true;
}
@Override
@ -1573,18 +1528,22 @@ public class HttpConnection extends AbstractConnection implements Runnable, Writ
if (isFillInterested())
{
LOG.warn("Read pending {} {}", this, getEndPoint());
failed(new IOException("Pending read in onCompleted"));
abort(new IOException("Pending read in onCompleted"));
return;
}
// Save the upgrade Connection before recycling the HttpChannel which would clear the request attributes.
_upgradeConnection = (Connection)_httpChannel.getRequest().getAttribute(HttpStream.UPGRADE_CONNECTION_ATTRIBUTE);
Connection upgradeConnection = (Connection)_httpChannel.getRequest().getAttribute(HttpStream.UPGRADE_CONNECTION_ATTRIBUTE);
if (upgradeConnection != null)
{
getEndPoint().upgrade(upgradeConnection);
_httpChannel.recycle();
_parser.close();
_generator.reset();
return;
}
_httpChannel.recycle();
if (HttpConnection.this.upgrade(stream))
return;
if (_expects100Continue)
{
// No content was read, and no 100 Continue response was sent.
@ -1658,8 +1617,12 @@ public class HttpConnection extends AbstractConnection implements Runnable, Writ
LOG.debug("ignored", x);
return;
}
abort(x);
}
getEndPoint().close();
private void abort(Throwable failure)
{
getEndPoint().close(failure);
}
@Override

View File

@ -506,15 +506,17 @@ public class CustomRequestLogTest
{
testHandlerServerStart("%U ConnectionStatus: %s %X");
_connector.getResponse("GET /one HTTP/1.0\n\n");
String response = _connector.getResponse("GET /one HTTP/1.0\n\n");
assertThat(response, containsString("200 OK"));
assertThat(_entries.poll(5, TimeUnit.SECONDS), is("/one ConnectionStatus: 200 -"));
_connector.getResponse("""
response = _connector.getResponse("""
GET /two HTTP/1.1
Host: localhost
Connection: close
""");
assertThat(response, containsString("200 OK"));
assertThat(_entries.poll(5, TimeUnit.SECONDS), is("/two ConnectionStatus: 200 -"));
LocalConnector.LocalEndPoint connect = _connector.connect();
@ -541,19 +543,19 @@ public class CustomRequestLogTest
assertThat(_entries.poll(5, TimeUnit.SECONDS), is("/four ConnectionStatus: 200 +"));
assertThat(_entries.poll(5, TimeUnit.SECONDS), is("/five ConnectionStatus: 200 -"));
_connector.getResponse("""
response = _connector.getResponse("""
GET /no/host HTTP/1.1
""");
connect.getResponse();
assertThat(response, containsString(" 400 "));
assertThat(_entries.poll(5, TimeUnit.SECONDS), is("/no/host ConnectionStatus: 400 X"));
_connector.getResponse("""
response = _connector.getResponse("""
GET /abort HTTP/1.1
Host: localhost
""");
connect.getResponse();
assertThat(response, containsString("200 OK"));
assertThat(_entries.poll(5, TimeUnit.SECONDS), is("/abort ConnectionStatus: 200 X"));
}

View File

@ -33,7 +33,6 @@ import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http.MimeTypes;
import org.eclipse.jetty.http.Trailers;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.QuietException;
import org.eclipse.jetty.logging.StacklessLogging;
@ -938,13 +937,6 @@ public class HttpChannelTest
super.push(request);
}
@Override
public Connection upgrade()
{
history.add("upgrade");
return super.upgrade();
}
@Override
public void succeeded()
{

View File

@ -25,7 +25,6 @@ import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.io.ByteBufferAccumulator;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@ -224,7 +223,6 @@ public class MockHttpStream implements HttpStream
return response != null && response.getStatus() >= 200;
}
@Override
public boolean isComplete()
{
return _completed.getCount() == 0;
@ -258,16 +256,4 @@ public class MockHttpStream implements HttpStream
if (_complete.compareAndSet(null, x == null ? new Throwable() : x))
_completed.countDown();
}
@Override
public void setUpgradeConnection(Connection connection)
{
throw new UnsupportedOperationException();
}
@Override
public Connection upgrade()
{
throw new UnsupportedOperationException();
}
}

View File

@ -134,18 +134,7 @@ public abstract class AbstractHandshaker implements Handshaker
if (httpConfig.getSendServerVersion())
response.getHeaders().put(SERVER_VERSION);
// We need to also manually set upgrade attribute because stream wrapper succeeded is run after
// the decision is made to close the connection.
request.setAttribute(HttpStream.UPGRADE_CONNECTION_ATTRIBUTE, connection);
request.addHttpStreamWrapper(s -> new HttpStream.Wrapper(s)
{
@Override
public void succeeded()
{
setUpgradeConnection(connection);
super.succeeded();
}
});
negotiation.getRequest().upgrade(request);

View File

@ -19,10 +19,10 @@ import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.server.ConnectionMetaData;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.server.TunnelSupport;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.internal.WebSocketConnection;
@ -57,13 +57,13 @@ public class RFC8441Handshaker extends AbstractHandshaker
}
@Override
protected WebSocketConnection createWebSocketConnection(Request baseRequest, WebSocketCoreSession coreSession)
protected WebSocketConnection createWebSocketConnection(Request request, WebSocketCoreSession coreSession)
{
ConnectionMetaData connectionMetaData = baseRequest.getConnectionMetaData();
Connector connector = connectionMetaData.getConnector();
EndPoint endPoint = null; // TODO: connectionMetaData.getTunnellingEndPoint();
Connector connector = request.getConnectionMetaData().getConnector();
ByteBufferPool byteBufferPool = connector.getByteBufferPool();
RetainableByteBufferPool retainableByteBufferPool = byteBufferPool.asRetainableByteBufferPool();
TunnelSupport tunnelSupport = request.getTunnelSupport();
EndPoint endPoint = tunnelSupport.getEndPoint();
return newWebSocketConnection(endPoint, connector.getExecutor(), connector.getScheduler(), byteBufferPool, retainableByteBufferPool, coreSession);
}

View File

@ -14,9 +14,9 @@
package org.eclipse.jetty.websocket.core.server.internal;
import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.server.TunnelSupport;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
@ -30,9 +30,9 @@ public class RFC8441Negotiation extends WebSocketNegotiation
@Override
public boolean validateHeaders()
{
MetaData.Request metaData = null; // TODO: getRequest().getMetaData();
if (metaData == null)
TunnelSupport tunnelSupport = getRequest().getTunnelSupport();
if (tunnelSupport == null)
return false;
return "websocket".equals(metaData.getProtocol());
return "websocket".equals(tunnelSupport.getProtocol());
}
}

View File

@ -17,10 +17,15 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EventListener;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import jakarta.servlet.DispatcherType;
import jakarta.servlet.RequestDispatcher;
@ -56,7 +61,6 @@ import static org.eclipse.jetty.util.thread.Invocable.InvocationType.NON_BLOCKIN
*/
public class ServletChannel implements Runnable
{
public static Listener NOOP_LISTENER = new Listener() {};
private static final Logger LOG = LoggerFactory.getLogger(ServletChannel.class);
private final AtomicLong _requests = new AtomicLong();
@ -67,11 +71,10 @@ public class ServletChannel implements Runnable
private ServletRequestState _state;
private ServletContextHandler.ServletContextApi _servletContextApi;
private ServletContextRequest _request;
private boolean _expects100Continue;
private Listener _combinedListener;
private long _oldIdleTimeout;
private Callback _callback;
private boolean _expects100Continue;
// TODO:
private final Listener _combinedListener = NOOP_LISTENER;
// Bytes written after interception (e.g. after compression).
private long _written;
@ -84,15 +87,16 @@ public class ServletChannel implements Runnable
public void init(ServletContextRequest request)
{
_connector = request.getConnectionMetaData().getConnector();
_executor = request.getContext();
_configuration = request.getConnectionMetaData().getHttpConfiguration();
_endPoint = request.getConnectionMetaData().getConnection().getEndPoint();
_state = new ServletRequestState(this); // TODO can this be recycled?
_servletContextApi = request.getContext().getServletContext();
_request = request;
_executor = request.getContext();
_state = new ServletRequestState(this); // TODO can this be recycled?
_endPoint = request.getConnectionMetaData().getConnection().getEndPoint();
_connector = request.getConnectionMetaData().getConnector();
// TODO: can we do this?
_configuration = request.getConnectionMetaData().getHttpConfiguration();
_expects100Continue = request.getHeaders().contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString());
_combinedListener = new Listeners(request);
request.getHttpInput().init();
if (LOG.isDebugEnabled())
@ -559,6 +563,7 @@ public class ServletChannel implements Runnable
}
// If send error is called we need to break.
// TODO: is this necessary? It always returns false.
if (checkAndPrepareUpgrade())
break;
@ -867,8 +872,6 @@ public class ServletChannel implements Runnable
*/
public interface Listener extends EventListener
{
// TODO do we need this class?
/**
* Invoked just after the HTTP request line and headers have been parsed.
*
@ -1012,4 +1015,149 @@ public class ServletChannel implements Runnable
{
}
}
private class Listeners implements Listener
{
private final List<Listener> _listeners;
private Listeners(ServletContextRequest request)
{
Collection<Listener> connectorListeners = request.getConnectionMetaData().getConnector().getBeans(Listener.class);
List<Listener> handlerListeners = request.getContext().getServletContextHandler().getEventListeners().stream()
.filter(l -> l instanceof Listener)
.map(Listener.class::cast)
.toList();
_listeners = new ArrayList<>(connectorListeners);
_listeners.addAll(handlerListeners);
}
@Override
public void onRequestBegin(Request request)
{
_listeners.forEach(l -> notify(l::onRequestBegin, request));
}
@Override
public void onBeforeDispatch(Request request)
{
_listeners.forEach(l -> notify(l::onBeforeDispatch, request));
}
@Override
public void onDispatchFailure(Request request, Throwable failure)
{
_listeners.forEach(l -> notify(l::onDispatchFailure, request, failure));
}
@Override
public void onAfterDispatch(Request request)
{
_listeners.forEach(l -> notify(l::onAfterDispatch, request));
}
@Override
public void onRequestContent(Request request, ByteBuffer content)
{
_listeners.forEach(l -> notify(l::onRequestContent, request, content));
}
@Override
public void onRequestContentEnd(Request request)
{
_listeners.forEach(l -> notify(l::onRequestContentEnd, request));
}
@Override
public void onRequestTrailers(Request request)
{
_listeners.forEach(l -> notify(l::onRequestTrailers, request));
}
@Override
public void onRequestEnd(Request request)
{
_listeners.forEach(l -> notify(l::onRequestEnd, request));
}
@Override
public void onRequestFailure(Request request, Throwable failure)
{
_listeners.forEach(l -> notify(l::onDispatchFailure, request, failure));
}
@Override
public void onResponseBegin(Request request)
{
_listeners.forEach(l -> notify(l::onResponseBegin, request));
}
@Override
public void onResponseCommit(Request request)
{
_listeners.forEach(l -> notify(l::onResponseCommit, request));
}
@Override
public void onResponseContent(Request request, ByteBuffer content)
{
_listeners.forEach(l -> notify(l::onRequestContent, request, content));
}
@Override
public void onResponseEnd(Request request)
{
_listeners.forEach(l -> notify(l::onResponseEnd, request));
}
@Override
public void onResponseFailure(Request request, Throwable failure)
{
_listeners.forEach(l -> notify(l::onDispatchFailure, request, failure));
}
@Override
public void onComplete(Request request)
{
_listeners.forEach(l -> notify(l::onComplete, request));
}
private void notify(Consumer<Request> consumer, Request request)
{
try
{
consumer.accept(request);
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("failure while notifying %s event for %s".formatted(ServletChannel.Listener.class.getSimpleName(), request));
}
}
private void notify(BiConsumer<Request, Throwable> consumer, Request request, Throwable failure)
{
try
{
consumer.accept(request, failure);
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("failure while notifying %s event for %s".formatted(ServletChannel.Listener.class.getSimpleName(), request));
}
}
private void notify(BiConsumer<Request, ByteBuffer> consumer, Request request, ByteBuffer byteBuffer)
{
try
{
consumer.accept(request, byteBuffer.slice());
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("failure while notifying %s event for %s".formatted(ServletChannel.Listener.class.getSimpleName(), request));
}
}
}
}

View File

@ -203,7 +203,7 @@ public class ServletContextHandler extends ContextHandler implements Graceful
private final List<EventListener> _programmaticListeners = new CopyOnWriteArrayList<>();
private final List<ServletContextListener> _servletContextListeners = new CopyOnWriteArrayList<>();
private final List<ServletContextListener> _destroyServletContextListeners = new ArrayList<>();
protected final List<ServletContextAttributeListener> _servletContextAttributeListeners = new CopyOnWriteArrayList<>();
private final List<ServletContextAttributeListener> _servletContextAttributeListeners = new CopyOnWriteArrayList<>();
private final List<ServletRequestListener> _servletRequestListeners = new CopyOnWriteArrayList<>();
private final List<ServletRequestAttributeListener> _servletRequestAttributeListeners = new CopyOnWriteArrayList<>();
private final List<ServletContextScopeListener> _contextListeners = new CopyOnWriteArrayList<>();

View File

@ -58,12 +58,12 @@ import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class AsyncServletTest
{
protected AsyncServlet _servlet = new AsyncServlet();
protected int _port;
protected Server _server = new Server();
protected ServletHandler _servletHandler;
protected ErrorPageErrorHandler _errorHandler;
@ -71,13 +71,13 @@ public class AsyncServletTest
protected List<String> _log;
protected int _expectedLogs;
protected String _expectedCode;
protected static List<String> __history = new CopyOnWriteArrayList<>();
protected static CountDownLatch __latch;
protected List<String> _history = new CopyOnWriteArrayList<>();
protected CountDownLatch _latch;
static void historyAdd(String item)
private void historyAdd(String item)
{
// System.err.println(Thread.currentThread()+" history: "+item);
__history.add(item);
_history.add(item);
}
@BeforeEach
@ -115,15 +115,15 @@ public class AsyncServletTest
_servletHandler.addServletWithMapping(holder2, "/noasync/*");
_server.start();
_port = _connector.getLocalPort();
__history.clear();
__latch = new CountDownLatch(1);
_history.clear();
_latch = new CountDownLatch(1);
context.addEventListener(new ServletChannel.Listener()
{
@Override
public void onComplete(Request request)
{
__latch.countDown();
_latch.countDown();
}
});
}
@ -141,12 +141,12 @@ public class AsyncServletTest
{
String response = process(null, null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(__history, contains(
assertThat(_history, contains(
"REQUEST /ctx/path/info",
"initial"));
assertContains("NORMAL", response);
assertFalse(__history.contains("onTimeout"));
assertFalse(__history.contains("onComplete"));
assertFalse(_history.contains("onTimeout"));
assertFalse(_history.contains("onComplete"));
}
@Test
@ -154,12 +154,12 @@ public class AsyncServletTest
{
String response = process("sleep=200", null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(__history, contains(
assertThat(_history, contains(
"REQUEST /ctx/path/info?sleep=200",
"initial"));
assertContains("SLEPT", response);
assertFalse(__history.contains("onTimeout"));
assertFalse(__history.contains("onComplete"));
assertFalse(_history.contains("onTimeout"));
assertFalse(_history.contains("onComplete"));
}
@Test
@ -167,7 +167,7 @@ public class AsyncServletTest
{
String response = process(null, null);
assertThat(response, Matchers.startsWith("HTTP/1.1 200 OK"));
assertThat(__history, contains(
assertThat(_history, contains(
"REQUEST /ctx/path/info",
"initial"));
@ -180,7 +180,7 @@ public class AsyncServletTest
_expectedCode = "200 ";
String response = process("noasync", null, null);
assertThat(response, Matchers.startsWith("HTTP/1.1 200 OK"));
assertThat(__history, contains(
assertThat(_history, contains(
"REQUEST /ctx/noasync/info",
"wrapped REQ",
"initial"
@ -192,12 +192,12 @@ public class AsyncServletTest
@Test
public void testAsyncNotSupportedAsync() throws Exception
{
try (StacklessLogging stackless = new StacklessLogging(ServletChannel.class))
try (StacklessLogging ignored = new StacklessLogging(ServletChannel.class))
{
_expectedCode = "500 ";
String response = process("noasync", "start=200", null);
assertThat(response, Matchers.startsWith("HTTP/1.1 500 "));
assertThat(__history, contains(
assertThat(_history, contains(
"REQUEST /ctx/noasync/info?start=200",
"wrapped REQ",
"initial",
@ -217,7 +217,7 @@ public class AsyncServletTest
_expectedCode = "500 ";
String response = process("start=200", null);
assertThat(response, Matchers.startsWith("HTTP/1.1 500 Server Error"));
assertThat(__history, contains(
assertThat(_history, contains(
"REQUEST /ctx/path/info?start=200",
"initial",
"start",
@ -235,7 +235,7 @@ public class AsyncServletTest
{
String response = process("start=200&timeout=dispatch", null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(__history, contains(
assertThat(_history, contains(
"REQUEST /ctx/path/info?start=200&timeout=dispatch",
"initial",
"start",
@ -255,7 +255,7 @@ public class AsyncServletTest
_expectedCode = "500 ";
String response = process("start=200&timeout=error", null);
assertThat(response, startsWith("HTTP/1.1 500 Server Error"));
assertThat(__history, contains(
assertThat(_history, contains(
"REQUEST /ctx/path/info?start=200&timeout=error",
"initial",
"start",
@ -274,7 +274,7 @@ public class AsyncServletTest
{
String response = process("start=200&timeout=complete", null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(__history, contains(
assertThat(_history, contains(
"REQUEST /ctx/path/info?start=200&timeout=complete",
"initial",
"start",
@ -290,7 +290,7 @@ public class AsyncServletTest
{
String response = process("start=200&dispatch=10", null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(__history, contains(
assertThat(_history, contains(
"REQUEST /ctx/path/info?start=200&dispatch=10",
"initial",
"start",
@ -299,7 +299,7 @@ public class AsyncServletTest
"wrapped REQ",
"!initial",
"onComplete"));
assertFalse(__history.contains("onTimeout"));
assertFalse(_history.contains("onTimeout"));
}
@Test
@ -307,7 +307,7 @@ public class AsyncServletTest
{
String response = process("start=200&dispatch=0", null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(__history, contains(
assertThat(_history, contains(
"REQUEST /ctx/path/info?start=200&dispatch=0",
"initial",
"start",
@ -324,7 +324,7 @@ public class AsyncServletTest
_expectedCode = "500 ";
String response = process("start=200&throw=1", null);
assertThat(response, startsWith("HTTP/1.1 500 Server Error"));
assertThat(__history, contains(
assertThat(_history, contains(
"REQUEST /ctx/path/info?start=200&throw=1",
"initial",
"start",
@ -341,15 +341,15 @@ public class AsyncServletTest
{
String response = process("start=200&complete=50", null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(__history, contains(
assertThat(_history, contains(
"REQUEST /ctx/path/info?start=200&complete=50",
"initial",
"start",
"complete",
"onComplete"));
assertContains("COMPLETED", response);
assertFalse(__history.contains("onTimeout"));
assertFalse(__history.contains("!initial"));
assertFalse(_history.contains("onTimeout"));
assertFalse(_history.contains("!initial"));
}
@Test
@ -357,15 +357,15 @@ public class AsyncServletTest
{
String response = process("start=200&complete=0", null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(__history, contains(
assertThat(_history, contains(
"REQUEST /ctx/path/info?start=200&complete=0",
"initial",
"start",
"complete",
"onComplete"));
assertContains("COMPLETED", response);
assertFalse(__history.contains("onTimeout"));
assertFalse(__history.contains("!initial"));
assertFalse(_history.contains("onTimeout"));
assertFalse(_history.contains("!initial"));
}
@Test
@ -373,7 +373,7 @@ public class AsyncServletTest
{
String response = process("start=1000&dispatch=10&start2=1000&dispatch2=10", null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(__history, contains(
assertThat(_history, contains(
"REQUEST /ctx/path/info?start=1000&dispatch=10&start2=1000&dispatch2=10",
"initial",
"start",
@ -396,7 +396,7 @@ public class AsyncServletTest
{
String response = process("start=1000&dispatch=10&start2=1000&complete2=10", null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(__history, contains(
assertThat(_history, contains(
"REQUEST /ctx/path/info?start=1000&dispatch=10&start2=1000&complete2=10",
"initial",
"start",
@ -417,7 +417,7 @@ public class AsyncServletTest
_expectedCode = "500 ";
String response = process("start=1000&dispatch=10&start2=10", null);
assertThat(response, startsWith("HTTP/1.1 500 Server Error"));
assertThat(__history, contains(
assertThat(_history, contains(
"REQUEST /ctx/path/info?start=1000&dispatch=10&start2=10",
"initial",
"start",
@ -440,7 +440,7 @@ public class AsyncServletTest
{
String response = process("start=10&start2=1000&dispatch2=10", null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(__history, contains(
assertThat(_history, contains(
"REQUEST /ctx/path/info?start=10&start2=1000&dispatch2=10",
"initial",
"start",
@ -463,7 +463,7 @@ public class AsyncServletTest
{
String response = process("start=10&start2=1000&complete2=10", null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(__history, contains(
assertThat(_history, contains(
"REQUEST /ctx/path/info?start=10&start2=1000&complete2=10",
"initial",
"start",
@ -485,7 +485,7 @@ public class AsyncServletTest
_errorHandler.addErrorPage(500, "/path/error");
String response = process("start=10&start2=10", null);
assertThat(__history, contains(
assertThat(_history, contains(
"REQUEST /ctx/path/info?start=10&start2=10",
"initial",
"start",
@ -508,7 +508,7 @@ public class AsyncServletTest
{
String response = process("wrap=true&start=200&dispatch=20", null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(__history, contains(
assertThat(_history, contains(
"REQUEST /ctx/path/info?wrap=true&start=200&dispatch=20",
"initial",
"start",
@ -525,7 +525,7 @@ public class AsyncServletTest
{
String response = process("start=200&dispatch=20&path=/p%20th3", null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(__history, contains(
assertThat(_history, contains(
"REQUEST /ctx/path/info?start=200&dispatch=20&path=/p%20th3",
"initial",
"start",
@ -542,7 +542,7 @@ public class AsyncServletTest
{
String response = process("fwd", "start=200&dispatch=20", null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(__history, contains(
assertThat(_history, contains(
"FWD REQUEST /ctx/fwd/info?start=200&dispatch=20",
"FORWARD /ctx/path1?forward=true",
"wrapped REQ",
@ -563,7 +563,7 @@ public class AsyncServletTest
{
String response = process("fwd", "start=200&dispatch=20&path=/path2", null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(__history, contains(
assertThat(_history, contains(
"FWD REQUEST /ctx/fwd/info?start=200&dispatch=20&path=/path2",
"FORWARD /ctx/path1?forward=true",
"wrapped REQ",
@ -582,7 +582,7 @@ public class AsyncServletTest
{
String response = process("fwd", "wrap=true&start=200&dispatch=20", null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(__history, contains(
assertThat(_history, contains(
"FWD REQUEST /ctx/fwd/info?wrap=true&start=200&dispatch=20",
"FORWARD /ctx/path1?forward=true",
"wrapped REQ",
@ -601,7 +601,7 @@ public class AsyncServletTest
{
String response = process("fwd", "wrap=true&start=200&dispatch=20&path=/path2", null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(__history, contains(
assertThat(_history, contains(
"FWD REQUEST /ctx/fwd/info?wrap=true&start=200&dispatch=20&path=/path2",
"FORWARD /ctx/path1?forward=true",
"wrapped REQ",
@ -618,11 +618,13 @@ public class AsyncServletTest
@Test
public void testAsyncRead() throws Exception
{
String header = "GET /ctx/path/info?start=2000&dispatch=1500 HTTP/1.1\r\n" +
"Host: localhost\r\n" +
"Content-Length: 10\r\n" +
"Connection: close\r\n" +
"\r\n";
String header = """
GET /ctx/path/info?start=2000&dispatch=1500 HTTP/1.1\r
Host: localhost\r
Content-Length: 10\r
Connection: close\r
\r
""";
String body = "12345678\r\n";
try (Socket socket = new Socket("localhost", _port))
@ -634,9 +636,9 @@ public class AsyncServletTest
socket.getOutputStream().write(body.getBytes(StandardCharsets.ISO_8859_1), 2, 8);
String response = IO.toString(socket.getInputStream());
__latch.await(1, TimeUnit.SECONDS);
assertTrue(_latch.await(1, TimeUnit.SECONDS));
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(__history, contains(
assertThat(_history, contains(
"REQUEST /ctx/path/info?start=2000&dispatch=1500",
"initial",
"start",
@ -678,7 +680,7 @@ public class AsyncServletTest
socket.getOutputStream().write(request.getBytes(StandardCharsets.UTF_8));
socket.getOutputStream().flush();
String response = IO.toString(socket.getInputStream());
__latch.await(1, TimeUnit.SECONDS);
assertTrue(_latch.await(1, TimeUnit.SECONDS));
return response;
}
catch (Exception e)
@ -694,12 +696,7 @@ public class AsyncServletTest
assertThat(response, Matchers.containsString(content));
}
protected void assertNotContains(String content, String response)
{
assertThat(response, Matchers.not(Matchers.containsString(content)));
}
private static class FwdServlet extends HttpServlet
private class FwdServlet extends HttpServlet
{
@Override
public void doGet(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException
@ -711,9 +708,8 @@ public class AsyncServletTest
}
}
private static class AsyncServlet extends HttpServlet
private class AsyncServlet extends HttpServlet
{
private static final long serialVersionUID = -8161977157098646562L;
private final Timer _timer = new Timer();
@Override
@ -782,29 +778,25 @@ public class AsyncServletTest
}
else if (request.getContentLength() > 0)
{
new Thread()
new Thread(() ->
{
@Override
public void run()
int c = 0;
try
{
int c = 0;
try
InputStream in = request.getInputStream();
int b = 0;
while (b != -1)
{
InputStream in = request.getInputStream();
int b = 0;
while (b != -1)
{
if ((b = in.read()) >= 0)
c++;
}
historyAdd("async-read=" + c);
}
catch (Exception e)
{
e.printStackTrace();
if ((b = in.read()) >= 0)
c++;
}
historyAdd("async-read=" + c);
}
}.start();
catch (Exception e)
{
e.printStackTrace();
}
}).start();
}
if (startFor >= 0)
@ -812,7 +804,7 @@ public class AsyncServletTest
final AsyncContext async = wrap ? request.startAsync(new HttpServletRequestWrapper(request), new HttpServletResponseWrapper(response)) : request.startAsync();
if (startFor > 0)
async.setTimeout(startFor);
async.addListener(__listener);
async.addListener(_listener);
historyAdd("start");
if ("1".equals(request.getParameter("throw")))
@ -910,7 +902,7 @@ public class AsyncServletTest
if (start2For >= 0 && request.getAttribute("2nd") == null)
{
final AsyncContext async = wrap ? request.startAsync(new HttpServletRequestWrapper(request), new HttpServletResponseWrapper(response)) : request.startAsync();
async.addListener(__listener);
async.addListener(_listener);
request.setAttribute("2nd", "cycle");
if (start2For > 0)
@ -991,7 +983,7 @@ public class AsyncServletTest
}
}
private static AsyncListener __listener = new AsyncListener()
private final AsyncListener _listener = new AsyncListener()
{
@Override
public void onTimeout(AsyncEvent event) throws IOException
@ -1004,23 +996,19 @@ public class AsyncServletTest
switch (action)
{
case "dispatch":
event.getAsyncContext().dispatch();
break;
case "complete":
case "dispatch" -> event.getAsyncContext().dispatch();
case "complete" ->
{
event.getSuppliedResponse().getOutputStream().println("COMPLETED\n");
event.getAsyncContext().complete();
break;
case "error":
throw new RuntimeException("error in onTimeout");
}
case "error" -> throw new RuntimeException("error in onTimeout");
}
}
}
@Override
public void onStartAsync(AsyncEvent event) throws IOException
public void onStartAsync(AsyncEvent event)
{
historyAdd("onStartAsync");
}
@ -1036,20 +1024,18 @@ public class AsyncServletTest
switch (action)
{
case "dispatch":
event.getAsyncContext().dispatch();
break;
case "complete":
case "dispatch" -> event.getAsyncContext().dispatch();
case "complete" ->
{
event.getSuppliedResponse().getOutputStream().println("COMPLETED\n");
event.getAsyncContext().complete();
break;
}
}
}
}
@Override
public void onComplete(AsyncEvent event) throws IOException
public void onComplete(AsyncEvent event)
{
historyAdd("onComplete");
}

View File

@ -39,6 +39,7 @@ import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.client.util.InputStreamResponseListener;
import org.eclipse.jetty.client.util.MultiPartRequestContent;
import org.eclipse.jetty.client.util.PathRequestContent;
@ -47,6 +48,7 @@ import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.MultiPart;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
@ -57,6 +59,7 @@ import org.eclipse.jetty.toolchain.test.FS;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.resource.FileSystemPool;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assumptions;
@ -183,9 +186,12 @@ public class HugeResourceTest
public void startServer() throws Exception
{
assertThat(FileSystemPool.INSTANCE.mounts(), empty());
server = new Server();
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
HttpConfiguration httpConfig = new HttpConfiguration();
ServerConnector connector = new ServerConnector(server, new HttpConnectionFactory(httpConfig));
ServerConnector connector = new ServerConnector(server, 1, 1, new HttpConnectionFactory(httpConfig));
connector.setPort(0);
server.addConnector(connector);
@ -222,7 +228,12 @@ public class HugeResourceTest
@BeforeEach
public void startClient() throws Exception
{
client = new HttpClient();
QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName("client");
ClientConnector connector = new ClientConnector();
connector.setSelectors(1);
connector.setExecutor(clientThreads);
client = new HttpClient(new HttpClientTransportOverHTTP(connector));
client.start();
}
@ -239,9 +250,9 @@ public class HugeResourceTest
URI destUri = server.getURI().resolve("/" + filename);
InputStreamResponseListener responseListener = new InputStreamResponseListener();
Request request = client.newRequest(destUri)
.method(HttpMethod.GET);
request.send(responseListener);
client.newRequest(destUri)
.method(HttpMethod.GET)
.send(responseListener);
Response response = responseListener.get(5, TimeUnit.SECONDS);
assertThat("HTTP Response Code", response.getStatus(), is(200));
@ -292,9 +303,9 @@ public class HugeResourceTest
URI destUri = server.getURI().resolve("/" + filename);
InputStreamResponseListener responseListener = new InputStreamResponseListener();
Request request = client.newRequest(destUri)
.method(HttpMethod.HEAD);
request.send(responseListener);
client.newRequest(destUri)
.method(HttpMethod.HEAD)
.send(responseListener);
Response response = responseListener.get(5, TimeUnit.SECONDS);
try (InputStream in = responseListener.getInputStream())

View File

@ -1,5 +1,4 @@
# Jetty Logging using jetty-slf4j-impl
# org.eclipse.jetty.LEVEL=DEBUG
# org.eclipse.jetty.webapp.LEVEL=DEBUG
# org.eclipse.jetty.util.LEVEL=DEBUG
# org.eclipse.jetty.util.PathWatcher.Noisy.LEVEL=OFF
#org.eclipse.jetty.LEVEL=DEBUG
#org.eclipse.jetty.webapp.LEVEL=DEBUG
#org.eclipse.jetty.util.LEVEL=DEBUG
#org.eclipse.jetty.util.PathWatcher.Noisy.LEVEL=OFF

View File

@ -25,13 +25,14 @@ public interface JettyWebSocketCreator
/**
* <p>Creates a websocket from the incoming request.</p>
*
* <p>If no websocket is to be created (return value of null), the {@link JettyWebSocketCreator}
* <p>If no websocket is to be created (return value of null), this {@code JettyWebSocketCreator}
* is responsible for sending a response with {@link JettyServerUpgradeResponse#sendError(int, String)},
* {@link JettyServerUpgradeResponse#sendForbidden(String)} or {@link JettyServerUpgradeResponse#setStatusCode(int)}.</p>
*
* @param req the request details
* @param resp the response details
* @return a websocket object to use, or null if no websocket should be created from this request.
* @throws Exception if the WebSocket creation throws
*/
Object createWebSocket(JettyServerUpgradeRequest req, JettyServerUpgradeResponse resp);
Object createWebSocket(JettyServerUpgradeRequest req, JettyServerUpgradeResponse resp) throws Exception;
}

View File

@ -29,6 +29,7 @@ import org.eclipse.jetty.ee10.websocket.server.internal.DelegatedServerUpgradeRe
import org.eclipse.jetty.ee10.websocket.server.internal.DelegatedServerUpgradeResponse;
import org.eclipse.jetty.ee10.websocket.server.internal.JettyServerFrameHandlerFactory;
import org.eclipse.jetty.ee10.websocket.servlet.WebSocketUpgradeFilter;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
@ -293,17 +294,27 @@ public abstract class JettyWebSocketServlet extends HttpServlet
}
@Override
public Object createWebSocket(ServerUpgradeRequest request, ServerUpgradeResponse response, Callback callback)
public Object createWebSocket(ServerUpgradeRequest upgradeRequest, ServerUpgradeResponse upgradeResponse, Callback callback)
{
DelegatedServerUpgradeRequest request = new DelegatedServerUpgradeRequest(upgradeRequest);
DelegatedServerUpgradeResponse response = new DelegatedServerUpgradeResponse(upgradeResponse);
try
{
Object webSocket = creator.createWebSocket(new DelegatedServerUpgradeRequest(request), new DelegatedServerUpgradeResponse(response));
Object webSocket = creator.createWebSocket(request, response);
callback.succeeded();
return webSocket;
}
catch (Throwable t)
{
callback.failed(t);
try
{
response.sendError(HttpStatus.INTERNAL_SERVER_ERROR_500, "Could not create WebSocket endpoint");
callback.succeeded();
}
catch (Throwable x)
{
callback.failed(x);
}
return null;
}
}

View File

@ -70,6 +70,11 @@ public class DelegatedServerUpgradeRequest implements JettyServerUpgradeRequest
}
}
public ServerUpgradeRequest getServerUpgradeRequest()
{
return upgradeRequest;
}
@Override
public List<HttpCookie> getCookies()
{

View File

@ -43,6 +43,7 @@ import org.eclipse.jetty.ee10.websocket.server.JettyWebSocketServlet;
import org.eclipse.jetty.ee10.websocket.server.JettyWebSocketServletFactory;
import org.eclipse.jetty.ee10.websocket.server.config.JettyWebSocketServletContainerInitializer;
import org.eclipse.jetty.ee10.websocket.server.internal.DelegatedServerUpgradeRequest;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http2.HTTP2Cipher;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.client.transport.ClientConnectionFactoryOverHTTP2;
@ -65,7 +66,6 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
@ -79,7 +79,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Disabled
public class WebSocketOverHTTP2Test
{
private Server server;
@ -385,16 +384,19 @@ public class WebSocketOverHTTP2Test
protected void configure(JettyWebSocketServletFactory factory)
{
factory.addMapping("/ws/echo", (request, response) -> new EchoSocket());
factory.addMapping("/ws/null", (request, response) -> null);
factory.addMapping("/ws/null", (request, response) ->
{
response.sendError(HttpStatus.SERVICE_UNAVAILABLE_503, "null");
return null;
});
factory.addMapping("/ws/throw", (request, response) ->
{
throw new RuntimeException("throwing from creator");
});
factory.addMapping("/ws/connectionClose", (request, response) ->
{
DelegatedServerUpgradeRequest upgradeRequest = (DelegatedServerUpgradeRequest)request.getHttpServletRequest();
Request baseRequest = (Request)upgradeRequest.getHttpServletRequest();
baseRequest.getConnectionMetaData().getConnection().getEndPoint().close();
Request coreRequest = ((DelegatedServerUpgradeRequest)request).getServerUpgradeRequest();
coreRequest.getConnectionMetaData().getConnection().getEndPoint().close();
return new EchoSocket();
});
}

View File

@ -1,4 +1,3 @@
# Jetty Logging using jetty-slf4j-impl
# org.eclipse.jetty.LEVEL=DEBUG
# org.eclipse.jetty.websocket.LEVEL=DEBUG
# org.eclipse.jetty.websocket.test.LEVEL=DEBUG

View File

@ -74,7 +74,6 @@ import static org.eclipse.jetty.util.thread.Invocable.InvocationType.NON_BLOCKIN
*/
public class HttpChannel implements Runnable, HttpOutput.Interceptor
{
public static Listener NOOP_LISTENER = new Listener() {};
private static final Logger LOG = LoggerFactory.getLogger(HttpChannel.class);
private final ContextHandler _contextHandler;
@ -114,10 +113,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
_response = new Response(this, newHttpOutput());
_executor = _connector.getServer().getThreadPool();
// TODO get real listeners from somewhere
_combinedListener = /* (connector instanceof AbstractConnector)
? ((AbstractConnector)connector).getHttpChannelListeners()
: */ NOOP_LISTENER;
_combinedListener = new HttpChannelListeners(_connector.getBeans(Listener.class));
if (LOG.isDebugEnabled())
LOG.debug("new {} -> {},{},{}",

View File

@ -25,13 +25,14 @@ public interface JettyWebSocketCreator
/**
* Create a websocket from the incoming request.
*
* <p>If no websocket is to be created (return value of null), the {@link JettyWebSocketCreator}
* <p>If no websocket is to be created (return value of null), the {@code JettyWebSocketCreator}
* is responsible for sending a response with {@link JettyServerUpgradeResponse#sendError(int, String)},
* {@link JettyServerUpgradeResponse#sendForbidden(String)} or {@link JettyServerUpgradeResponse#setStatusCode(int)}.</p>
*
* @param req the request details
* @param resp the response details
* @return a websocket object to use, or null if no websocket should be created from this request.
* @throws Exception if the WebSocket creation throws
*/
Object createWebSocket(JettyServerUpgradeRequest req, JettyServerUpgradeResponse resp);
Object createWebSocket(JettyServerUpgradeRequest req, JettyServerUpgradeResponse resp) throws Exception;
}

View File

@ -42,7 +42,7 @@ import org.eclipse.jetty.ee9.websocket.server.JettyWebSocketServerContainer;
import org.eclipse.jetty.ee9.websocket.server.JettyWebSocketServlet;
import org.eclipse.jetty.ee9.websocket.server.JettyWebSocketServletFactory;
import org.eclipse.jetty.ee9.websocket.server.config.JettyWebSocketServletContainerInitializer;
import org.eclipse.jetty.ee9.websocket.server.internal.DelegatedServerUpgradeRequest;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http2.HTTP2Cipher;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.client.transport.ClientConnectionFactoryOverHTTP2;
@ -55,17 +55,15 @@ import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.logging.StacklessLogging;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.server.internal.HttpChannelState;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
@ -79,7 +77,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Disabled
public class WebSocketOverHTTP2Test
{
private Server server;
@ -105,7 +102,7 @@ public class WebSocketOverHTTP2Test
server.addConnector(connector);
SslContextFactory.Server sslContextFactory = new SslContextFactory.Server();
sslContextFactory.setKeyStorePath("src/test/resources/keystore.p12");
sslContextFactory.setKeyStorePath(MavenTestingUtils.getTargetPath("test-classes/keystore.p12").toString());
sslContextFactory.setKeyStorePassword("storepwd");
sslContextFactory.setCipherComparator(HTTP2Cipher.COMPARATOR);
@ -314,7 +311,7 @@ public class WebSocketOverHTTP2Test
URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + "/ws/throw");
ExecutionException failure;
try (StacklessLogging ignored = new StacklessLogging(HttpChannelState.class))
try (StacklessLogging ignored = new StacklessLogging(HttpChannel.class))
{
failure = Assertions.assertThrows(ExecutionException.class, () ->
wsClient.connect(wsEndPoint, uri).get(5, TimeUnit.SECONDS));
@ -385,16 +382,19 @@ public class WebSocketOverHTTP2Test
protected void configure(JettyWebSocketServletFactory factory)
{
factory.addMapping("/ws/echo", (request, response) -> new EchoSocket());
factory.addMapping("/ws/null", (request, response) -> null);
factory.addMapping("/ws/null", (request, response) ->
{
response.sendError(HttpStatus.SERVICE_UNAVAILABLE_503, "null");
return null;
});
factory.addMapping("/ws/throw", (request, response) ->
{
throw new RuntimeException("throwing from creator");
});
factory.addMapping("/ws/connectionClose", (request, response) ->
{
DelegatedServerUpgradeRequest upgradeRequest = (DelegatedServerUpgradeRequest)request.getHttpServletRequest();
Request baseRequest = (Request)upgradeRequest.getHttpServletRequest();
baseRequest.getConnectionMetaData().getConnection().getEndPoint().close();
HttpServletRequest upgradeRequest = request.getHttpServletRequest();
((org.eclipse.jetty.ee9.nested.Request)upgradeRequest).getHttpChannel().getEndPoint().close();
return new EchoSocket();
});
}

10
pom.xml
View File

@ -1246,11 +1246,6 @@
<artifactId>jetty-io</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-nosql</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-jmx</artifactId>
@ -1266,6 +1261,11 @@
<artifactId>jetty-keystore</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-nosql</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-proxy</artifactId>