394854 optimised promise implementation
This commit is contained in:
parent
04aafcae3c
commit
9edac367c8
|
@ -24,8 +24,10 @@ import java.nio.channels.InterruptedByTimeoutException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.ListIterator;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
@ -75,7 +77,6 @@ import org.eclipse.jetty.util.ForkInvoker;
|
|||
import org.eclipse.jetty.util.FutureCallback;
|
||||
import org.eclipse.jetty.util.FuturePromise;
|
||||
import org.eclipse.jetty.util.Promise;
|
||||
import org.eclipse.jetty.util.PromisingCallback;
|
||||
import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
||||
import org.eclipse.jetty.util.component.Dumpable;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
|
@ -84,7 +85,7 @@ import org.eclipse.jetty.util.thread.Scheduler;
|
|||
|
||||
public class StandardSession implements ISession, Parser.Listener, Dumpable
|
||||
{
|
||||
private static final Logger logger = Log.getLogger(Session.class);
|
||||
private static final Logger LOG = Log.getLogger(Session.class);
|
||||
|
||||
private final ForkInvoker<Callback> invoker = new SessionInvoker();
|
||||
private final Map<String, Object> attributes = new ConcurrentHashMap<>();
|
||||
|
@ -171,8 +172,8 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
int streamId = streamIds.getAndAdd(2);
|
||||
// TODO: for SPDYv3 we need to support the "slot" argument
|
||||
SynStreamFrame synStream = new SynStreamFrame(version, synInfo.getFlags(), streamId, associatedStreamId, synInfo.getPriority(), (short)0, synInfo.getHeaders());
|
||||
IStream stream = createStream(synStream, listener, true);
|
||||
generateAndEnqueueControlFrame(stream, synStream, timeout, unit, new PromisingCallback<Stream>(promise,stream));
|
||||
StandardStream stream = createStream(synStream, listener, true, promise);
|
||||
generateAndEnqueueControlFrame(stream, synStream, timeout, unit, stream);
|
||||
}
|
||||
flush();
|
||||
}
|
||||
|
@ -234,9 +235,9 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
public void ping(long timeout, TimeUnit unit, Promise<PingInfo> promise)
|
||||
{
|
||||
int pingId = pingIds.getAndAdd(2);
|
||||
PingInfo pingInfo = new PingInfo(pingId);
|
||||
PromisingPingInfoCallback pingInfo = new PromisingPingInfoCallback(pingId,promise);
|
||||
PingFrame frame = new PingFrame(version,pingId);
|
||||
control(null,frame,timeout,unit,new PromisingCallback<PingInfo>(promise,pingInfo));
|
||||
control(null,frame,timeout,unit,pingInfo);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -310,11 +311,11 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
notifyIdle(idleListener, false);
|
||||
try
|
||||
{
|
||||
logger.debug("Processing {}", frame);
|
||||
LOG.debug("Processing {}", frame);
|
||||
|
||||
if (goAwaySent.get())
|
||||
{
|
||||
logger.debug("Skipped processing of {}", frame);
|
||||
LOG.debug("Skipped processing of {}", frame);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -388,11 +389,11 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
notifyIdle(idleListener, false);
|
||||
try
|
||||
{
|
||||
logger.debug("Processing {}, {} data bytes", frame, data.remaining());
|
||||
LOG.debug("Processing {}, {} data bytes", frame, data.remaining());
|
||||
|
||||
if (goAwaySent.get())
|
||||
{
|
||||
logger.debug("Skipped processing of {}", frame);
|
||||
LOG.debug("Skipped processing of {}", frame);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -401,7 +402,7 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
if (stream == null)
|
||||
{
|
||||
RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
|
||||
logger.debug("Unknown stream {}", rstInfo);
|
||||
LOG.debug("Unknown stream {}", rstInfo);
|
||||
rst(rstInfo);
|
||||
}
|
||||
else
|
||||
|
@ -455,7 +456,7 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
|
||||
private void onSyn(SynStreamFrame frame)
|
||||
{
|
||||
IStream stream = createStream(frame, null, false);
|
||||
IStream stream = createStream(frame, null, false,null);
|
||||
if (stream != null)
|
||||
processSyn(listener, stream, frame);
|
||||
}
|
||||
|
@ -474,9 +475,12 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
removeStream(stream);
|
||||
}
|
||||
|
||||
private IStream createStream(SynStreamFrame frame, StreamFrameListener listener, boolean local)
|
||||
private StandardStream createStream(SynStreamFrame frame, StreamFrameListener listener, boolean local,Promise<Stream> promise)
|
||||
{
|
||||
IStream stream = newStream(frame);
|
||||
IStream associatedStream = streams.get(frame.getAssociatedStreamId());
|
||||
StandardStream stream = new StandardStream(frame.getStreamId(), frame.getPriority(), this, associatedStream,promise);
|
||||
flowControlStrategy.onNewStream(this, stream);
|
||||
|
||||
stream.updateCloseState(frame.isClose(), local);
|
||||
stream.setStreamFrameListener(listener);
|
||||
|
||||
|
@ -494,27 +498,19 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
if (local)
|
||||
throw new IllegalStateException("Duplicate stream id " + streamId);
|
||||
RstInfo rstInfo = new RstInfo(streamId, StreamStatus.PROTOCOL_ERROR);
|
||||
logger.debug("Duplicate stream, {}", rstInfo);
|
||||
LOG.debug("Duplicate stream, {}", rstInfo);
|
||||
rst(rstInfo);
|
||||
return null;
|
||||
}
|
||||
else
|
||||
{
|
||||
logger.debug("Created {}", stream);
|
||||
LOG.debug("Created {}", stream);
|
||||
if (local)
|
||||
notifyStreamCreated(stream);
|
||||
return stream;
|
||||
}
|
||||
}
|
||||
|
||||
private IStream newStream(SynStreamFrame frame)
|
||||
{
|
||||
IStream associatedStream = streams.get(frame.getAssociatedStreamId());
|
||||
IStream stream = new StandardStream(frame.getStreamId(), frame.getPriority(), this, associatedStream);
|
||||
flowControlStrategy.onNewStream(this, stream);
|
||||
return stream;
|
||||
}
|
||||
|
||||
private void notifyStreamCreated(IStream stream)
|
||||
{
|
||||
for (Listener listener : listeners)
|
||||
|
@ -527,11 +523,11 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
}
|
||||
catch (Exception x)
|
||||
{
|
||||
logger.info("Exception while notifying listener " + listener, x);
|
||||
LOG.info("Exception while notifying listener " + listener, x);
|
||||
}
|
||||
catch (Error x)
|
||||
{
|
||||
logger.info("Exception while notifying listener " + listener, x);
|
||||
LOG.info("Exception while notifying listener " + listener, x);
|
||||
throw x;
|
||||
}
|
||||
}
|
||||
|
@ -547,7 +543,7 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
if (removed != null)
|
||||
assert removed == stream;
|
||||
|
||||
logger.debug("Removed {}", stream);
|
||||
LOG.debug("Removed {}", stream);
|
||||
notifyStreamClosed(stream);
|
||||
}
|
||||
|
||||
|
@ -563,11 +559,11 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
}
|
||||
catch (Exception x)
|
||||
{
|
||||
logger.info("Exception while notifying listener " + listener, x);
|
||||
LOG.info("Exception while notifying listener " + listener, x);
|
||||
}
|
||||
catch (Error x)
|
||||
{
|
||||
logger.info("Exception while notifying listener " + listener, x);
|
||||
LOG.info("Exception while notifying listener " + listener, x);
|
||||
throw x;
|
||||
}
|
||||
}
|
||||
|
@ -581,7 +577,7 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
if (stream == null)
|
||||
{
|
||||
RstInfo rstInfo = new RstInfo(streamId,StreamStatus.INVALID_STREAM);
|
||||
logger.debug("Unknown stream {}",rstInfo);
|
||||
LOG.debug("Unknown stream {}",rstInfo);
|
||||
rst(rstInfo);
|
||||
}
|
||||
else
|
||||
|
@ -619,7 +615,7 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
{
|
||||
int windowSize = windowSizeSetting.value();
|
||||
setWindowSize(windowSize);
|
||||
logger.debug("Updated session window size to {}", windowSize);
|
||||
LOG.debug("Updated session window size to {}", windowSize);
|
||||
}
|
||||
SettingsInfo settingsInfo = new SettingsInfo(frame.getSettings(), frame.isClearPersisted());
|
||||
notifyOnSettings(listener, settingsInfo);
|
||||
|
@ -661,7 +657,7 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
if (stream == null)
|
||||
{
|
||||
RstInfo rstInfo = new RstInfo(streamId,StreamStatus.INVALID_STREAM);
|
||||
logger.debug("Unknown stream, {}",rstInfo);
|
||||
LOG.debug("Unknown stream, {}",rstInfo);
|
||||
rst(rstInfo);
|
||||
}
|
||||
else
|
||||
|
@ -687,7 +683,7 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
|
||||
private void onCredential(CredentialFrame frame)
|
||||
{
|
||||
logger.warn("{} frame not yet supported", frame.getType());
|
||||
LOG.warn("{} frame not yet supported", frame.getType());
|
||||
flush();
|
||||
}
|
||||
|
||||
|
@ -704,17 +700,17 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
{
|
||||
if (listener != null)
|
||||
{
|
||||
logger.debug("Invoking callback with {} on listener {}",x,listener);
|
||||
LOG.debug("Invoking callback with {} on listener {}",x,listener);
|
||||
listener.onException(x);
|
||||
}
|
||||
}
|
||||
catch (Exception xx)
|
||||
{
|
||||
logger.info("Exception while notifying listener " + listener, xx);
|
||||
LOG.info("Exception while notifying listener " + listener, xx);
|
||||
}
|
||||
catch (Error xx)
|
||||
{
|
||||
logger.info("Exception while notifying listener " + listener, xx);
|
||||
LOG.info("Exception while notifying listener " + listener, xx);
|
||||
throw xx;
|
||||
}
|
||||
}
|
||||
|
@ -725,17 +721,17 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
{
|
||||
if (listener == null)
|
||||
return null;
|
||||
logger.debug("Invoking callback with {} on listener {}",synInfo,listener);
|
||||
LOG.debug("Invoking callback with {} on listener {}",synInfo,listener);
|
||||
return listener.onSyn(stream,synInfo);
|
||||
}
|
||||
catch (Exception x)
|
||||
{
|
||||
logger.info("Exception while notifying listener " + listener,x);
|
||||
LOG.info("Exception while notifying listener " + listener,x);
|
||||
return null;
|
||||
}
|
||||
catch (Error x)
|
||||
{
|
||||
logger.info("Exception while notifying listener " + listener, x);
|
||||
LOG.info("Exception while notifying listener " + listener, x);
|
||||
throw x;
|
||||
}
|
||||
}
|
||||
|
@ -746,17 +742,17 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
{
|
||||
if (listener != null)
|
||||
{
|
||||
logger.debug("Invoking callback with {} on listener {}",rstInfo,listener);
|
||||
LOG.debug("Invoking callback with {} on listener {}",rstInfo,listener);
|
||||
listener.onRst(this,rstInfo);
|
||||
}
|
||||
}
|
||||
catch (Exception x)
|
||||
{
|
||||
logger.info("Exception while notifying listener " + listener, x);
|
||||
LOG.info("Exception while notifying listener " + listener, x);
|
||||
}
|
||||
catch (Error x)
|
||||
{
|
||||
logger.info("Exception while notifying listener " + listener, x);
|
||||
LOG.info("Exception while notifying listener " + listener, x);
|
||||
throw x;
|
||||
}
|
||||
}
|
||||
|
@ -767,17 +763,17 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
{
|
||||
if (listener != null)
|
||||
{
|
||||
logger.debug("Invoking callback with {} on listener {}",settingsInfo,listener);
|
||||
LOG.debug("Invoking callback with {} on listener {}",settingsInfo,listener);
|
||||
listener.onSettings(this, settingsInfo);
|
||||
}
|
||||
}
|
||||
catch (Exception x)
|
||||
{
|
||||
logger.info("Exception while notifying listener " + listener, x);
|
||||
LOG.info("Exception while notifying listener " + listener, x);
|
||||
}
|
||||
catch (Error x)
|
||||
{
|
||||
logger.info("Exception while notifying listener " + listener, x);
|
||||
LOG.info("Exception while notifying listener " + listener, x);
|
||||
throw x;
|
||||
}
|
||||
}
|
||||
|
@ -788,17 +784,17 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
{
|
||||
if (listener != null)
|
||||
{
|
||||
logger.debug("Invoking callback with {} on listener {}",pingInfo,listener);
|
||||
LOG.debug("Invoking callback with {} on listener {}",pingInfo,listener);
|
||||
listener.onPing(this, pingInfo);
|
||||
}
|
||||
}
|
||||
catch (Exception x)
|
||||
{
|
||||
logger.info("Exception while notifying listener " + listener, x);
|
||||
LOG.info("Exception while notifying listener " + listener, x);
|
||||
}
|
||||
catch (Error x)
|
||||
{
|
||||
logger.info("Exception while notifying listener " + listener, x);
|
||||
LOG.info("Exception while notifying listener " + listener, x);
|
||||
throw x;
|
||||
}
|
||||
}
|
||||
|
@ -809,17 +805,17 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
{
|
||||
if (listener != null)
|
||||
{
|
||||
logger.debug("Invoking callback with {} on listener {}",goAwayInfo,listener);
|
||||
LOG.debug("Invoking callback with {} on listener {}",goAwayInfo,listener);
|
||||
listener.onGoAway(this, goAwayInfo);
|
||||
}
|
||||
}
|
||||
catch (Exception x)
|
||||
{
|
||||
logger.info("Exception while notifying listener " + listener, x);
|
||||
LOG.info("Exception while notifying listener " + listener, x);
|
||||
}
|
||||
catch (Error x)
|
||||
{
|
||||
logger.info("Exception while notifying listener " + listener, x);
|
||||
LOG.info("Exception while notifying listener " + listener, x);
|
||||
throw x;
|
||||
}
|
||||
}
|
||||
|
@ -842,7 +838,7 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
synchronized (this)
|
||||
{
|
||||
ByteBuffer buffer = generator.control(frame);
|
||||
logger.debug("Queuing {} on {}", frame, stream);
|
||||
LOG.debug("Queuing {} on {}", frame, stream);
|
||||
ControlFrameBytes frameBytes = new ControlFrameBytes(stream, callback, frame, buffer);
|
||||
if (timeout > 0)
|
||||
frameBytes.task = scheduler.schedule(frameBytes, timeout, unit);
|
||||
|
@ -870,7 +866,7 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
@Override
|
||||
public void data(IStream stream, DataInfo dataInfo, long timeout, TimeUnit unit, Callback callback)
|
||||
{
|
||||
logger.debug("Queuing {} on {}",dataInfo,stream);
|
||||
LOG.debug("Queuing {} on {}",dataInfo,stream);
|
||||
DataFrameBytes frameBytes = new DataFrameBytes(stream,callback,dataInfo);
|
||||
if (timeout > 0)
|
||||
frameBytes.task = scheduler.schedule(frameBytes,timeout,unit);
|
||||
|
@ -902,9 +898,10 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
return;
|
||||
|
||||
Set<IStream> stalledStreams = null;
|
||||
for (int i = 0; i < queue.size(); ++i)
|
||||
Iterator<FrameBytes> iter = queue.iterator();
|
||||
while(iter.hasNext())
|
||||
{
|
||||
frameBytes = queue.get(i);
|
||||
frameBytes=iter.next();
|
||||
|
||||
IStream stream = frameBytes.getStream();
|
||||
if (stream != null && stalledStreams != null && stalledStreams.contains(stream))
|
||||
|
@ -913,7 +910,7 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
buffer = frameBytes.getByteBuffer();
|
||||
if (buffer != null)
|
||||
{
|
||||
queue.remove(i);
|
||||
iter.remove();
|
||||
if (stream != null && stream.isReset())
|
||||
{
|
||||
frameBytes.fail(new StreamException(stream.getId(),StreamStatus.INVALID_STREAM,
|
||||
|
@ -927,17 +924,17 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
stalledStreams = new HashSet<>();
|
||||
if (stream != null)
|
||||
stalledStreams.add(stream);
|
||||
|
||||
logger.debug("Flush stalled for {}, {} frame(s) in queue",frameBytes,queue.size());
|
||||
|
||||
LOG.debug("Flush stalled for {}, {} frame(s) in queue",frameBytes,queue.size());
|
||||
}
|
||||
|
||||
if (buffer == null)
|
||||
return;
|
||||
|
||||
flushing = true;
|
||||
logger.debug("Flushing {}, {} frame(s) in queue",frameBytes,queue.size());
|
||||
LOG.debug("Flushing {}, {} frame(s) in queue",frameBytes,queue.size());
|
||||
}
|
||||
write(buffer,new SessionWrite(frameBytes));
|
||||
write(buffer,frameBytes);
|
||||
}
|
||||
|
||||
private void append(FrameBytes frameBytes)
|
||||
|
@ -948,15 +945,18 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
failure = this.failure;
|
||||
if (failure == null)
|
||||
{
|
||||
int index = queue.size();
|
||||
while (index > 0)
|
||||
ListIterator<FrameBytes> iter = queue.listIterator(queue.size());
|
||||
|
||||
while(iter.hasPrevious())
|
||||
{
|
||||
FrameBytes element = queue.get(index - 1);
|
||||
FrameBytes element = iter.previous();
|
||||
if (element.compareTo(frameBytes) >= 0)
|
||||
{
|
||||
iter.next();
|
||||
break;
|
||||
--index;
|
||||
}
|
||||
}
|
||||
queue.add(index,frameBytes);
|
||||
iter.add(frameBytes);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -972,15 +972,18 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
failure = this.failure;
|
||||
if (failure == null)
|
||||
{
|
||||
int index = 0;
|
||||
while (index < queue.size())
|
||||
ListIterator<FrameBytes> iter = queue.listIterator(0);
|
||||
|
||||
while(iter.hasNext())
|
||||
{
|
||||
FrameBytes element = queue.get(index);
|
||||
FrameBytes element = iter.next();
|
||||
if (element.compareTo(frameBytes) <= 0)
|
||||
{
|
||||
iter.previous();
|
||||
break;
|
||||
++index;
|
||||
}
|
||||
}
|
||||
queue.add(index,frameBytes);
|
||||
iter.add(frameBytes);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -988,54 +991,11 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
frameBytes.fail(new SPDYException(failure));
|
||||
}
|
||||
|
||||
private class SessionWrite implements Callback
|
||||
{
|
||||
final FrameBytes frameBytes;
|
||||
SessionWrite(FrameBytes frameBytes)
|
||||
{
|
||||
this.frameBytes=frameBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
synchronized (queue)
|
||||
{
|
||||
logger.debug("Completed write of {}, {} frame(s) in queue",frameBytes,queue.size());
|
||||
flushing = false;
|
||||
}
|
||||
frameBytes.complete();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
// TODO because this is using frameBytes here, then it is not really a Promise.
|
||||
// frameBytes is not a result, but is something known before the operation is attempted!
|
||||
|
||||
List<FrameBytes> frameBytesToFail = new ArrayList<>();
|
||||
frameBytesToFail.add(frameBytes);
|
||||
|
||||
synchronized (queue)
|
||||
{
|
||||
failure = x;
|
||||
String logMessage = String.format("Failed write of %s, failing all %d frame(s) in queue",frameBytes,queue.size());
|
||||
logger.debug(logMessage,x);
|
||||
frameBytesToFail.addAll(queue);
|
||||
queue.clear();
|
||||
flushing = false;
|
||||
}
|
||||
|
||||
for (FrameBytes fb : frameBytesToFail)
|
||||
fb.fail(x);
|
||||
}
|
||||
}
|
||||
|
||||
protected void write(ByteBuffer buffer, Callback callback)
|
||||
{
|
||||
if (controller != null)
|
||||
{
|
||||
logger.debug("Writing {} frame bytes of {}",buffer.remaining());
|
||||
LOG.debug("Writing {} frame bytes of {}",buffer.remaining());
|
||||
controller.write(buffer,callback);
|
||||
}
|
||||
}
|
||||
|
@ -1058,11 +1018,11 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
}
|
||||
catch (Exception xx)
|
||||
{
|
||||
logger.info("Exception while notifying callback " + callback, xx);
|
||||
LOG.info("Exception while notifying callback " + callback, xx);
|
||||
}
|
||||
catch (Error xx)
|
||||
{
|
||||
logger.info("Exception while notifying callback " + callback, xx);
|
||||
LOG.info("Exception while notifying callback " + callback, xx);
|
||||
throw xx;
|
||||
}
|
||||
}
|
||||
|
@ -1121,7 +1081,7 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
}
|
||||
}
|
||||
|
||||
public interface FrameBytes extends Comparable<FrameBytes>
|
||||
public interface FrameBytes extends Comparable<FrameBytes>, Callback
|
||||
{
|
||||
public IStream getStream();
|
||||
|
||||
|
@ -1193,6 +1153,45 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
close();
|
||||
fail(new InterruptedByTimeoutException());
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
synchronized (queue)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Completed write of {}, {} frame(s) in queue",this,queue.size());
|
||||
flushing = false;
|
||||
}
|
||||
complete();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
// TODO because this is using frameBytes here, then it is not really a Promise.
|
||||
// frameBytes is not a result, but is something known before the operation is attempted!
|
||||
|
||||
List<FrameBytes> frameBytesToFail = new ArrayList<>();
|
||||
frameBytesToFail.add(this);
|
||||
|
||||
synchronized (queue)
|
||||
{
|
||||
failure = x;
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
String logMessage = String.format("Failed write of %s, failing all %d frame(s) in queue",this,queue.size());
|
||||
LOG.debug(logMessage,x);
|
||||
}
|
||||
frameBytesToFail.addAll(queue);
|
||||
queue.clear();
|
||||
flushing = false;
|
||||
}
|
||||
|
||||
for (FrameBytes fb : frameBytesToFail)
|
||||
fb.fail(x);
|
||||
}
|
||||
}
|
||||
|
||||
private class ControlFrameBytes extends AbstractFrameBytes
|
||||
|
@ -1324,4 +1323,30 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
|
|||
close();
|
||||
}
|
||||
}
|
||||
|
||||
private static class PromisingPingInfoCallback extends PingInfo implements Callback
|
||||
{
|
||||
public PromisingPingInfoCallback(int pingId,Promise<PingInfo> promise)
|
||||
{
|
||||
super(pingId);
|
||||
this.promise=promise;
|
||||
}
|
||||
|
||||
private final Promise<PingInfo> promise;
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
if (promise!=null)
|
||||
promise.succeeded(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
if (promise!=null)
|
||||
promise.failed(x);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -41,10 +41,11 @@ import org.eclipse.jetty.util.Callback;
|
|||
import org.eclipse.jetty.util.FutureCallback;
|
||||
import org.eclipse.jetty.util.FuturePromise;
|
||||
import org.eclipse.jetty.util.Promise;
|
||||
import org.eclipse.jetty.util.PromisingCallback;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
||||
public class StandardStream implements IStream
|
||||
public class StandardStream extends PromisingCallback<Stream> implements IStream
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(Stream.class);
|
||||
private final Map<String, Object> attributes = new ConcurrentHashMap<>();
|
||||
|
@ -59,8 +60,9 @@ public class StandardStream implements IStream
|
|||
private volatile CloseState closeState = CloseState.OPENED;
|
||||
private volatile boolean reset = false;
|
||||
|
||||
public StandardStream(int id, byte priority, ISession session, IStream associatedStream)
|
||||
public StandardStream(int id, byte priority, ISession session, IStream associatedStream, Promise<Stream> promise)
|
||||
{
|
||||
super(promise);
|
||||
this.id = id;
|
||||
this.priority = priority;
|
||||
this.session = session;
|
||||
|
|
|
@ -90,10 +90,10 @@ public interface Session
|
|||
* @param listener the listener to invoke when events happen on the stream just created
|
||||
* @param timeout the operation's timeout
|
||||
* @param unit the timeout's unit
|
||||
* @param callback the completion callback that gets notified of stream creation
|
||||
* @param promise the completion callback that gets notified of stream creation
|
||||
* @see #syn(SynInfo, StreamFrameListener)
|
||||
*/
|
||||
public void syn(SynInfo synInfo, StreamFrameListener listener, long timeout, TimeUnit unit, Promise<Stream> callback);
|
||||
public void syn(SynInfo synInfo, StreamFrameListener listener, long timeout, TimeUnit unit, Promise<Stream> promise);
|
||||
|
||||
|
||||
/**
|
||||
|
@ -158,10 +158,10 @@ public interface Session
|
|||
*
|
||||
* @param timeout the operation's timeout
|
||||
* @param unit the timeout's unit
|
||||
* @param callback the completion callback that gets notified of ping's send
|
||||
* @param promise the completion callback that gets notified of ping's send
|
||||
* @see #ping()
|
||||
*/
|
||||
public void ping(long timeout, TimeUnit unit, Promise<PingInfo> callback);
|
||||
public void ping(long timeout, TimeUnit unit, Promise<PingInfo> promise);
|
||||
|
||||
/**
|
||||
* <p>Closes gracefully this session, sending a GO_AWAY frame and then closing the TCP connection.</p>
|
||||
|
|
|
@ -413,7 +413,7 @@ public class StandardSessionTest
|
|||
|
||||
final CountDownLatch failedCalledLatch = new CountDownLatch(2);
|
||||
SynStreamFrame synStreamFrame = new SynStreamFrame(SPDY.V2, SynInfo.FLAG_CLOSE, 1, 0, (byte)0, (short)0, null);
|
||||
IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null);
|
||||
IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null,null);
|
||||
stream.updateWindowSize(8192);
|
||||
Callback.Adapter callback = new Callback.Adapter()
|
||||
{
|
||||
|
|
|
@ -66,7 +66,7 @@ public class StandardStreamTest
|
|||
@Test
|
||||
public void testSyn()
|
||||
{
|
||||
Stream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null);
|
||||
Stream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null,null);
|
||||
Set<Stream> streams = new HashSet<>();
|
||||
streams.add(stream);
|
||||
when(synStreamFrame.isClose()).thenReturn(false);
|
||||
|
@ -100,7 +100,7 @@ public class StandardStreamTest
|
|||
@Test
|
||||
public void testSynOnClosedStream()
|
||||
{
|
||||
IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null);
|
||||
IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null,null);
|
||||
stream.updateCloseState(true, true);
|
||||
stream.updateCloseState(true, false);
|
||||
assertThat("stream expected to be closed", stream.isClosed(), is(true));
|
||||
|
@ -121,7 +121,7 @@ public class StandardStreamTest
|
|||
public void testSendDataOnHalfClosedStream() throws InterruptedException, ExecutionException, TimeoutException
|
||||
{
|
||||
SynStreamFrame synStreamFrame = new SynStreamFrame(SPDY.V2, SynInfo.FLAG_CLOSE, 1, 0, (byte)0, (short)0, null);
|
||||
IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null);
|
||||
IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null,null);
|
||||
stream.updateWindowSize(8192);
|
||||
stream.updateCloseState(synStreamFrame.isClose(), true);
|
||||
assertThat("stream is half closed", stream.isHalfClosed(), is(true));
|
||||
|
|
|
@ -211,7 +211,7 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse
|
|||
|
||||
private HTTPStream(int id, byte priority, ISession session, IStream associatedStream)
|
||||
{
|
||||
super(id, priority, session, associatedStream);
|
||||
super(id, priority, session, associatedStream,null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -314,7 +314,7 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse
|
|||
{
|
||||
private HTTPPushStream(int id, byte priority, ISession session, IStream associatedStream)
|
||||
{
|
||||
super(id, priority, session, associatedStream);
|
||||
super(id, priority, session, associatedStream,null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -18,10 +18,24 @@
|
|||
|
||||
package org.eclipse.jetty.util;
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/** A Callback that can fulfil a Promise.
|
||||
* <p>When this callback is successful, it will call the
|
||||
* {@link Promise#succeeded(Object)} method of the wrapped Promise,
|
||||
* passing the held result.
|
||||
* @param <R> The type of the result for the promise.
|
||||
*/
|
||||
public class PromisingCallback<R> implements Callback
|
||||
{
|
||||
private final Promise<R> _promise;
|
||||
private final R _result;
|
||||
|
||||
protected PromisingCallback(Promise<R> promise)
|
||||
{
|
||||
_promise=promise;
|
||||
_result=(R)this;
|
||||
}
|
||||
|
||||
public PromisingCallback(Promise<R> promise, R result)
|
||||
{
|
||||
|
@ -32,13 +46,15 @@ public class PromisingCallback<R> implements Callback
|
|||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
_promise.succeeded(_result);
|
||||
if (_promise!=null)
|
||||
_promise.succeeded(_result);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
_promise.failed(x);
|
||||
if (_promise!=null)
|
||||
_promise.failed(x);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue