Implemented gentler shutdown in case of reading -1.

This allows big responses to be sent even if the request half closes
the connection.
This commit is contained in:
Simone Bordet 2014-06-27 17:06:07 +02:00
parent 58b1ec9935
commit 603985dcd2
5 changed files with 90 additions and 29 deletions

View File

@ -30,6 +30,7 @@ import java.util.concurrent.Executor;
import org.eclipse.jetty.http2.HTTP2Connection;
import org.eclipse.jetty.http2.HTTP2FlowControl;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.parser.ErrorCode;
@ -50,7 +51,7 @@ import org.eclipse.jetty.util.thread.Scheduler;
public class HTTP2Client extends ContainerLifeCycle
{
private final Queue<Session> sessions = new ConcurrentLinkedQueue<>();
private final Queue<ISession> sessions = new ConcurrentLinkedQueue<>();
private final SelectorManager selector;
private final ByteBufferPool byteBufferPool;
private long idleTimeout;
@ -96,7 +97,7 @@ public class HTTP2Client extends ContainerLifeCycle
private void closeConnections()
{
for (Session session : sessions)
for (ISession session : sessions)
session.close(ErrorCode.NO_ERROR, null, Callback.Adapter.INSTANCE);
sessions.clear();
}
@ -131,20 +132,18 @@ public class HTTP2Client extends ContainerLifeCycle
Generator generator = new Generator(byteBufferPool, 4096);
HTTP2Session session = new HTTP2ClientSession(getScheduler(), endpoint, generator, context.listener, new HTTP2FlowControl(65535));
Parser parser = new Parser(byteBufferPool, session, 4096, 8192);
return new HTTP2ClientConnection(byteBufferPool, getExecutor(), endpoint, parser, 8192, context.promise, session);
return new HTTP2ClientConnection(byteBufferPool, getExecutor(), endpoint, parser, session, 8192, context.promise);
}
}
private class HTTP2ClientConnection extends HTTP2Connection implements Callback
{
private final Promise<Session> promise;
private final Session session;
public HTTP2ClientConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endpoint, Parser parser, int bufferSize, Promise<Session> promise, Session session)
public HTTP2ClientConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, Promise<Session> promise)
{
super(byteBufferPool, executor, endpoint, parser, bufferSize);
super(byteBufferPool, executor, endpoint, parser, session, bufferSize);
this.promise = promise;
this.session = session;
}
@Override
@ -158,7 +157,7 @@ public class HTTP2Client extends ContainerLifeCycle
public void onClose()
{
super.onClose();
sessions.remove(session);
sessions.remove(getSession());
}
@Override
@ -166,15 +165,15 @@ public class HTTP2Client extends ContainerLifeCycle
{
if (LOG.isDebugEnabled())
LOG.debug("Idle timeout {}ms expired on {}", getEndPoint().getIdleTimeout(), this);
session.close(ErrorCode.NO_ERROR, "idle_timeout", closeCallback);
getSession().close(ErrorCode.NO_ERROR, "idle_timeout", closeCallback);
return false;
}
@Override
public void succeeded()
{
sessions.offer(session);
promise.succeeded(session);
sessions.offer(getSession());
promise.succeeded(getSession());
}
@Override

View File

@ -44,16 +44,23 @@ public class HTTP2Connection extends AbstractConnection
};
private final ByteBufferPool byteBufferPool;
private final Parser parser;
private final ISession session;
private final int bufferSize;
public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, int bufferSize)
public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize)
{
super(endPoint, executor);
this.byteBufferPool = byteBufferPool;
this.parser = parser;
this.session = session;
this.bufferSize = bufferSize;
}
protected ISession getSession()
{
return session;
}
@Override
public void onOpen()
{
@ -85,7 +92,7 @@ public class HTTP2Connection extends AbstractConnection
}
else if (filled < 0)
{
close();
shutdown(endPoint, session);
return -1;
}
else
@ -109,4 +116,10 @@ public class HTTP2Connection extends AbstractConnection
return -1;
}
}
private void shutdown(EndPoint endPoint, ISession session)
{
if (!endPoint.isOutputShutdown())
session.shutdown();
}
}

