Merge branch 'jetty-9.3.x' of github.com:eclipse/jetty.project into jetty-9.3.x

This commit is contained in:
Joakim Erdfelt 2016-10-27 16:17:34 -07:00
commit 2c6c7d86ba
5 changed files with 47 additions and 40 deletions

View File

@ -19,7 +19,6 @@
package org.eclipse.jetty.http2;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
@ -47,7 +46,7 @@ public class HTTP2Flusher extends IteratingCallback
private final List<Entry> actives = new ArrayList<>();
private final HTTP2Session session;
private final ByteBufferPool.Lease lease;
private boolean terminated;
private Throwable terminated;
public HTTP2Flusher(HTTP2Session session)
{
@ -57,52 +56,54 @@ public class HTTP2Flusher extends IteratingCallback
public void window(IStream stream, WindowUpdateFrame frame)
{
boolean closed;
Throwable closed;
synchronized (this)
{
closed = terminated;
if (!closed)
if (closed == null)
windows.offer(new WindowEntry(stream, frame));
}
// Flush stalled data.
if (!closed)
if (closed == null)
iterate();
}
public boolean prepend(Entry entry)
{
boolean closed;
Throwable closed;
synchronized (this)
{
closed = terminated;
if (!closed)
if (closed == null)
{
frames.add(0, entry);
if (LOG.isDebugEnabled())
LOG.debug("Prepended {}, frames={}", entry, frames.size());
}
}
if (closed)
closed(entry, new ClosedChannelException());
return !closed;
if (closed == null)
return true;
closed(entry, closed);
return false;
}
public boolean append(Entry entry)
{
boolean closed;
Throwable closed;
synchronized (this)
{
closed = terminated;
if (!closed)
if (closed == null)
{
frames.add(entry);
if (LOG.isDebugEnabled())
LOG.debug("Appended {}, frames={}", entry, frames.size());
}
}
if (closed)
closed(entry, new ClosedChannelException());
return !closed;
if (closed == null)
return true;
closed(entry, closed);
return false;
}
private Entry remove(int index)
@ -122,15 +123,15 @@ public class HTTP2Flusher extends IteratingCallback
}
@Override
protected Action process() throws Exception
protected Action process() throws Throwable
{
if (LOG.isDebugEnabled())
LOG.debug("Flushing {}", session);
synchronized (this)
{
if (terminated)
throw new ClosedChannelException();
if (terminated != null)
throw terminated;
// First thing, update the window sizes, so we can
// reason about the frames to remove from the queue.
@ -269,13 +270,13 @@ public class HTTP2Flusher extends IteratingCallback
{
lease.recycle();
boolean closed;
Throwable closed;
synchronized (this)
{
closed = terminated;
terminated = true;
terminated = x;
if (LOG.isDebugEnabled())
LOG.debug("{}, active/queued={}/{}", closed ? "Closing" : "Failing", actives.size(), frames.size());
LOG.debug("{}, active/queued={}/{}", closed != null ? "Closing" : "Failing", actives.size(), frames.size());
actives.addAll(frames);
frames.clear();
}
@ -285,21 +286,21 @@ public class HTTP2Flusher extends IteratingCallback
// If the failure came from within the
// flusher, we need to close the connection.
if (!closed)
if (closed == null)
session.abort(x);
}
void terminate()
void terminate(Throwable cause)
{
boolean closed;
Throwable closed;
synchronized (this)
{
closed = terminated;
terminated = true;
terminated = cause;
if (LOG.isDebugEnabled())
LOG.debug("{}", closed ? "Terminated" : "Terminating");
LOG.debug("{}", closed != null ? "Terminated" : "Terminating");
}
if (!closed)
if (closed == null)
iterate();
}

View File

@ -959,7 +959,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
endPoint.close();
}
private void terminate()
private void terminate(Throwable cause)
{
while (true)
{
@ -972,7 +972,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
{
if (closed.compareAndSet(current, CloseState.CLOSED))
{
flusher.terminate();
flusher.terminate(cause);
for (IStream stream : streams.values())
stream.close();
streams.clear();
@ -992,7 +992,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
protected void abort(Throwable failure)
{
notifyFailure(this, failure);
terminate();
terminate(failure);
}
public boolean isDisconnected()
@ -1209,7 +1209,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
}
case DISCONNECT:
{
terminate();
terminate(new ClosedChannelException());
break;
}
default:

View File

@ -140,15 +140,18 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
if (LOG.isDebugEnabled())
LOG.debug("Processing {} on {}", frame, stream);
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
Runnable task = channel.onRequestContent(frame, callback);
if (task != null)
offerTask(task, false);
if (channel != null)
{
Runnable task = channel.onRequestContent(frame, callback);
if (task != null)
offerTask(task, false);
}
}
public boolean onStreamTimeout(IStream stream, Throwable failure)
{
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
boolean result = channel.onStreamTimeout(failure);
boolean result = channel != null && channel.onStreamTimeout(failure);
if (LOG.isDebugEnabled())
LOG.debug("{} idle timeout on {}: {}", result ? "Processed" : "Ignored", stream, failure);
return result;
@ -159,7 +162,8 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
if (LOG.isDebugEnabled())
LOG.debug("Processing failure on {}: {}", stream, failure);
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
channel.onFailure(failure);
if (channel != null)
channel.onFailure(failure);
}
public boolean onSessionTimeout(Throwable failure)
@ -169,7 +173,8 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
for (Stream stream : session.getStreams())
{
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
result &= !channel.isRequestHandled();
if (channel != null)
result &= !channel.isRequestHandled();
}
if (LOG.isDebugEnabled())
LOG.debug("{} idle timeout on {}: {}", result ? "Processed" : "Ignored", session, failure);

View File

@ -215,7 +215,8 @@ public class HttpTransportOverHTTP2 implements HttpTransport
// Consume the existing queued data frames to
// avoid stalling the session flow control.
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
channel.consumeInput();
if (channel != null)
channel.consumeInput();
}
@Override

View File

@ -156,9 +156,9 @@ public abstract class IteratingCallback implements Callback
*
* @return the appropriate Action
*
* @throws Exception if the sub task processing throws
* @throws Throwable if the sub task processing throws
*/
protected abstract Action process() throws Exception;
protected abstract Action process() throws Throwable;
/**
* Invoked when the overall task has completed successfully.