Corrected changes introduced with Promise.

Save creation of iterators for every append() and prepend() in StandardSession.
Removed PromisingCallback, only used by SPDY and better implemented otherwise.
This commit is contained in:
Simone Bordet 2012-11-23 12:06:46 +01:00
parent ab642141d1
commit ee893d8526
8 changed files with 79 additions and 138 deletions

View File

@ -29,7 +29,6 @@ import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.spdy.Controller;
import org.eclipse.jetty.spdy.ISession;
import org.eclipse.jetty.spdy.IdleListener;
import org.eclipse.jetty.spdy.StandardSession;
import org.eclipse.jetty.spdy.parser.Parser;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
@ -52,7 +51,11 @@ public class SPDYConnection extends AbstractConnection implements Controller, Id
public SPDYConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Executor executor,int bufferSize)
{
// TODO explain why we are passing false here
// Since SPDY is multiplexed, onFillable() must never block
// while calling application code. In fact, onFillable()
// always dispatches to a new thread when calling application
// code, so here we can safely pass false as last parameter,
// and avoid to dispatch to onFillable().
super(endPoint, executor, false);
this.bufferPool = bufferPool;
this.parser = parser;

View File

@ -24,13 +24,14 @@ import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.frames.ControlFrame;
import org.eclipse.jetty.util.Callback;
/**
* <p>The internal interface that represents a stream.</p>
* <p>{@link IStream} contains additional methods used by a SPDY
* implementation (and not by an application).</p>
*/
public interface IStream extends Stream
public interface IStream extends Stream, Callback
{
/**
* <p>Senders of data frames need to know the current window size

View File

@ -24,10 +24,8 @@ import java.nio.channels.InterruptedByTimeoutException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@ -172,7 +170,7 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
int streamId = streamIds.getAndAdd(2);
// TODO: for SPDYv3 we need to support the "slot" argument
SynStreamFrame synStream = new SynStreamFrame(version, synInfo.getFlags(), streamId, associatedStreamId, synInfo.getPriority(), (short)0, synInfo.getHeaders());
StandardStream stream = createStream(synStream, listener, true, promise);
IStream stream = createStream(synStream, listener, true, promise);
generateAndEnqueueControlFrame(stream, synStream, timeout, unit, stream);
}
flush();
@ -235,9 +233,9 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
public void ping(long timeout, TimeUnit unit, Promise<PingInfo> promise)
{
int pingId = pingIds.getAndAdd(2);
PromisingPingInfoCallback pingInfo = new PromisingPingInfoCallback(pingId,promise);
PingFrame frame = new PingFrame(version,pingId);
control(null,frame,timeout,unit,pingInfo);
PingInfoCallback pingInfo = new PingInfoCallback(pingId, promise);
PingFrame frame = new PingFrame(version, pingId);
control(null, frame, timeout, unit, pingInfo);
}
@Override
@ -456,7 +454,7 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
private void onSyn(SynStreamFrame frame)
{
IStream stream = createStream(frame, null, false,null);
IStream stream = createStream(frame, null, false, null);
if (stream != null)
processSyn(listener, stream, frame);
}
@ -475,12 +473,12 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
removeStream(stream);
}
private StandardStream createStream(SynStreamFrame frame, StreamFrameListener listener, boolean local,Promise<Stream> promise)
private IStream createStream(SynStreamFrame frame, StreamFrameListener listener, boolean local, Promise<Stream> promise)
{
IStream associatedStream = streams.get(frame.getAssociatedStreamId());
StandardStream stream = new StandardStream(frame.getStreamId(), frame.getPriority(), this, associatedStream,promise);
IStream stream = new StandardStream(frame.getStreamId(), frame.getPriority(), this, associatedStream, promise);
flowControlStrategy.onNewStream(this, stream);
stream.updateCloseState(frame.isClose(), local);
stream.setStreamFrameListener(listener);
@ -898,10 +896,9 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
return;
Set<IStream> stalledStreams = null;
Iterator<FrameBytes> iter = queue.iterator();
while(iter.hasNext())
for (int i = 0; i < queue.size(); ++i)
{
frameBytes=iter.next();
frameBytes = queue.get(i);
IStream stream = frameBytes.getStream();
if (stream != null && stalledStreams != null && stalledStreams.contains(stream))
@ -910,7 +907,7 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
buffer = frameBytes.getByteBuffer();
if (buffer != null)
{
iter.remove();
queue.remove(i);
if (stream != null && stream.isReset())
{
frameBytes.fail(new StreamException(stream.getId(),StreamStatus.INVALID_STREAM,
@ -924,7 +921,7 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
stalledStreams = new HashSet<>();
if (stream != null)
stalledStreams.add(stream);
LOG.debug("Flush stalled for {}, {} frame(s) in queue",frameBytes,queue.size());
}
@ -934,7 +931,7 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
flushing = true;
LOG.debug("Flushing {}, {} frame(s) in queue",frameBytes,queue.size());
}
write(buffer,frameBytes);
write(buffer, frameBytes);
}
private void append(FrameBytes frameBytes)
@ -945,18 +942,15 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
failure = this.failure;
if (failure == null)
{
ListIterator<FrameBytes> iter = queue.listIterator(queue.size());
while(iter.hasPrevious())
int index = queue.size();
while (index > 0)
{
FrameBytes element = iter.previous();
FrameBytes element = queue.get(index - 1);
if (element.compareTo(frameBytes) >= 0)
{
iter.next();
break;
}
--index;
}
iter.add(frameBytes);
queue.add(index, frameBytes);
}
}
@ -972,18 +966,15 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
failure = this.failure;
if (failure == null)
{
ListIterator<FrameBytes> iter = queue.listIterator(0);
while(iter.hasNext())
int index = 0;
while (index < queue.size())
{
FrameBytes element = iter.next();
FrameBytes element = queue.get(index);
if (element.compareTo(frameBytes) <= 0)
{
iter.previous();
break;
}
++index;
}
iter.add(frameBytes);
queue.add(index,frameBytes);
}
}
@ -1153,7 +1144,6 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
close();
fail(new InterruptedByTimeoutException());
}
@Override
public void succeeded()
@ -1170,9 +1160,6 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
@Override
public void failed(Throwable x)
{
// TODO because this is using frameBytes here, then it is not really a Promise.
// frameBytes is not a result, but is something known before the operation is attempted!
List<FrameBytes> frameBytesToFail = new ArrayList<>();
frameBytesToFail.add(this);
@ -1245,7 +1232,7 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
private DataFrameBytes(IStream stream, Callback handler, DataInfo dataInfo)
{
super(stream,handler);
super(stream, handler);
this.dataInfo = dataInfo;
}
@ -1323,30 +1310,29 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
close();
}
}
private static class PromisingPingInfoCallback extends PingInfo implements Callback
private static class PingInfoCallback extends PingInfo implements Callback
{
public PromisingPingInfoCallback(int pingId,Promise<PingInfo> promise)
private final Promise<PingInfo> promise;
public PingInfoCallback(int pingId, Promise<PingInfo> promise)
{
super(pingId);
this.promise=promise;
}
private final Promise<PingInfo> promise;
@Override
public void succeeded()
{
if (promise!=null)
if (promise != null)
promise.succeeded(this);
}
@Override
public void failed(Throwable x)
{
if (promise!=null)
if (promise != null)
promise.failed(x);
}
}
}

View File

@ -41,11 +41,10 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.PromisingCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class StandardStream extends PromisingCallback<Stream> implements IStream
public class StandardStream implements IStream
{
private static final Logger LOG = Log.getLogger(Stream.class);
private final Map<String, Object> attributes = new ConcurrentHashMap<>();
@ -53,6 +52,7 @@ public class StandardStream extends PromisingCallback<Stream> implements IStream
private final byte priority;
private final ISession session;
private final IStream associatedStream;
private final Promise<Stream> promise;
private final AtomicInteger windowSize = new AtomicInteger();
private final Set<Stream> pushedStreams = Collections.newSetFromMap(new ConcurrentHashMap<Stream, Boolean>());
private volatile StreamFrameListener listener;
@ -62,11 +62,11 @@ public class StandardStream extends PromisingCallback<Stream> implements IStream
public StandardStream(int id, byte priority, ISession session, IStream associatedStream, Promise<Stream> promise)
{
super(promise);
this.id = id;
this.priority = priority;
this.session = session;
this.associatedStream = associatedStream;
this.promise = promise;
}
@Override
@ -253,6 +253,20 @@ public class StandardStream extends PromisingCallback<Stream> implements IStream
session.flush();
}
@Override
public void succeeded()
{
if (promise != null)
promise.succeeded(this);
}
@Override
public void failed(Throwable x)
{
if (promise != null)
promise.failed(x);
}
private void notifyOnReply(ReplyInfo replyInfo)
{
final StreamFrameListener listener = this.listener;
@ -329,23 +343,23 @@ public class StandardStream extends PromisingCallback<Stream> implements IStream
}
@Override
public void syn(SynInfo synInfo, long timeout, TimeUnit unit, Promise<Stream> callback)
public void syn(SynInfo synInfo, long timeout, TimeUnit unit, Promise<Stream> promise)
{
if (isClosed() || isReset())
{
callback.failed(new StreamException(getId(), StreamStatus.STREAM_ALREADY_CLOSED,
promise.failed(new StreamException(getId(), StreamStatus.STREAM_ALREADY_CLOSED,
"Stream: " + this + " already closed or reset!"));
return;
}
PushSynInfo pushSynInfo = new PushSynInfo(getId(), synInfo);
session.syn(pushSynInfo, null, timeout, unit, callback);
session.syn(pushSynInfo, null, timeout, unit, promise);
}
@Override
public Future<Void> reply(ReplyInfo replyInfo)
{
FutureCallback result = new FutureCallback();
reply(replyInfo,0,TimeUnit.MILLISECONDS,result);
reply(replyInfo, 0, TimeUnit.MILLISECONDS, result);
return result;
}
@ -363,9 +377,9 @@ public class StandardStream extends PromisingCallback<Stream> implements IStream
@Override
public Future<Void> data(DataInfo dataInfo)
{
FutureCallback fcb = new FutureCallback();
data(dataInfo,0,TimeUnit.MILLISECONDS,fcb);
return fcb;
FutureCallback result = new FutureCallback();
data(dataInfo, 0, TimeUnit.MILLISECONDS, result);
return result;
}
@Override
@ -390,9 +404,9 @@ public class StandardStream extends PromisingCallback<Stream> implements IStream
@Override
public Future<Void> headers(HeadersInfo headersInfo)
{
FutureCallback fcb = new FutureCallback();
headers(headersInfo,0,TimeUnit.MILLISECONDS,fcb);
return fcb;
FutureCallback result = new FutureCallback();
headers(headersInfo, 0, TimeUnit.MILLISECONDS, result);
return result;
}
@Override

View File

@ -22,7 +22,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -73,23 +73,20 @@ public class StandardSessionTest
{
@Mock
private Controller controller;
private ByteBufferPool bufferPool;
private Executor threadPool;
private ExecutorService threadPool;
private StandardSession session;
private Generator generator;
private Scheduler scheduler;
private Fields headers;
@Before
public void setUp() throws Exception
{
bufferPool = new MappedByteBufferPool();
threadPool = Executors.newCachedThreadPool();
scheduler = new TimerScheduler();
scheduler.start();
generator = new Generator(bufferPool, new StandardCompressionFactory.StandardCompressor());
session = new StandardSession(SPDY.V2,bufferPool,threadPool,scheduler,controller,null,1,null,generator,new FlowControlStrategy.None());
ByteBufferPool bufferPool = new MappedByteBufferPool();
Generator generator = new Generator(bufferPool, new StandardCompressionFactory.StandardCompressor());
session = new StandardSession(SPDY.V2, bufferPool,threadPool,scheduler,controller,null,1,null, generator,new FlowControlStrategy.None());
headers = new Fields();
}
@ -97,6 +94,7 @@ public class StandardSessionTest
public void after() throws Exception
{
scheduler.stop();
threadPool.shutdownNow();
}
@SuppressWarnings("unchecked")
@ -413,7 +411,7 @@ public class StandardSessionTest
final CountDownLatch failedCalledLatch = new CountDownLatch(2);
SynStreamFrame synStreamFrame = new SynStreamFrame(SPDY.V2, SynInfo.FLAG_CLOSE, 1, 0, (byte)0, (short)0, null);
IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null,null);
IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null);
stream.updateWindowSize(8192);
Callback.Adapter callback = new Callback.Adapter()
{
@ -492,7 +490,7 @@ public class StandardSessionTest
private void assertThatPushStreamIsNotInSession(Stream pushStream)
{
assertThat("pushStream is not in session",session.getStreams().contains(pushStream.getId()),not(true));
assertThat("pushStream is not in session",session.getStreams().contains(pushStream),not(true));
}
private void assertThatPushStreamIsInSession(Stream pushStream)

View File

@ -66,7 +66,7 @@ public class StandardStreamTest
@Test
public void testSyn()
{
Stream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null,null);
Stream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null);
Set<Stream> streams = new HashSet<>();
streams.add(stream);
when(synStreamFrame.isClose()).thenReturn(false);
@ -100,7 +100,7 @@ public class StandardStreamTest
@Test
public void testSynOnClosedStream()
{
IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null,null);
IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null);
stream.updateCloseState(true, true);
stream.updateCloseState(true, false);
assertThat("stream expected to be closed", stream.isClosed(), is(true));
@ -121,7 +121,7 @@ public class StandardStreamTest
public void testSendDataOnHalfClosedStream() throws InterruptedException, ExecutionException, TimeoutException
{
SynStreamFrame synStreamFrame = new SynStreamFrame(SPDY.V2, SynInfo.FLAG_CLOSE, 1, 0, (byte)0, (short)0, null);
IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null,null);
IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null);
stream.updateWindowSize(8192);
stream.updateCloseState(synStreamFrame.isClose(), true);
assertThat("stream is half closed", stream.isHalfClosed(), is(true));

View File

@ -211,15 +211,14 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse
private HTTPStream(int id, byte priority, ISession session, IStream associatedStream)
{
super(id, priority, session, associatedStream,null);
super(id, priority, session, associatedStream, null);
}
@Override
public void syn(SynInfo synInfo, long timeout, TimeUnit unit, Promise<Stream> handler)
{
// TODO is this right? comment or imple are wrong??
// HTTP does not support pushed streams
handler.succeeded(this);
handler.succeeded(new HTTPPushStream(2, getPriority(), getSession(), this));
}
@Override
@ -314,7 +313,7 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse
{
private HTTPPushStream(int id, byte priority, ISession session, IStream associatedStream)
{
super(id, priority, session, associatedStream,null);
super(id, priority, session, associatedStream, null);
}
@Override

View File

@ -1,60 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util;
/* ------------------------------------------------------------ */
/** A Callback that can fulfil a Promise.
* <p>When this callback is successful, it will call the
* {@link Promise#succeeded(Object)} method of the wrapped Promise,
* passing the held result.
* @param <R> The type of the result for the promise.
*/
public class PromisingCallback<R> implements Callback
{
private final Promise<R> _promise;
private final R _result;
protected PromisingCallback(Promise<R> promise)
{
_promise=promise;
_result=(R)this;
}
public PromisingCallback(Promise<R> promise, R result)
{
_promise=promise;
_result=result;
}
@Override
public void succeeded()
{
if (_promise!=null)
_promise.succeeded(_result);
}
@Override
public void failed(Throwable x)
{
if (_promise!=null)
_promise.failed(x);
}
}