View File

@ -366,13 +366,6 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
flusher.iterate();
}
public void disconnect()
{
if (LOG.isDebugEnabled())
LOG.debug("Disconnecting");
endPoint.close();
}
protected IStream createLocalStream(HeadersFrame frame)
{
IStream stream = newStream(frame);
@ -472,6 +465,24 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
return windowSize.getAndAdd(delta);
}
@Override
public void shutdown()
{
if (LOG.isDebugEnabled())
LOG.debug("Shutting down");
// Append a fake FlusherEntry that disconnects when the queue is drained.
flusher.append(new ShutdownFlusherEntry());
flusher.iterate();
}
public void disconnect()
{
if (LOG.isDebugEnabled())
LOG.debug("Disconnecting");
endPoint.close();
}
private void updateLastStreamId(int streamId)
{
Atomics.updateMax(lastStreamId, streamId);
@ -575,6 +586,8 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
fail = true;
else
queue.offer(entry);
if (LOG.isDebugEnabled())
LOG.debug("Appended {}, queue={}", entry, queue.size());
}
if (fail)
closed(entry);
@ -740,6 +753,9 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
queued = new ArrayDeque<>(queue);
}
if (LOG.isDebugEnabled())
LOG.debug("Closing, queued={}", queued.size());
while (true)
{
FlusherEntry item = queued.poll();
@ -873,6 +889,39 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
}
private class ShutdownFlusherEntry extends FlusherEntry
{
public ShutdownFlusherEntry()
{
super(null, null, Adapter.INSTANCE);
}
@Override
public void generate(ByteBufferPool.Lease lease)
{
}
@Override
public void succeeded()
{
flusher.close();
disconnect();
}
@Override
public void failed(Throwable x)
{
flusher.close();
disconnect();
}
@Override
public String toString()
{
return String.format("%s@%x", "ShutdownFrame", hashCode());
}
}
private class PromiseCallback<C> implements Callback
{
private final Promise<C> promise;

View File

@ -34,5 +34,7 @@ public interface ISession extends Session
public int updateWindowSize(int delta);
public void shutdown();
public void disconnect();
}

View File

@ -22,7 +22,7 @@ import java.util.concurrent.Executor;
import org.eclipse.jetty.http2.HTTP2Connection;
import org.eclipse.jetty.http2.HTTP2FlowControl;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.parser.ErrorCode;
@ -86,7 +86,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
Parser parser = newServerParser(connector.getByteBufferPool(), session);
HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), connector.getExecutor(),
endPoint, parser, getInputBufferSize(), listener, session);
endPoint, parser, session, getInputBufferSize(), listener);
return configure(connection, connector, endPoint);
}
@ -98,20 +98,18 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
private class HTTP2ServerConnection extends HTTP2Connection
{
private final ServerSessionListener listener;
private final Session session;
public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, int inputBufferSize, ServerSessionListener listener, Session session)
public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int inputBufferSize, ServerSessionListener listener)
{
super(byteBufferPool, executor, endPoint, parser, inputBufferSize);
super(byteBufferPool, executor, endPoint, parser, session, inputBufferSize);
this.listener = listener;
this.session = session;
}
@Override
public void onOpen()
{
super.onOpen();
notifyConnect(session);
notifyConnect(getSession());
}
@Override
@ -119,11 +117,11 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
{
if (LOG.isDebugEnabled())
LOG.debug("Idle timeout {}ms expired on {}", getEndPoint().getIdleTimeout(), this);
session.close(ErrorCode.NO_ERROR, "idle_timeout", closeCallback);
getSession().close(ErrorCode.NO_ERROR, "idle_timeout", closeCallback);
return false;
}
private void notifyConnect(Session session)
private void notifyConnect(ISession session)
{
try
{