Merged branch 'jetty-9.4.x' into 'master'.

This commit is contained in:
Simone Bordet 2017-12-11 11:50:49 +01:00
commit a116fd05a2
7 changed files with 186 additions and 45 deletions

View File

@ -46,6 +46,8 @@ import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode; import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.FlowControlStrategy; import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.HTTP2Flusher;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.ISession; import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.IStream; import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Session;
@ -54,6 +56,7 @@ import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory; import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.server.HttpChannel; import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConfiguration;
@ -631,6 +634,55 @@ public class StreamResetTest extends AbstractTest
Assert.assertTrue(writeLatch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(writeLatch.await(5, TimeUnit.SECONDS));
} }
@Test
public void testResetAfterBlockingWrite() throws Exception
{
int windowSize = FlowControlStrategy.DEFAULT_WINDOW_SIZE;
CountDownLatch writeLatch = new CountDownLatch(1);
start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
try
{
ServletOutputStream output = response.getOutputStream();
output.write(new byte[10 * windowSize]);
}
catch (IOException e)
{
writeLatch.countDown();
}
}
});
AtomicLong received = new AtomicLong();
CountDownLatch latch = new CountDownLatch(1);
Session client = newClient(new Session.Listener.Adapter());
MetaData.Request request = newRequest("GET", new HttpFields());
HeadersFrame frame = new HeadersFrame(request, null, true);
FuturePromise<Stream> promise = new FuturePromise<>();
client.newStream(frame, promise, new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
if (received.addAndGet(frame.getData().remaining()) == windowSize)
latch.countDown();
}
});
Stream stream = promise.get(5, TimeUnit.SECONDS);
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
// Reset.
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
Assert.assertTrue(writeLatch.await(5, TimeUnit.SECONDS));
HTTP2Session session = connector.getConnectionFactory(AbstractHTTP2ServerConnectionFactory.class).getBean(HTTP2Session.class);
HTTP2Flusher flusher = session.getBean(HTTP2Flusher.class);
Assert.assertEquals(0, flusher.getFrameQueueSize());
}
@Test @Test
public void testResetAfterAsyncRequestAsyncWriteStalledByFlowControl() throws Exception public void testResetAfterAsyncRequestAsyncWriteStalledByFlowControl() throws Exception
{ {

View File

@ -74,6 +74,7 @@ public abstract class AbstractFlowControlStrategy implements FlowControlStrategy
@Override @Override
public void onStreamDestroyed(IStream stream) public void onStreamDestroyed(IStream stream)
{ {
streamsStalls.remove(stream);
} }
@Override @Override

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.http2; package org.eclipse.jetty.http2;
import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
@ -31,10 +32,12 @@ import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
public class HTTP2Flusher extends IteratingCallback public class HTTP2Flusher extends IteratingCallback implements Dumpable
{ {
private static final Logger LOG = Log.getLogger(HTTP2Flusher.class); private static final Logger LOG = Log.getLogger(HTTP2Flusher.class);
@ -105,7 +108,15 @@ public class HTTP2Flusher extends IteratingCallback
return false; return false;
} }
public int getQueueSize() private int getWindowQueueSize()
{
synchronized (this)
{
return windows.size();
}
}
public int getFrameQueueSize()
{ {
synchronized (this) synchronized (this)
{ {
@ -130,8 +141,6 @@ public class HTTP2Flusher extends IteratingCallback
entry.perform(); entry.perform();
} }
if (!frames.isEmpty())
{
for (Entry entry : frames) for (Entry entry : frames)
{ {
entries.offer(entry); entries.offer(entry);
@ -139,7 +148,6 @@ public class HTTP2Flusher extends IteratingCallback
} }
frames.clear(); frames.clear();
} }
}
if (entries.isEmpty()) if (entries.isEmpty())
@ -155,11 +163,11 @@ public class HTTP2Flusher extends IteratingCallback
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Processing {}", entry); LOG.debug("Processing {}", entry);
// If the stream has been reset, don't send the frame. // If the stream has been reset or removed, don't send the frame.
if (entry.reset()) if (entry.isStale())
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Resetting {}", entry); LOG.debug("Stale {}", entry);
continue; continue;
} }
@ -291,11 +299,32 @@ public class HTTP2Flusher extends IteratingCallback
entry.failed(failure); entry.failed(failure);
} }
@Override
public String dump()
{
return ContainerLifeCycle.dump(this);
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
out.append(toString()).append(System.lineSeparator());
}
@Override
public String toString()
{
return String.format("%s[window_queue=%d,frame_queue=%d,actives=%d]",
super.toString(),
getWindowQueueSize(),
getFrameQueueSize(),
actives.size());
}
public static abstract class Entry extends Callback.Nested public static abstract class Entry extends Callback.Nested
{ {
protected final Frame frame; protected final Frame frame;
protected final IStream stream; protected final IStream stream;
private boolean reset;
protected Entry(Frame frame, IStream stream, Callback callback) protected Entry(Frame frame, IStream stream, Callback callback)
{ {
@ -313,7 +342,7 @@ public class HTTP2Flusher extends IteratingCallback
private void complete() private void complete()
{ {
if (reset) if (isStale())
failed(new EofException("reset")); failed(new EofException("reset"));
else else
succeeded(); succeeded();
@ -330,23 +359,31 @@ public class HTTP2Flusher extends IteratingCallback
super.failed(x); super.failed(x);
} }
private boolean reset() private boolean isStale()
{ {
return this.reset = stream != null && stream.isReset() && !isProtocol(); return !isProtocol() && stream != null && stream.isReset();
} }
private boolean isProtocol() private boolean isProtocol()
{ {
switch (frame.getType()) switch (frame.getType())
{ {
case DATA:
case HEADERS:
case PUSH_PROMISE:
case CONTINUATION:
return false;
case PRIORITY: case PRIORITY:
case RST_STREAM: case RST_STREAM:
case SETTINGS:
case PING:
case GO_AWAY: case GO_AWAY:
case WINDOW_UPDATE: case WINDOW_UPDATE:
case PREFACE:
case DISCONNECT: case DISCONNECT:
return true; return true;
default: default:
return false; throw new IllegalStateException();
} }
} }

View File

@ -59,6 +59,7 @@ import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.DumpableCollection;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
@ -106,13 +107,8 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
this.recvWindow.set(FlowControlStrategy.DEFAULT_WINDOW_SIZE); this.recvWindow.set(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
this.pushEnabled = true; // SPEC: by default, push is enabled. this.pushEnabled = true; // SPEC: by default, push is enabled.
this.idleTime = System.nanoTime(); this.idleTime = System.nanoTime();
}
@Override
protected void doStart() throws Exception
{
addBean(flowControl); addBean(flowControl);
super.doStart(); addBean(flusher);
} }
@Override @Override
@ -286,7 +282,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
IStream stream = getStream(frame.getStreamId()); IStream stream = getStream(frame.getStreamId());
if (stream != null) if (stream != null)
stream.process(frame, Callback.NOOP); stream.process(frame, new ResetCallback());
else else
notifyReset(this, frame); notifyReset(this, frame);
} }
@ -547,7 +543,6 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
flusher.iterate(); flusher.iterate();
} }
@Override @Override
public void settings(SettingsFrame frame, Callback callback) public void settings(SettingsFrame frame, Callback callback)
{ {
@ -758,8 +753,6 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
IStream removed = streams.remove(stream.getId()); IStream removed = streams.remove(stream.getId());
if (removed != null) if (removed != null)
{ {
assert removed == stream;
boolean local = stream.isLocal(); boolean local = stream.isLocal();
if (local) if (local)
localStreamCount.decrementAndGet(); localStreamCount.decrementAndGet();
@ -1115,15 +1108,21 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
} }
} }
@Override
public void dump(Appendable out, String indent) throws IOException
{
super.dump(out, indent);
dump(out, indent, Collections.singleton(new DumpableCollection("streams", streams.values())));
}
@Override @Override
public String toString() public String toString()
{ {
return String.format("%s@%x{l:%s <-> r:%s,queueSize=%d,sendWindow=%s,recvWindow=%s,streams=%d,%s}", return String.format("%s@%x{l:%s <-> r:%s,sendWindow=%s,recvWindow=%s,streams=%d,%s}",
getClass().getSimpleName(), getClass().getSimpleName(),
hashCode(), hashCode(),
getEndPoint().getLocalAddress(), getEndPoint().getLocalAddress(),
getEndPoint().getRemoteAddress(), getEndPoint().getRemoteAddress(),
flusher.getQueueSize(),
sendWindow, sendWindow,
recvWindow, recvWindow,
streams.size(), streams.size(),
@ -1328,6 +1327,32 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
} }
} }
private class ResetCallback implements Callback
{
@Override
public void succeeded()
{
complete();
}
@Override
public void failed(Throwable x)
{
complete();
}
@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
private void complete()
{
flusher.iterate();
}
}
private class CloseCallback implements Callback private class CloseCallback implements Callback
{ {
private final int error; private final int error;

View File

@ -37,11 +37,13 @@ import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.io.IdleTimeout; import org.eclipse.jetty.io.IdleTimeout;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
public class HTTP2Stream extends IdleTimeout implements IStream, Callback public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpable
{ {
private static final Logger LOG = Log.getLogger(HTTP2Stream.class); private static final Logger LOG = Log.getLogger(HTTP2Stream.class);
@ -86,8 +88,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback
@Override @Override
public void headers(HeadersFrame frame, Callback callback) public void headers(HeadersFrame frame, Callback callback)
{ {
if (!startWrite(callback)) if (startWrite(callback))
return;
session.frames(this, this, frame, Frame.EMPTY_ARRAY); session.frames(this, this, frame, Frame.EMPTY_ARRAY);
} }
@ -100,8 +101,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback
@Override @Override
public void data(DataFrame frame, Callback callback) public void data(DataFrame frame, Callback callback)
{ {
if (!startWrite(callback)) if (startWrite(callback))
return;
session.data(this, this, frame); session.data(this, this, frame);
} }
@ -290,8 +290,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback
remoteReset = true; remoteReset = true;
close(); close();
session.removeStream(this); session.removeStream(this);
callback.succeeded(); notifyReset(this, frame, callback);
notifyReset(this, frame);
} }
private void onPush(PushPromiseFrame frame, Callback callback) private void onPush(PushPromiseFrame frame, Callback callback)
@ -375,7 +374,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback
@Override @Override
public void close() public void close()
{ {
closeState.set(CloseState.CLOSED); if (closeState.getAndSet(CloseState.CLOSED) != CloseState.CLOSED)
onClose(); onClose();
} }
@ -415,14 +414,14 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback
} }
} }
private void notifyReset(Stream stream, ResetFrame frame) private void notifyReset(Stream stream, ResetFrame frame, Callback callback)
{ {
final Listener listener = this.listener; final Listener listener = this.listener;
if (listener == null) if (listener == null)
return; return;
try try
{ {
listener.onReset(stream, frame); listener.onReset(stream, frame, callback);
} }
catch (Throwable x) catch (Throwable x)
{ {
@ -446,6 +445,18 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback
} }
} }
@Override
public String dump()
{
return ContainerLifeCycle.dump(this);
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
out.append(toString()).append(System.lineSeparator());
}
@Override @Override
public String toString() public String toString()
{ {

View File

@ -163,6 +163,19 @@ public interface Stream
*/ */
public void onData(Stream stream, DataFrame frame, Callback callback); public void onData(Stream stream, DataFrame frame, Callback callback);
public default void onReset(Stream stream, ResetFrame frame, Callback callback)
{
try
{
onReset(stream, frame);
callback.succeeded();
}
catch (Throwable x)
{
callback.failed(x);
}
}
/** /**
* <p>Callback method invoked when a RST_STREAM frame has been received for this stream.</p> * <p>Callback method invoked when a RST_STREAM frame has been received for this stream.</p>
* *
@ -170,7 +183,9 @@ public interface Stream
* @param frame the RST_FRAME received * @param frame the RST_FRAME received
* @see Session.Listener#onReset(Session, ResetFrame) * @see Session.Listener#onReset(Session, ResetFrame)
*/ */
public void onReset(Stream stream, ResetFrame frame); public default void onReset(Stream stream, ResetFrame frame)
{
}
/** /**
* <p>Callback method invoked when the stream exceeds its idle timeout.</p> * <p>Callback method invoked when the stream exceeds its idle timeout.</p>

View File

@ -163,12 +163,12 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
} }
@Override @Override
public void onReset(Stream stream, ResetFrame frame) public void onReset(Stream stream, ResetFrame frame, Callback callback)
{ {
ErrorCode error = ErrorCode.from(frame.getError()); ErrorCode error = ErrorCode.from(frame.getError());
if (error == null) if (error == null)
error = ErrorCode.CANCEL_STREAM_ERROR; error = ErrorCode.CANCEL_STREAM_ERROR;
getConnection().onStreamFailure((IStream)stream, new EofException("HTTP/2 " + error), Callback.NOOP); getConnection().onStreamFailure((IStream)stream, new EofException("HTTP/2 " + error), callback);
} }
@Override @Override