Rewritten close workflow to make sure that connections are correctly closed.

This commit is contained in:
Simone Bordet 2014-08-20 19:19:44 +02:00
parent 7a61c96ba1
commit 75c1322adc
18 changed files with 767 additions and 234 deletions

View File

@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
@ -33,6 +32,7 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
@ -75,7 +75,8 @@ public class IdleTimeoutTest extends AbstractTest
@Override
public void onClose(Session session, GoAwayFrame frame)
{
latch.countDown();
if (session.isClosed() && ((HTTP2Session)session).isDisconnected())
latch.countDown();
}
});
@ -91,6 +92,8 @@ public class IdleTimeoutTest extends AbstractTest
}, new Stream.Listener.Adapter());
Assert.assertTrue(latch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
Thread.sleep(1000);
}
@Test
@ -113,7 +116,8 @@ public class IdleTimeoutTest extends AbstractTest
@Override
public void onClose(Session session, GoAwayFrame frame)
{
latch.countDown();
if (session.isClosed() && ((HTTP2Session)session).isDisconnected())
latch.countDown();
}
});
@ -133,7 +137,7 @@ public class IdleTimeoutTest extends AbstractTest
}
@Test
public void testServerNotEnforcingIdleTimeoutWithPendingStream() throws Exception
public void testServerNotEnforcingIdleTimeoutWithinCallback() throws Exception
{
startServer(new ServerSessionListener.Adapter()
{
@ -143,6 +147,7 @@ public class IdleTimeoutTest extends AbstractTest
try
{
stream.setIdleTimeout(10 * idleTimeout);
// Stay in the callback for more than the idleTimeout.
Thread.sleep(2 * idleTimeout);
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, true);
@ -212,7 +217,8 @@ public class IdleTimeoutTest extends AbstractTest
@Override
public void onClose(Session session, GoAwayFrame frame)
{
closeLatch.countDown();
if (session.isClosed() && ((HTTP2Session)session).isDisconnected())
closeLatch.countDown();
}
});
client.setIdleTimeout(idleTimeout);
@ -230,6 +236,7 @@ public class IdleTimeoutTest extends AbstractTest
}, new Stream.Listener.Adapter());
Assert.assertTrue(closeLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
Assert.assertTrue(session.isClosed());
}
@Test
@ -248,7 +255,8 @@ public class IdleTimeoutTest extends AbstractTest
@Override
public void onClose(Session session, GoAwayFrame frame)
{
closeLatch.countDown();
if (session.isClosed() && ((HTTP2Session)session).isDisconnected())
closeLatch.countDown();
}
});
client.setIdleTimeout(idleTimeout);
@ -269,7 +277,7 @@ public class IdleTimeoutTest extends AbstractTest
}
@Test
public void testClientNotEnforcingIdleTimeoutWithPendingStream() throws Exception
public void testClientNotEnforcingIdleTimeoutWithinCallback() throws Exception
{
final CountDownLatch closeLatch = new CountDownLatch(1);
startServer(new ServerSessionListener.Adapter()
@ -311,6 +319,7 @@ public class IdleTimeoutTest extends AbstractTest
{
try
{
// Stay in the callback for more than idleTimeout.
Thread.sleep(2 * idleTimeout);
replyLatch.countDown();
}
@ -379,6 +388,9 @@ public class IdleTimeoutTest extends AbstractTest
Assert.assertFalse(dataLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
// Stream must be gone.
Assert.assertTrue(session.getStreams().isEmpty());
// Session must not be closed, nor disconnected.
Assert.assertFalse(session.isClosed());
Assert.assertFalse(((HTTP2Session)session).isDisconnected());
}
@Test
@ -420,6 +432,9 @@ public class IdleTimeoutTest extends AbstractTest
Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
// Stream must be gone.
Assert.assertTrue(session.getStreams().isEmpty());
// Session must not be closed, nor disconnected.
Assert.assertFalse(session.isClosed());
Assert.assertFalse(((HTTP2Session)session).isDisconnected());
}
@Test

View File

@ -18,29 +18,7 @@
package org.eclipse.jetty.http2;
public class ResetException extends RuntimeException
public enum CloseState
{
public ResetException()
{
}
public ResetException(String message)
{
super(message);
}
public ResetException(String message, Throwable cause)
{
super(message, cause);
}
public ResetException(Throwable cause)
{
super(cause);
}
public ResetException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace)
{
super(message, cause, enableSuppression, writableStackTrace);
}
NOT_CLOSED, LOCALLY_CLOSED, REMOTELY_CLOSED, CLOSED
}

View File

@ -26,7 +26,6 @@ import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -34,14 +33,6 @@ public class HTTP2Connection extends AbstractConnection
{
protected static final Logger LOG = Log.getLogger(HTTP2Connection.class);
protected final Callback closeCallback = new Callback.Adapter()
{
@Override
public void failed(Throwable x)
{
close();
}
};
private final ByteBufferPool byteBufferPool;
private final Parser parser;
private final ISession session;
@ -92,7 +83,7 @@ public class HTTP2Connection extends AbstractConnection
}
else if (filled < 0)
{
shutdown(endPoint, session);
session.onShutdown();
return -1;
}
else
@ -117,18 +108,12 @@ public class HTTP2Connection extends AbstractConnection
}
}
private void shutdown(EndPoint endPoint, ISession session)
{
if (!endPoint.isOutputShutdown())
session.shutdown();
}
@Override
protected boolean onReadTimeout()
{
if (LOG.isDebugEnabled())
LOG.debug("Idle timeout {}ms expired on {}", getEndPoint().getIdleTimeout(), this);
getSession().close(ErrorCodes.NO_ERROR, "idle_timeout", closeCallback);
session.onIdleTimeout();
return false;
}
}

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.http2;
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
@ -313,8 +314,6 @@ public class HTTP2Flusher extends IteratingCallback
for (Entry entry : queued)
closed(entry, x);
session.disconnect();
}
private void closed(Entry entry, Throwable failure)
@ -347,7 +346,7 @@ public class HTTP2Flusher extends IteratingCallback
public void reset()
{
failed(new ResetException());
failed(new EOFException("reset"));
}
@Override

