From d0f04a8eae8bbb550b6f89a888392dde0d5d54b5 Mon Sep 17 00:00:00 2001 From: Ludovic Orban Date: Mon, 9 Mar 2020 12:21:06 +0100 Subject: [PATCH] addContent now returns void (Milestone 4) Signed-off-by: Ludovic Orban --- .../http2/server/HttpChannelOverHTTP2.java | 122 +++++++++++++++--- .../http2/server/HttpInputOverHTTP2.java | 30 +---- .../jetty/server/AbstractHttpInput.java | 6 +- .../jetty/server/AbstractLockedHttpInput.java | 4 +- .../org/eclipse/jetty/server/HttpChannel.java | 4 +- .../jetty/server/HttpChannelOverHttp.java | 5 +- .../org/eclipse/jetty/server/HttpInput.java | 3 +- .../jetty/server/HttpInputOverHTTP.java | 3 +- 8 files changed, 115 insertions(+), 62 deletions(-) diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java index d6586c99358..934c9f81c0b 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java @@ -58,6 +58,64 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ private boolean _expect100Continue; private boolean _delayedUntilContent; private boolean _useOutputDirectByteBuffers; + private final RequestContent _requestContent = new RequestContent(); + + private class RequestContent + { + private final ThreadLocal _syncFetchContentTl = new ThreadLocal<>(); + private HttpInput.Content _content; + private boolean _endStream; + private boolean _producing; + + public void demand() + { + if (!_producing) + { + _producing = true; + _syncFetchContentTl.set(Boolean.TRUE); + try + { + getStream().demand(1); + } + finally + { + _syncFetchContentTl.remove(); + } + } + } + + public void offerContent(HttpInput.Content content) + { + if (_content != null) + throw new AssertionError("content cannot be queued"); + _content = content; + _producing = false; + } + + public HttpInput.Content takeContent() + { + HttpInput.Content contentCopy = _content; + _content = null; + return contentCopy; + } + + public void reachedEndOfStream(boolean endStream) + { + _endStream = endStream; + } + + public boolean hasReachedEndOfStream() + { + boolean copy = _endStream; + _endStream = false; + return copy; + } + + private boolean isFetchingContent() + { + return !Boolean.TRUE.equals(_syncFetchContentTl.get()); + } + } public HttpChannelOverHTTP2(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransportOverHTTP2 transport) { @@ -257,7 +315,21 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ ByteBuffer buffer = frame.getData(); int length = buffer.remaining(); - boolean handle = onContent(new HttpInput.Content(buffer) + + _requestContent.reachedEndOfStream(frame.isEndStream()); + + if (LOG.isDebugEnabled()) + { + LOG.debug("HTTP2 Request #{}/{}: {} bytes of content", + stream.getId(), + Integer.toHexString(stream.getSession().hashCode()), + length); + } + + boolean wasDelayed = _delayedUntilContent; + _delayedUntilContent = false; + + _requestContent.offerContent(new HttpInput.Content(buffer) { @Override public void succeeded() @@ -277,28 +349,42 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ return callback.getInvocationType(); } }); - - boolean endStream = frame.isEndStream(); - if (endStream) + if (getState().isAsync()) { - boolean handleContent = onContentComplete(); - boolean handleRequest = onRequestComplete(); - handle |= handleContent | handleRequest; + boolean handle = _requestContent.isFetchingContent() && getState().onReadPossible(); + return handle || wasDelayed ? this : null; + } + else + { + getRequest().getHttpInput().unblock(); + return wasDelayed ? this : null; + } + } + + void fetchContent() + { + // HttpInputOverHttp2 calls this method via produceRawContent; + // this is the equivalent of Http1 parseAndFill(). + + HttpInput.Content content = _requestContent.takeContent(); + if (content != null) + { + onContent(content); + } + else + { + _requestContent.demand(); + // If content was produced synchronously, consume it right away. + content = _requestContent.takeContent(); + if (content != null) + onContent(content); } - if (LOG.isDebugEnabled()) + if (_requestContent.hasReachedEndOfStream()) { - LOG.debug("HTTP2 Request #{}/{}: {} bytes of {} content, handle: {}", - stream.getId(), - Integer.toHexString(stream.getSession().hashCode()), - length, - endStream ? "last" : "some", - handle); + onContentComplete(); + onRequestComplete(); } - - boolean wasDelayed = _delayedUntilContent; - _delayedUntilContent = false; - return handle || wasDelayed ? this : null; } @Override diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpInputOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpInputOverHTTP2.java index 1e15b6fb803..0093ef2eda5 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpInputOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpInputOverHTTP2.java @@ -20,46 +20,18 @@ package org.eclipse.jetty.http2.server; import org.eclipse.jetty.server.AbstractLockedHttpInput; import org.eclipse.jetty.server.HttpChannelState; -import org.eclipse.jetty.util.thread.AutoLock; public class HttpInputOverHTTP2 extends AbstractLockedHttpInput { - private boolean _producing; - public HttpInputOverHTTP2(HttpChannelState state) { super(state); } - @Override - public void recycle() - { - try (AutoLock lock = _contentLock.lock()) - { - super.recycle(); - _producing = false; - } - } - - @Override - public boolean addContent(Content content) - { - try (AutoLock lock = _contentLock.lock()) - { - boolean b = super.addContent(content); - _producing = false; - return b; - } - } - @Override protected void produceRawContent() { - if (!_producing) - { - _producing = true; - ((HttpChannelOverHTTP2)_channelState.getHttpChannel()).getStream().demand(1); - } + ((HttpChannelOverHTTP2)_channelState.getHttpChannel()).fetchContent(); } @Override diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractHttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractHttpInput.java index 39b3c33387a..c3a5ad1faba 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractHttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractHttpInput.java @@ -95,7 +95,7 @@ public abstract class AbstractHttpInput extends HttpInput } @Override - public boolean addContent(Content content) + public void addContent(Content content) { if (LOG.isDebugEnabled()) LOG.debug("addContent {} {}", content, _contentProducer); @@ -107,9 +107,7 @@ public abstract class AbstractHttpInput extends HttpInput } _contentProducer.addContent(content); if (isAsync()) - return _channelState.onContentAdded(); - unblock(); - return false; + _channelState.onContentAdded(); } @Override diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractLockedHttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractLockedHttpInput.java index fb11f83687b..b97b61230d7 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractLockedHttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractLockedHttpInput.java @@ -78,11 +78,11 @@ public abstract class AbstractLockedHttpInput extends AbstractHttpInput } @Override - public boolean addContent(Content content) + public void addContent(Content content) { try (AutoLock lock = _contentLock.lock()) { - return super.addContent(content); + super.addContent(content); } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index e9787547db3..000e103232d 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -698,12 +698,12 @@ public abstract class HttpChannel implements Runnable, HttpOutput.Interceptor request.getFields()); } - public boolean onContent(HttpInput.Content content) + public void onContent(HttpInput.Content content) { if (LOG.isDebugEnabled()) LOG.debug("onContent {} {}", this, content); _combinedListener.onRequestContent(_request, content.getByteBuffer()); - return _request.getHttpInput().addContent(content); + _request.getHttpInput().addContent(content); } public boolean onContentComplete() diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelOverHttp.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelOverHttp.java index 3dfd130306d..1f252fbce95 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelOverHttp.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelOverHttp.java @@ -98,10 +98,9 @@ public class HttpChannelOverHttp extends HttpChannel implements HttpParser.Reque @Override public boolean content(ByteBuffer content) { - HttpInput.Content c = _httpConnection.newContent(content); - boolean handle = onContent(c) || _delayedForContent; + onContent(_httpConnection.newContent(content)); _delayedForContent = false; - return handle; + return true; } @Override diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index ac3bd8444ab..28db165796b 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -67,9 +67,8 @@ public abstract class HttpInput extends ServletInputStream implements Runnable * Adds some content to this input stream. * * @param content the content to add - * @return true if content channel woken for read */ - public abstract boolean addContent(Content content); + public abstract void addContent(Content content); public abstract boolean hasContent(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java index 3e1d79cfd12..f7cd9fdca87 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java @@ -27,10 +27,9 @@ public class HttpInputOverHTTP extends AbstractHttpInput } @Override - public boolean addContent(Content content) + public void addContent(Content content) { super.addContent(content); - return true; } @Override