Fixed tests and code after merge of #5310 from 9.4.x.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2020-11-27 13:53:06 +01:00
parent 963ea59e75
commit 8c465ac76d
3 changed files with 25 additions and 46 deletions

View File

@ -611,20 +611,9 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
* allocated stream id, or null if not interested in the modified headers frame * allocated stream id, or null if not interested in the modified headers frame
* @param listener the listener that gets notified of stream events * @param listener the listener that gets notified of stream events
*/ */
public void newUpgradeStream(HeadersFrame frame, Stream.Listener listener, Promise<Stream> promise) public Stream newUpgradeStream(HeadersFrame frame, Stream.Listener listener, Consumer<Throwable> failFn)
{ {
streamsState.newUpgradeStream(frame, listener, promise); return streamsState.newUpgradeStream(frame, listener, failFn);
/*
// TODO: cannot do this, we need to call StreamsState.
HeadersFrame frame = frame;
int streamId = frame.getStreamId();
if (streamId <= 0)
{
streamId = localStreamIds.getAndAdd(2);
frame = frame.withStreamId(streamId);
}
return createLocalStream(streamId, (MetaData.Request)frame.getMetaData());
*/
} }
protected IStream newStream(int streamId, MetaData.Request request, boolean local) protected IStream newStream(int streamId, MetaData.Request request, boolean local)
@ -790,7 +779,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
} }
} }
protected IStream createLocalStream(int streamId, MetaData.Request request, Promise<Stream> promise) protected IStream createLocalStream(int streamId, MetaData.Request request, Consumer<Throwable> failFn)
{ {
while (true) while (true)
{ {
@ -798,7 +787,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
int maxCount = getMaxLocalStreams(); int maxCount = getMaxLocalStreams();
if (maxCount >= 0 && localCount >= maxCount) if (maxCount >= 0 && localCount >= maxCount)
{ {
promise.failed(new IllegalStateException("Max local stream count " + maxCount + " exceeded")); failFn.accept(new IllegalStateException("Max local stream count " + maxCount + " exceeded"));
return null; return null;
} }
if (localStreamCount.compareAndSet(localCount, localCount + 1)) if (localStreamCount.compareAndSet(localCount, localCount + 1))
@ -817,7 +806,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
else else
{ {
localStreamCount.decrementAndGet(); localStreamCount.decrementAndGet();
promise.failed(new IllegalStateException("Duplicate stream " + streamId)); failFn.accept(new IllegalStateException("Duplicate stream " + streamId));
return null; return null;
} }
} }
@ -2110,7 +2099,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
} }
} }
private void newUpgradeStream(HeadersFrame frame, Stream.Listener listener, Promise<Stream> promise) private Stream newUpgradeStream(HeadersFrame frame, Stream.Listener listener, Consumer<Throwable> failFn)
{ {
int streamId; int streamId;
try (AutoLock l = lock.lock()) try (AutoLock l = lock.lock())
@ -2118,20 +2107,17 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
streamId = localStreamIds.getAndAdd(2); streamId = localStreamIds.getAndAdd(2);
HTTP2Session.this.onStreamCreated(streamId); HTTP2Session.this.onStreamCreated(streamId);
} }
IStream stream = HTTP2Session.this.createLocalStream(streamId, (MetaData.Request)frame.getMetaData(), new Promise<>() IStream stream = HTTP2Session.this.createLocalStream(streamId, (MetaData.Request)frame.getMetaData(), x ->
{ {
@Override HTTP2Session.this.onStreamDestroyed(streamId);
public void failed(Throwable x) failFn.accept(x);
{
HTTP2Session.this.onStreamDestroyed(streamId);
promise.failed(x);
}
}); });
if (stream != null) if (stream != null)
{ {
stream.setListener(listener); stream.setListener(listener);
stream.updateClose(frame.isEndStream(), CloseState.Event.AFTER_SEND); stream.updateClose(frame.isEndStream(), CloseState.Event.AFTER_SEND);
} }
return stream;
} }
private boolean newRemoteStream(int streamId) private boolean newRemoteStream(int streamId)
@ -2180,7 +2166,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
MetaData.Request request = extractMetaDataRequest(frames.get(0)); MetaData.Request request = extractMetaDataRequest(frames.get(0));
if (request == null) if (request == null)
return false; return false;
IStream stream = HTTP2Session.this.createLocalStream(streamId, request, promise); IStream stream = HTTP2Session.this.createLocalStream(streamId, request, promise::failed);
if (stream == null) if (stream == null)
return false; return false;

View File

@ -46,7 +46,6 @@ import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.Sweeper; import org.eclipse.jetty.util.thread.Sweeper;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -115,27 +114,21 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
MetaData.Request metaData = new MetaData.Request(request.getMethod(), HttpURI.from(request.getURI()), HttpVersion.HTTP_2, request.getHeaders()); MetaData.Request metaData = new MetaData.Request(request.getMethod(), HttpURI.from(request.getURI()), HttpVersion.HTTP_2, request.getHeaders());
// We do not support upgrade requests with content, so endStream=true. // We do not support upgrade requests with content, so endStream=true.
HeadersFrame frame = new HeadersFrame(metaData, null, true); HeadersFrame frame = new HeadersFrame(metaData, null, true);
((HTTP2Session)session).newUpgradeStream(frame, http2Channel.getStreamListener(), new Promise<>() Stream stream = ((HTTP2Session)session).newUpgradeStream(frame, http2Channel.getStreamListener(), failure ->
{ {
@Override newExchange.requestComplete(failure);
public void succeeded(Stream stream) newExchange.terminateRequest();
{ if (LOG.isDebugEnabled())
http2Channel.setStream(stream); LOG.debug("Upgrade failed for {}", HttpConnectionOverHTTP2.this);
newExchange.requestComplete(null);
newExchange.terminateRequest();
if (LOG.isDebugEnabled())
LOG.debug("Upgrade succeeded for {}", HttpConnectionOverHTTP2.this);
}
@Override
public void failed(Throwable failure)
{
newExchange.requestComplete(failure);
newExchange.terminateRequest();
if (LOG.isDebugEnabled())
LOG.debug("Upgrade failed for {}", HttpConnectionOverHTTP2.this);
}
}); });
if (stream != null)
{
http2Channel.setStream(stream);
newExchange.requestComplete(null);
newExchange.terminateRequest();
if (LOG.isDebugEnabled())
LOG.debug("Upgrade succeeded for {}", HttpConnectionOverHTTP2.this);
}
} }
@Override @Override

View File

@ -718,7 +718,7 @@ public class HttpClientTransportDynamicTest
@Override @Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
{ {
jettyRequest.getHttpChannel().getEndPoint().getConnection().close(); jettyRequest.getHttpChannel().getEndPoint().close();
} }
}); });
ClientConnector clientConnector = new ClientConnector(); ClientConnector clientConnector = new ClientConnector();