View File

@ -26,12 +26,13 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.DisconnectFrame;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.FrameType;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
@ -57,14 +58,6 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
{
private static final Logger LOG = Log.getLogger(HTTP2Session.class);
private final Callback disconnectOnFailure = new Callback.Adapter()
{
@Override
public void failed(Throwable x)
{
disconnect();
}
};
private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>();
private final AtomicInteger streamIds = new AtomicInteger();
private final AtomicInteger lastStreamId = new AtomicInteger();
@ -72,7 +65,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
private final AtomicInteger remoteStreamCount = new AtomicInteger();
private final AtomicInteger sendWindow = new AtomicInteger();
private final AtomicInteger recvWindow = new AtomicInteger();
private final AtomicBoolean closed = new AtomicBoolean();
private final AtomicReference<CloseState> closed = new AtomicReference<>(CloseState.NOT_CLOSED);
private final Scheduler scheduler;
private final EndPoint endPoint;
private final Generator generator;
@ -144,7 +137,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
if (getRecvWindow() < 0)
{
close(ErrorCodes.FLOW_CONTROL_ERROR, "session_window_exceeded", disconnectOnFailure);
close(ErrorCodes.FLOW_CONTROL_ERROR, "session_window_exceeded", Callback.Adapter.INSTANCE);
return false;
}
@ -271,7 +264,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
// SPEC: SETTINGS frame MUST be replied.
SettingsFrame reply = new SettingsFrame(Collections.<Integer, Integer>emptyMap(), true);
settings(reply, disconnectOnFailure());
settings(reply, Callback.Adapter.INSTANCE);
return false;
}
@ -287,26 +280,72 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
else
{
PingFrame reply = new PingFrame(frame.getPayload(), true);
control(null, disconnectOnFailure(), reply);
control(null, Callback.Adapter.INSTANCE, reply);
}
return false;
}
/**
* This method is called when receiving a GO_AWAY from the other peer.
* We check the close state to act appropriately:
*
* * NOT_CLOSED: we move to REMOTELY_CLOSED and queue a disconnect, so
* that the content of the queue is written, and then the connection
* closed. We notify the application after being terminated.
* See {@link HTTP2Session.ControlEntry#succeeded()}
*
* * In all other cases, we do nothing since other methods are already
* performing their actions.
*
* @param frame the GO_AWAY frame that has been received.
* @return whether the parsing will be resumed asynchronously
* @see #close(int, String, Callback)
* @see #onShutdown()
* @see #onIdleTimeout()
*/
@Override
public boolean onGoAway(GoAwayFrame frame)
public boolean onGoAway(final GoAwayFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
while (true)
{
String reason = frame.tryConvertPayload();
if (LOG.isDebugEnabled())
LOG.debug("Received {}: {}/'{}'", frame.getType(), frame.getError(), reason);
CloseState current = closed.get();
switch (current)
{
case NOT_CLOSED:
{
if (closed.compareAndSet(current, CloseState.REMOTELY_CLOSED))
{
// We received a GO_AWAY, so try to write
// what's in the queue and then disconnect.
control(null, new Callback()
{
@Override
public void succeeded()
{
notifyClose(HTTP2Session.this, frame);
}
@Override
public void failed(Throwable x)
{
notifyClose(HTTP2Session.this, frame);
}
}, new DisconnectFrame());
return false;
}
break;
}
default:
{
if (LOG.isDebugEnabled())
LOG.debug("Ignored {}, already closed", frame);
return false;
}
}
}
flusher.close();
notifyClose(this, frame);
return false;
}
@Override
@ -331,7 +370,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
@Override
public void onConnectionFailure(int error, String reason)
{
close(error, reason, disconnectOnFailure());
close(error, reason, Callback.Adapter.INSTANCE);
}
@Override
@ -401,19 +440,66 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
control(getStream(frame.getStreamId()), callback, frame);
}
/**
* Invoked internally and by applications to send a GO_AWAY frame to the
* other peer. We check the close state to act appropriately:
*
* * NOT_CLOSED: we move to LOCALLY_CLOSED and queue a GO_AWAY. When the
* GO_AWAY has been written, it will only cause the output to be shut
* down (not the connection closed), so that the application can still
* read frames arriving from the other peer.
* Ideally the other peer will notice the GO_AWAY and close the connection.
* When that happen, we close the connection from {@link #onShutdown()}.
* Otherwise, the idle timeout mechanism will close the connection, see
* {@link #onIdleTimeout()}.
*
* * In all other cases, we do nothing since other methods are already
* performing their actions.
*
* @param error the error code
* @param reason the reason
* @param callback the callback to invoke when the operation is complete
* @see #onGoAway(GoAwayFrame)
* @see #onShutdown()
* @see #onIdleTimeout()
*/
@Override
public void close(int error, String reason, Callback callback)
{
if (closed.compareAndSet(false, true))
while (true)
{
byte[] payload = reason == null ? null : reason.getBytes(StandardCharsets.UTF_8);
GoAwayFrame frame = new GoAwayFrame(lastStreamId.get(), error, payload);
if (LOG.isDebugEnabled())
LOG.debug("Sending {}: {}", frame.getType(), reason);
control(null, callback, frame);
CloseState current = closed.get();
switch (current)
{
case NOT_CLOSED:
{
if (closed.compareAndSet(current, CloseState.LOCALLY_CLOSED))
{
byte[] payload = reason == null ? null : reason.getBytes(StandardCharsets.UTF_8);
GoAwayFrame frame = new GoAwayFrame(lastStreamId.get(), error, payload);
if (LOG.isDebugEnabled())
LOG.debug("Sending {}", frame);
control(null, callback, frame);
return;
}
break;
}
default:
{
if (LOG.isDebugEnabled())
LOG.debug("Ignoring close {}/{}, already closed", error, reason);
return;
}
}
}
}
@Override
public boolean isClosed()
{
return closed.get() != CloseState.NOT_CLOSED;
}
private void control(IStream stream, Callback callback, Frame frame)
{
control(stream, callback, frame, Frame.EMPTY_ARRAY);
@ -489,7 +575,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
int maxCount = getMaxRemoteStreams();
if (maxCount >= 0 && remoteCount >= maxCount)
{
reset(new ResetFrame(streamId, ErrorCodes.REFUSED_STREAM_ERROR), disconnectOnFailure());
reset(new ResetFrame(streamId, ErrorCodes.REFUSED_STREAM_ERROR), Callback.Adapter.INSTANCE);
return null;
}
if (remoteStreamCount.compareAndSet(remoteCount, remoteCount + 1))
@ -510,7 +596,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
else
{
close(ErrorCodes.PROTOCOL_ERROR, "duplicate_stream", disconnectOnFailure());
close(ErrorCodes.PROTOCOL_ERROR, "duplicate_stream", Callback.Adapter.INSTANCE);
return null;
}
}
@ -594,32 +680,153 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
return pushEnabled;
}
/**
* A typical close by a remote peer involves a GO_AWAY frame followed by TCP FIN.
* This method is invoked when the TCP FIN is received, or when an exception is
* thrown while reading, and we check the close state to act appropriately:
*
* * NOT_CLOSED: means that the remote peer did not send a GO_AWAY (abrupt close)
* or there was an exception while reading, and therefore we terminate.
*
* * LOCALLY_CLOSED: we have sent the GO_AWAY to the remote peer, which received
* it and closed the connection; we queue a disconnect to close the connection
* on the local side.
* The GO_AWAY just shutdown the output, so we need this step to make sure the
* connection is closed. See {@link #close(int, String, Callback)}.
*
* * REMOTELY_CLOSED: we received the GO_AWAY, and the TCP FIN afterwards, so we
* do nothing since the handling of the GO_AWAY will take care of closing the
* connection. See {@link #onGoAway(GoAwayFrame)}.
*
* @see #onGoAway(GoAwayFrame)
* @see #close(int, String, Callback)
* @see #onIdleTimeout()
*/
@Override
public void shutdown()
public void onShutdown()
{
if (LOG.isDebugEnabled())
LOG.debug("Shutting down");
flusher.close();
LOG.debug("Shutting down {}", this);
switch (closed.get())
{
case NOT_CLOSED:
{
// The other peer did not send a GO_AWAY, no need to be gentle.
if (LOG.isDebugEnabled())
LOG.debug("Abrupt close for {}", this);
terminate();
break;
}
case LOCALLY_CLOSED:
{
// We have closed locally, and only shutdown
// the output; now queue a disconnect.
control(null, Callback.Adapter.INSTANCE, new DisconnectFrame());
break;
}
case REMOTELY_CLOSED:
{
// Nothing to do, the GO_AWAY frame we
// received will close the connection.
break;
}
default:
{
break;
}
}
}
/**
* This method is invoked when the idle timeout triggers. We check the close state
* to act appropriately:
*
* * NOT_CLOSED: it's a real idle timeout, we just initiate a close, see
* {@link #close(int, String, Callback)}.
*
* * LOCALLY_CLOSED: we have sent a GO_AWAY and only shutdown the output, but the
* other peer did not close the connection so we never received the TCP FIN, and
* therefore we terminate.
*
* * REMOTELY_CLOSED: the other peer sent us a GO_AWAY, we should have queued a
* disconnect, but for some reason it was not processed (for example, queue was
* stuck because of TCP congestion), therefore we terminate.
* See {@link #onGoAway(GoAwayFrame)}.
*
* @see #onGoAway(GoAwayFrame)
* @see #close(int, String, Callback)
* @see #onShutdown()
*/
@Override
public void onIdleTimeout()
{
switch (closed.get())
{
case NOT_CLOSED:
{
// Real idle timeout, just close.
close(ErrorCodes.NO_ERROR, "idle_timeout", Callback.Adapter.INSTANCE);
break;
}
case LOCALLY_CLOSED:
case REMOTELY_CLOSED:
{
terminate();
break;
}
default:
{
break;
}
}
}
public void disconnect()
{
if (LOG.isDebugEnabled())
LOG.debug("Disconnecting");
LOG.debug("Disconnecting {}", this);
endPoint.close();
}
private void terminate()
{
while (true)
{
CloseState current = closed.get();
switch (current)
{
case NOT_CLOSED:
case LOCALLY_CLOSED:
case REMOTELY_CLOSED:
{
if (closed.compareAndSet(current, CloseState.CLOSED))
{
// Close the flusher and disconnect.
flusher.close();
disconnect();
return;
}
break;
}
default:
{
return;
}
}
}
}
public boolean isDisconnected()
{
return !endPoint.isOpen();
}
private void updateLastStreamId(int streamId)
{
Atomics.updateMax(lastStreamId, streamId);
}
protected Callback disconnectOnFailure()
{
return disconnectOnFailure;
}
protected Stream.Listener notifyNewStream(Stream stream, HeadersFrame frame)
{
try
@ -684,8 +891,8 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
@Override
public String toString()
{
return String.format("%s@%x{queueSize=%d,sendWindow=%s,recvWindow=%s,streams=%d}", getClass().getSimpleName(),
hashCode(), flusher.getQueueSize(), sendWindow, recvWindow, streams.size());
return String.format("%s@%x{queueSize=%d,sendWindow=%s,recvWindow=%s,streams=%d,%s}", getClass().getSimpleName(),
hashCode(), flusher.getQueueSize(), sendWindow, recvWindow, streams.size(), closed);
}
private class ControlEntry extends HTTP2Flusher.Entry
@ -725,7 +932,14 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
case GO_AWAY:
{
flusher.close();
// We just sent a GO_AWAY, only shutdown the
// output without closing yet, to allow reads.
getEndPoint().shutdownOutput();
break;
}
case DISCONNECT:
{
terminate();
break;
}
default:

View File

@ -41,14 +41,6 @@ public class HTTP2Stream extends IdleTimeout implements IStream
{
private static final Logger LOG = Log.getLogger(HTTP2Stream.class);
private final Callback disconnectOnFailure = new Callback.Adapter()
{
@Override
public void failed(Throwable x)
{
session.disconnect();
}
};
private final AtomicReference<ConcurrentMap<String, Object>> attributes = new AtomicReference<>();
private final AtomicReference<CloseState> closeState = new AtomicReference<>(CloseState.NOT_CLOSED);
private final AtomicInteger sendWindow = new AtomicInteger();
@ -147,8 +139,10 @@ public class HTTP2Stream extends IdleTimeout implements IStream
// avoid that its idle timeout is rescheduled.
close();
reset(new ResetFrame(getId(), ErrorCodes.CANCEL_STREAM_ERROR), disconnectOnFailure);
// Tell the other peer that we timed out.
reset(new ResetFrame(getId(), ErrorCodes.CANCEL_STREAM_ERROR), Callback.Adapter.INSTANCE);
// Notify the application.
notifyFailure(this, timeout);
}
@ -195,7 +189,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream
{
// It's a bad client, it does not deserve to be
// treated gently by just resetting the stream.
session.close(ErrorCodes.FLOW_CONTROL_ERROR, "stream_window_exceeded", disconnectOnFailure);
session.close(ErrorCodes.FLOW_CONTROL_ERROR, "stream_window_exceeded", callback);
return false;
}
@ -330,9 +324,4 @@ public class HTTP2Stream extends IdleTimeout implements IStream
return String.format("%s@%x{id=%d,sendWindow=%s,recvWindow=%s,reset=%b,%s}", getClass().getSimpleName(),
hashCode(), getId(), sendWindow, recvWindow, reset, closeState);
}
private enum CloseState
{
NOT_CLOSED, LOCALLY_CLOSED, REMOTELY_CLOSED, CLOSED
}
}

