Merged branch 'jetty-9.3.x' into 'jetty-9.4.x'.
This commit is contained in:
commit
c6ad87c3f9
|
@ -19,7 +19,6 @@
|
|||
package org.eclipse.jetty.http2;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Deque;
|
||||
|
@ -46,7 +45,7 @@ public class HTTP2Flusher extends IteratingCallback
|
|||
private final HTTP2Session session;
|
||||
private final ByteBufferPool.Lease lease;
|
||||
private Entry stalled;
|
||||
private boolean terminated;
|
||||
private Throwable terminated;
|
||||
|
||||
public HTTP2Flusher(HTTP2Session session)
|
||||
{
|
||||
|
@ -56,52 +55,54 @@ public class HTTP2Flusher extends IteratingCallback
|
|||
|
||||
public void window(IStream stream, WindowUpdateFrame frame)
|
||||
{
|
||||
boolean closed;
|
||||
Throwable closed;
|
||||
synchronized (this)
|
||||
{
|
||||
closed = terminated;
|
||||
if (!closed)
|
||||
if (closed == null)
|
||||
windows.offer(new WindowEntry(stream, frame));
|
||||
}
|
||||
// Flush stalled data.
|
||||
if (!closed)
|
||||
if (closed == null)
|
||||
iterate();
|
||||
}
|
||||
|
||||
public boolean prepend(Entry entry)
|
||||
{
|
||||
boolean closed;
|
||||
Throwable closed;
|
||||
synchronized (this)
|
||||
{
|
||||
closed = terminated;
|
||||
if (!closed)
|
||||
if (closed == null)
|
||||
{
|
||||
frames.offerFirst(entry);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Prepended {}, frames={}", entry, frames.size());
|
||||
}
|
||||
}
|
||||
if (closed)
|
||||
closed(entry, new ClosedChannelException());
|
||||
return !closed;
|
||||
if (closed == null)
|
||||
return true;
|
||||
closed(entry, closed);
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean append(Entry entry)
|
||||
{
|
||||
boolean closed;
|
||||
Throwable closed;
|
||||
synchronized (this)
|
||||
{
|
||||
closed = terminated;
|
||||
if (!closed)
|
||||
if (closed == null)
|
||||
{
|
||||
frames.offer(entry);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Appended {}, frames={}", entry, frames.size());
|
||||
}
|
||||
}
|
||||
if (closed)
|
||||
closed(entry, new ClosedChannelException());
|
||||
return !closed;
|
||||
if (closed == null)
|
||||
return true;
|
||||
closed(entry, closed);
|
||||
return false;
|
||||
}
|
||||
|
||||
public int getQueueSize()
|
||||
|
@ -113,15 +114,15 @@ public class HTTP2Flusher extends IteratingCallback
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Action process() throws Exception
|
||||
protected Action process() throws Throwable
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Flushing {}", session);
|
||||
|
||||
synchronized (this)
|
||||
{
|
||||
if (terminated)
|
||||
throw new ClosedChannelException();
|
||||
if (terminated != null)
|
||||
throw terminated;
|
||||
|
||||
while (!windows.isEmpty())
|
||||
{
|
||||
|
@ -251,13 +252,13 @@ public class HTTP2Flusher extends IteratingCallback
|
|||
{
|
||||
lease.recycle();
|
||||
|
||||
boolean closed;
|
||||
Throwable closed;
|
||||
synchronized (this)
|
||||
{
|
||||
closed = terminated;
|
||||
terminated = true;
|
||||
terminated = x;
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{}, active/queued={}/{}", closed ? "Closing" : "Failing", actives.size(), frames.size());
|
||||
LOG.debug("{}, active/queued={}/{}", closed != null ? "Closing" : "Failing", actives.size(), frames.size());
|
||||
actives.addAll(frames);
|
||||
frames.clear();
|
||||
}
|
||||
|
@ -267,21 +268,21 @@ public class HTTP2Flusher extends IteratingCallback
|
|||
|
||||
// If the failure came from within the
|
||||
// flusher, we need to close the connection.
|
||||
if (!closed)
|
||||
if (closed == null)
|
||||
session.abort(x);
|
||||
}
|
||||
|
||||
void terminate()
|
||||
void terminate(Throwable cause)
|
||||
{
|
||||
boolean closed;
|
||||
Throwable closed;
|
||||
synchronized (this)
|
||||
{
|
||||
closed = terminated;
|
||||
terminated = true;
|
||||
terminated = cause;
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{}", closed ? "Terminated" : "Terminating");
|
||||
LOG.debug("{}", closed != null ? "Terminated" : "Terminating");
|
||||
}
|
||||
if (!closed)
|
||||
if (closed == null)
|
||||
iterate();
|
||||
}
|
||||
|
||||
|
|
|
@ -965,7 +965,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
endPoint.close();
|
||||
}
|
||||
|
||||
private void terminate()
|
||||
private void terminate(Throwable cause)
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
|
@ -978,7 +978,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
{
|
||||
if (closed.compareAndSet(current, CloseState.CLOSED))
|
||||
{
|
||||
flusher.terminate();
|
||||
flusher.terminate(cause);
|
||||
for (IStream stream : streams.values())
|
||||
stream.close();
|
||||
streams.clear();
|
||||
|
@ -998,7 +998,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
protected void abort(Throwable failure)
|
||||
{
|
||||
notifyFailure(this, failure);
|
||||
terminate();
|
||||
terminate(failure);
|
||||
}
|
||||
|
||||
public boolean isDisconnected()
|
||||
|
@ -1206,7 +1206,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
}
|
||||
case DISCONNECT:
|
||||
{
|
||||
terminate();
|
||||
terminate(new ClosedChannelException());
|
||||
break;
|
||||
}
|
||||
default:
|
||||
|
|
|
@ -159,15 +159,18 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Processing {} on {}", frame, stream);
|
||||
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
|
||||
Runnable task = channel.onRequestContent(frame, callback);
|
||||
if (task != null)
|
||||
offerTask(task, false);
|
||||
if (channel != null)
|
||||
{
|
||||
Runnable task = channel.onRequestContent(frame, callback);
|
||||
if (task != null)
|
||||
offerTask(task, false);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean onStreamTimeout(IStream stream, Throwable failure)
|
||||
{
|
||||
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
|
||||
boolean result = channel.onStreamTimeout(failure);
|
||||
boolean result = channel != null && channel.onStreamTimeout(failure);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} idle timeout on {}: {}", result ? "Processed" : "Ignored", stream, failure);
|
||||
return result;
|
||||
|
@ -178,7 +181,8 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Processing failure on {}: {}", stream, failure);
|
||||
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
|
||||
channel.onFailure(failure);
|
||||
if (channel != null)
|
||||
channel.onFailure(failure);
|
||||
}
|
||||
|
||||
public boolean onSessionTimeout(Throwable failure)
|
||||
|
@ -188,7 +192,8 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
|
|||
for (Stream stream : session.getStreams())
|
||||
{
|
||||
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
|
||||
result &= !channel.isRequestHandled();
|
||||
if (channel != null)
|
||||
result &= !channel.isRequestHandled();
|
||||
}
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} idle timeout on {}: {}", result ? "Processed" : "Ignored", session, failure);
|
||||
|
|
|
@ -217,7 +217,8 @@ public class HttpTransportOverHTTP2 implements HttpTransport
|
|||
// Consume the existing queued data frames to
|
||||
// avoid stalling the session flow control.
|
||||
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
|
||||
channel.consumeInput();
|
||||
if (channel != null)
|
||||
channel.consumeInput();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -156,9 +156,9 @@ public abstract class IteratingCallback implements Callback
|
|||
*
|
||||
* @return the appropriate Action
|
||||
*
|
||||
* @throws Exception if the sub task processing throws
|
||||
* @throws Throwable if the sub task processing throws
|
||||
*/
|
||||
protected abstract Action process() throws Exception;
|
||||
protected abstract Action process() throws Throwable;
|
||||
|
||||
/**
|
||||
* Invoked when the overall task has completed successfully.
|
||||
|
|
Loading…
Reference in New Issue