addContent now returns void (Milestone 4)

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2020-03-09 12:21:06 +01:00
parent 982ddc0c48
commit d0f04a8eae
8 changed files with 115 additions and 62 deletions

View File

@ -58,6 +58,64 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ
private boolean _expect100Continue;
private boolean _delayedUntilContent;
private boolean _useOutputDirectByteBuffers;
private final RequestContent _requestContent = new RequestContent();
private class RequestContent
{
private final ThreadLocal<Boolean> _syncFetchContentTl = new ThreadLocal<>();
private HttpInput.Content _content;
private boolean _endStream;
private boolean _producing;
public void demand()
{
if (!_producing)
{
_producing = true;
_syncFetchContentTl.set(Boolean.TRUE);
try
{
getStream().demand(1);
}
finally
{
_syncFetchContentTl.remove();
}
}
}
public void offerContent(HttpInput.Content content)
{
if (_content != null)
throw new AssertionError("content cannot be queued");
_content = content;
_producing = false;
}
public HttpInput.Content takeContent()
{
HttpInput.Content contentCopy = _content;
_content = null;
return contentCopy;
}
public void reachedEndOfStream(boolean endStream)
{
_endStream = endStream;
}
public boolean hasReachedEndOfStream()
{
boolean copy = _endStream;
_endStream = false;
return copy;
}
private boolean isFetchingContent()
{
return !Boolean.TRUE.equals(_syncFetchContentTl.get());
}
}
public HttpChannelOverHTTP2(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransportOverHTTP2 transport)
{
@ -257,7 +315,21 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ
ByteBuffer buffer = frame.getData();
int length = buffer.remaining();
boolean handle = onContent(new HttpInput.Content(buffer)
_requestContent.reachedEndOfStream(frame.isEndStream());
if (LOG.isDebugEnabled())
{
LOG.debug("HTTP2 Request #{}/{}: {} bytes of content",
stream.getId(),
Integer.toHexString(stream.getSession().hashCode()),
length);
}
boolean wasDelayed = _delayedUntilContent;
_delayedUntilContent = false;
_requestContent.offerContent(new HttpInput.Content(buffer)
{
@Override
public void succeeded()
@ -277,28 +349,42 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ
return callback.getInvocationType();
}
});
boolean endStream = frame.isEndStream();
if (endStream)
if (getState().isAsync())
{
boolean handleContent = onContentComplete();
boolean handleRequest = onRequestComplete();
handle |= handleContent | handleRequest;
boolean handle = _requestContent.isFetchingContent() && getState().onReadPossible();
return handle || wasDelayed ? this : null;
}
else
{
getRequest().getHttpInput().unblock();
return wasDelayed ? this : null;
}
}
void fetchContent()
{
// HttpInputOverHttp2 calls this method via produceRawContent;
// this is the equivalent of Http1 parseAndFill().
HttpInput.Content content = _requestContent.takeContent();
if (content != null)
{
onContent(content);
}
else
{
_requestContent.demand();
// If content was produced synchronously, consume it right away.
content = _requestContent.takeContent();
if (content != null)
onContent(content);
}
if (LOG.isDebugEnabled())
if (_requestContent.hasReachedEndOfStream())
{
LOG.debug("HTTP2 Request #{}/{}: {} bytes of {} content, handle: {}",
stream.getId(),
Integer.toHexString(stream.getSession().hashCode()),
length,
endStream ? "last" : "some",
handle);
onContentComplete();
onRequestComplete();
}
boolean wasDelayed = _delayedUntilContent;
_delayedUntilContent = false;
return handle || wasDelayed ? this : null;
}
@Override

View File

@ -20,46 +20,18 @@ package org.eclipse.jetty.http2.server;
import org.eclipse.jetty.server.AbstractLockedHttpInput;
import org.eclipse.jetty.server.HttpChannelState;
import org.eclipse.jetty.util.thread.AutoLock;
public class HttpInputOverHTTP2 extends AbstractLockedHttpInput
{
private boolean _producing;
public HttpInputOverHTTP2(HttpChannelState state)
{
super(state);
}
@Override
public void recycle()
{
try (AutoLock lock = _contentLock.lock())
{
super.recycle();
_producing = false;
}
}
@Override
public boolean addContent(Content content)
{
try (AutoLock lock = _contentLock.lock())
{
boolean b = super.addContent(content);
_producing = false;
return b;
}
}
@Override
protected void produceRawContent()
{
if (!_producing)
{
_producing = true;
((HttpChannelOverHTTP2)_channelState.getHttpChannel()).getStream().demand(1);
}
((HttpChannelOverHTTP2)_channelState.getHttpChannel()).fetchContent();
}
@Override

View File

@ -95,7 +95,7 @@ public abstract class AbstractHttpInput extends HttpInput
}
@Override
public boolean addContent(Content content)
public void addContent(Content content)
{
if (LOG.isDebugEnabled())
LOG.debug("addContent {} {}", content, _contentProducer);
@ -107,9 +107,7 @@ public abstract class AbstractHttpInput extends HttpInput
}
_contentProducer.addContent(content);
if (isAsync())
return _channelState.onContentAdded();
unblock();
return false;
_channelState.onContentAdded();
}
@Override

View File

@ -78,11 +78,11 @@ public abstract class AbstractLockedHttpInput extends AbstractHttpInput
}
@Override
public boolean addContent(Content content)
public void addContent(Content content)
{
try (AutoLock lock = _contentLock.lock())
{
return super.addContent(content);
super.addContent(content);
}
}

View File

@ -698,12 +698,12 @@ public abstract class HttpChannel implements Runnable, HttpOutput.Interceptor
request.getFields());
}
public boolean onContent(HttpInput.Content content)
public void onContent(HttpInput.Content content)
{
if (LOG.isDebugEnabled())
LOG.debug("onContent {} {}", this, content);
_combinedListener.onRequestContent(_request, content.getByteBuffer());
return _request.getHttpInput().addContent(content);
_request.getHttpInput().addContent(content);
}
public boolean onContentComplete()

View File

@ -98,10 +98,9 @@ public class HttpChannelOverHttp extends HttpChannel implements HttpParser.Reque
@Override
public boolean content(ByteBuffer content)
{
HttpInput.Content c = _httpConnection.newContent(content);
boolean handle = onContent(c) || _delayedForContent;
onContent(_httpConnection.newContent(content));
_delayedForContent = false;
return handle;
return true;
}
@Override

View File

@ -67,9 +67,8 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
* Adds some content to this input stream.
*
* @param content the content to add
* @return true if content channel woken for read
*/
public abstract boolean addContent(Content content);
public abstract void addContent(Content content);
public abstract boolean hasContent();

View File

@ -27,10 +27,9 @@ public class HttpInputOverHTTP extends AbstractHttpInput
}
@Override
public boolean addContent(Content content)
public void addContent(Content content)
{
super.addContent(content);
return true;
}
@Override