View File

@ -46,7 +46,19 @@ public interface ISession extends Session
public boolean isPushEnabled();
public void shutdown();
/**
* Callback invoked when the connection reads -1.
*
* @see #onIdleTimeout()
* @see #close(int, String, Callback)
*/
public void onShutdown();
public void disconnect();
/**
* Callback invoked when the idle timeout expires.
*
* @see #onShutdown()
* @see #close(int, String, Callback)
*/
public void onIdleTimeout();
}

View File

@ -39,6 +39,8 @@ public interface Session
public void close(int error, String payload, Callback callback);
public boolean isClosed();
public Collection<Stream> getStreams();
public Stream getStream(int streamId);

View File

@ -0,0 +1,27 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.frames;
public class DisconnectFrame extends Frame
{
public DisconnectFrame()
{
super(FrameType.DISCONNECT);
}
}

View File

@ -34,7 +34,8 @@ public enum FrameType
WINDOW_UPDATE(8),
CONTINUATION(9),
// Synthetic frames only needed by the implementation.
PREFACE(10);
PREFACE(10),
DISCONNECT(11);
public static FrameType from(int type)
{

View File

@ -0,0 +1,35 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.generator;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.io.ByteBufferPool;
public class DisconnectGenerator extends FrameGenerator
{
public DisconnectGenerator()
{
super(null);
}
@Override
public void generate(ByteBufferPool.Lease lease, Frame frame)
{
}
}

View File

@ -55,6 +55,7 @@ public class Generator
this.generators[FrameType.WINDOW_UPDATE.getType()] = new WindowUpdateGenerator(headerGenerator);
this.generators[FrameType.CONTINUATION.getType()] = null; // TODO
this.generators[FrameType.PREFACE.getType()] = new PrefaceGenerator();
this.generators[FrameType.DISCONNECT.getType()] = new DisconnectGenerator();
this.dataGenerator = new DataGenerator(headerGenerator);
}

View File

@ -23,7 +23,6 @@ import java.util.Map;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCodes;
import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
@ -142,14 +141,7 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
private void close(Stream stream, String reason)
{
final Session session = stream.getSession();
session.close(ErrorCodes.PROTOCOL_ERROR, reason, new Callback.Adapter()
{
@Override
public void failed(Throwable x)
{
((ISession)session).disconnect();
}
});
session.close(ErrorCodes.PROTOCOL_ERROR, reason, Callback.Adapter.INSTANCE);
}
}
}

View File

@ -62,7 +62,7 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
settings = Collections.emptyMap();
SettingsFrame frame = new SettingsFrame(settings, false);
// TODO: consider sending a WINDOW_UPDATE to enlarge the session send window of the client.
control(null, disconnectOnFailure(), frame, Frame.EMPTY_ARRAY);
control(null, Callback.Adapter.INSTANCE, frame, Frame.EMPTY_ARRAY);
return false;
}

View File

@ -25,12 +25,13 @@ import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCodes;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.ResetException;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
@ -126,7 +127,6 @@ public class HttpTransportOverHTTP2 implements HttpTransport
{
if (LOG.isDebugEnabled())
LOG.debug("Could not push " + request, x);
stream.getSession().disconnect();
}
});
}
@ -167,8 +167,8 @@ public class HttpTransportOverHTTP2 implements HttpTransport
{
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Response #{} aborted", stream.getId());
if (!(failure instanceof ResetException))
stream.getSession().disconnect();
if (!stream.isReset())
stream.reset(new ResetFrame(stream.getId(), ErrorCodes.INTERNAL_ERROR), Callback.Adapter.INSTANCE);
}
private class CommitCallback implements Callback
@ -185,7 +185,6 @@ public class HttpTransportOverHTTP2 implements HttpTransport
{
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Response #" + stream.getId() + " failed to commit", x);
stream.getSession().disconnect();
}
}
}

View File

@ -0,0 +1,117 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.server;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import javax.servlet.http.HttpServlet;
import org.eclipse.jetty.http.HostPortHttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After;
public class AbstractServerTest
{
protected ServerConnector connector;
protected ByteBufferPool byteBufferPool;
protected Generator generator;
protected Server server;
protected String path;
protected void startServer(HttpServlet servlet) throws Exception
{
prepareServer(new HTTP2ServerConnectionFactory(new HttpConfiguration()));
ServletContextHandler context = new ServletContextHandler(server, "/");
context.addServlet(new ServletHolder(servlet), path);
server.start();
}
protected void startServer(ServerSessionListener listener) throws Exception
{
prepareServer(new RawHTTP2ServerConnectionFactory(listener));
server.start();
}
private void prepareServer(ConnectionFactory connectionFactory)
{
QueuedThreadPool serverExecutor = new QueuedThreadPool();
serverExecutor.setName("server");
server = new Server(serverExecutor);
connector = new ServerConnector(server, connectionFactory);
server.addConnector(connector);
path = "/test";
byteBufferPool = new MappedByteBufferPool();
generator = new Generator(byteBufferPool);
}
protected MetaData.Request newRequest(String method, HttpFields fields)
{
String host = "localhost";
int port = connector.getLocalPort();
String authority = host + ":" + port;
return new MetaData.Request(method, HttpScheme.HTTP, new HostPortHttpField(authority), path, HttpVersion.HTTP_2, fields);
}
@After
public void dispose() throws Exception
{
server.stop();
}
protected boolean parseResponse(Socket client, Parser parser) throws IOException
{
byte[] buffer = new byte[2048];
InputStream input = client.getInputStream();
client.setSoTimeout(1000);
while (true)
{
try
{
int read = input.read(buffer);
if (read < 0)
return true;
parser.parse(ByteBuffer.wrap(buffer, 0, read));
if (client.isClosed())
return true;
}
catch (SocketTimeoutException x)
{
return false;
}
}
}
}

View File

@ -0,0 +1,252 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.server;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCodes;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PrefaceFrame;
import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.junit.Assert;
import org.junit.Test;
public class CloseTest extends AbstractServerTest
{
@Test
public void testClientAbruptlyClosesConnection() throws Exception
{
final CountDownLatch closeLatch = new CountDownLatch(1);
final AtomicReference<Session> sessionRef = new AtomicReference<>();
startServer(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
try
{
sessionRef.set(stream.getSession());
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
// Reply with HEADERS.
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.Adapter.INSTANCE);
closeLatch.await(5, TimeUnit.SECONDS);
return null;
}
catch (InterruptedException x)
{
return null;
}
}
});
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.control(lease, new PrefaceFrame());
MetaData.Request metaData = newRequest("GET", new HttpFields());
generator.control(lease, new HeadersFrame(1, metaData, null, true));
try (Socket client = new Socket("localhost", connector.getLocalPort()))
{
OutputStream output = client.getOutputStream();
for (ByteBuffer buffer : lease.getByteBuffers())
{
output.write(BufferUtil.toArray(buffer));
}
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override
public boolean onHeaders(HeadersFrame frame)
{
try
{
// Close the connection just after
// receiving the response headers.
client.close();
closeLatch.countDown();
return false;
}
catch (IOException x)
{
return false;
}
}
}, 4096, 8192);
parseResponse(client, parser);
// We need to give some time to the server to receive and process the TCP FIN.
Thread.sleep(1000);
Session session = sessionRef.get();
Assert.assertTrue(session.isClosed());
Assert.assertTrue(((HTTP2Session)session).isDisconnected());
}
}
@Test
public void testClientSendsGoAwayButDoesNotCloseConnectionServerCloses() throws Exception
{
final AtomicReference<Session> sessionRef = new AtomicReference<>();
startServer(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
sessionRef.set(stream.getSession());
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.Adapter.INSTANCE);
return null;
}
});
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.control(lease, new PrefaceFrame());
MetaData.Request metaData = newRequest("GET", new HttpFields());
generator.control(lease, new HeadersFrame(1, metaData, null, true));
generator.control(lease, new GoAwayFrame(1, ErrorCodes.NO_ERROR, "OK".getBytes("UTF-8")));
try (Socket client = new Socket("localhost", connector.getLocalPort()))
{
OutputStream output = client.getOutputStream();
for (ByteBuffer buffer : lease.getByteBuffers())
{
output.write(BufferUtil.toArray(buffer));
}
// Don't close the connection; the server should close.
final CountDownLatch responseLatch = new CountDownLatch(1);
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override
public boolean onHeaders(HeadersFrame frame)
{
// Even if we sent the GO_AWAY immediately after the
// HEADERS, the server is able to send us the response.
responseLatch.countDown();
return false;
}
}, 4096, 8192);
parseResponse(client, parser);
Assert.assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
// Wait for the server to close.
Thread.sleep(1000);
// Client received the TCP FIN from server.
Assert.assertEquals(-1, client.getInputStream().read());
// Server is closed.
Session session = sessionRef.get();
Assert.assertTrue(session.isClosed());
Assert.assertTrue(((HTTP2Session)session).isDisconnected());
}
}
@Test
public void testServerSendsGoAwayClientDoesNotCloseServerIdleTimeout() throws Exception
{
final long idleTimeout = 1000;
final AtomicReference<Session> sessionRef = new AtomicReference<>();
startServer(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
stream.setIdleTimeout(10 * idleTimeout);
sessionRef.set(stream.getSession());
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.Adapter.INSTANCE);
stream.getSession().close(ErrorCodes.NO_ERROR, "OK", Callback.Adapter.INSTANCE);
return null;
}
});
connector.setIdleTimeout(idleTimeout);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.control(lease, new PrefaceFrame());
MetaData.Request metaData = newRequest("GET", new HttpFields());
generator.control(lease, new HeadersFrame(1, metaData, null, true));
try (Socket client = new Socket("localhost", connector.getLocalPort()))
{
OutputStream output = client.getOutputStream();
for (ByteBuffer buffer : lease.getByteBuffers())
{
output.write(BufferUtil.toArray(buffer));
}
final CountDownLatch responseLatch = new CountDownLatch(1);
final CountDownLatch closeLatch = new CountDownLatch(1);
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override
public boolean onHeaders(HeadersFrame frame)
{
responseLatch.countDown();
return false;
}
@Override
public boolean onGoAway(GoAwayFrame frame)
{
closeLatch.countDown();
return false;
}
}, 4096, 8192);
parseResponse(client, parser);
Assert.assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
// Don't close the connection.
// Wait for the server to idle timeout.
Thread.sleep(2 * idleTimeout);
// Client received the TCP FIN from server.
Assert.assertEquals(-1, client.getInputStream().read());
// Server is closed.
Session session = sessionRef.get();
Assert.assertTrue(session.isClosed());
Assert.assertTrue(((HTTP2Session)session).isDisconnected());
}
}
}

View File

@ -19,10 +19,8 @@
package org.eclipse.jetty.http2.server;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
@ -33,11 +31,7 @@ import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HostPortHttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCodes;
import org.eclipse.jetty.http2.frames.DataFrame;
@ -46,67 +40,25 @@ import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PingFrame;
import org.eclipse.jetty.http2.frames.PrefaceFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.BufferUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
public class HTTP2ServerTest
public class HTTP2ServerTest extends AbstractServerTest
{
private Server server;
private ServerConnector connector;
private String path;
private ByteBufferPool byteBufferPool;
private Generator generator;
private void startServer(HttpServlet servlet) throws Exception
{
server = new Server();
connector = new ServerConnector(server, new HTTP2ServerConnectionFactory(new HttpConfiguration()));
server.addConnector(connector);
ServletContextHandler context = new ServletContextHandler(server, "/");
path = "/test";
context.addServlet(new ServletHolder(servlet), path);
byteBufferPool = new MappedByteBufferPool();
generator = new Generator(byteBufferPool);
server.start();
}
@After
public void dispose() throws Exception
{
server.stop();
}
@Test
public void testNoPrefaceBytes() throws Exception
{
startServer(new HttpServlet(){});
String host = "localhost";
int port = connector.getLocalPort();
HttpFields fields = new HttpFields();
MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), HttpScheme.HTTP, new HostPortHttpField(host + ":" + port),
path, HttpVersion.HTTP_2, fields);
HeadersFrame request = new HeadersFrame(1, metaData, null, true);
// No preface bytes.
MetaData.Request metaData = newRequest("GET", new HttpFields());
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.control(lease, request);
generator.control(lease, new HeadersFrame(1, metaData, null, true));
// No preface bytes
try (Socket client = new Socket(host, port))
try (Socket client = new Socket("localhost", connector.getLocalPort()))
{
OutputStream output = client.getOutputStream();
for (ByteBuffer buffer : lease.getByteBuffers())
@ -144,17 +96,12 @@ public class HTTP2ServerTest
}
});
String host = "localhost";
int port = connector.getLocalPort();
HttpFields fields = new HttpFields();
MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), HttpScheme.HTTP, new HostPortHttpField(host + ":" + port),
path, HttpVersion.HTTP_2, fields);
HeadersFrame request = new HeadersFrame(1, metaData, null, true);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.control(lease, request);
lease.prepend(ByteBuffer.wrap(PrefaceFrame.PREFACE_BYTES), false);
generator.control(lease, new PrefaceFrame());
MetaData.Request metaData = newRequest("GET", new HttpFields());
generator.control(lease, new HeadersFrame(1, metaData, null, true));
try (Socket client = new Socket(host, port))
try (Socket client = new Socket("localhost", connector.getLocalPort()))
{
OutputStream output = client.getOutputStream();
for (ByteBuffer buffer : lease.getByteBuffers())
@ -207,17 +154,12 @@ public class HTTP2ServerTest
}
});
String host = "localhost";
int port = connector.getLocalPort();
HttpFields fields = new HttpFields();
MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(),HttpScheme.HTTP, new HostPortHttpField(host + ":" + port),
path, HttpVersion.HTTP_2, fields);
HeadersFrame request = new HeadersFrame(1, metaData, null, true);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.control(lease, request);
lease.prepend(ByteBuffer.wrap(PrefaceFrame.PREFACE_BYTES), false);
generator.control(lease, new PrefaceFrame());
MetaData.Request metaData = newRequest("GET", new HttpFields());
generator.control(lease, new HeadersFrame(1, metaData, null, true));
try (Socket client = new Socket(host, port))
try (Socket client = new Socket("localhost", connector.getLocalPort()))
{
OutputStream output = client.getOutputStream();
for (ByteBuffer buffer : lease.getByteBuffers())
@ -273,17 +215,14 @@ public class HTTP2ServerTest
{
startServer(new HttpServlet(){});
String host = "localhost";
int port = connector.getLocalPort();
PingFrame frame = new PingFrame(new byte[8], false);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.control(lease, frame);
generator.control(lease, new PrefaceFrame());
generator.control(lease, new PingFrame(new byte[8], false));
// Modify the length of the frame to a wrong one.
lease.getByteBuffers().get(0).putShort(0, (short)7);
lease.prepend(ByteBuffer.wrap(PrefaceFrame.PREFACE_BYTES), false);
lease.getByteBuffers().get(1).putShort(0, (short)7);
final CountDownLatch latch = new CountDownLatch(1);
try (Socket client = new Socket(host, port))
try (Socket client = new Socket("localhost", connector.getLocalPort()))
{
OutputStream output = client.getOutputStream();
for (ByteBuffer buffer : lease.getByteBuffers())
@ -313,17 +252,14 @@ public class HTTP2ServerTest
{
startServer(new HttpServlet(){});
String host = "localhost";
int port = connector.getLocalPort();
PingFrame frame = new PingFrame(new byte[8], false);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.control(lease, frame);
generator.control(lease, new PrefaceFrame());
generator.control(lease, new PingFrame(new byte[8], false));
// Modify the streamId of the frame to non zero.
lease.getByteBuffers().get(0).putInt(4, 1);
lease.prepend(ByteBuffer.wrap(PrefaceFrame.PREFACE_BYTES), false);
lease.getByteBuffers().get(1).putInt(4, 1);
final CountDownLatch latch = new CountDownLatch(1);
try (Socket client = new Socket(host, port))
try (Socket client = new Socket("localhost", connector.getLocalPort()))
{
OutputStream output = client.getOutputStream();
for (ByteBuffer buffer : lease.getByteBuffers())
@ -347,25 +283,4 @@ public class HTTP2ServerTest
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
}
private boolean parseResponse(Socket client, Parser parser) throws IOException
{
byte[] buffer = new byte[2048];
InputStream input = client.getInputStream();
client.setSoTimeout(1000);
while (true)
{
try
{
int read = input.read(buffer);
if (read < 0)
return true;
parser.parse(ByteBuffer.wrap(buffer, 0, read));
}
catch (SocketTimeoutException x)
{
return false;
}
}
}
}