Consolidated usage of async handlers into one class only, instead of three.

This commit is contained in:
Simone Bordet 2012-02-22 16:02:06 +01:00
parent 77cdee95ce
commit 70f6d555bf
14 changed files with 86 additions and 172 deletions

View File

@ -25,9 +25,9 @@ import org.eclipse.jetty.spdy.frames.ControlFrame;
public interface ISession extends Session
{
public void control(IStream stream, ControlFrame frame, Handler handler) throws StreamException;
public <C> void control(IStream stream, ControlFrame frame, Handler<C> handler, C context) throws StreamException;
public void data(IStream stream, DataInfo dataInfo, Handler handler);
public <C> void data(IStream stream, DataInfo dataInfo, Handler<C> handler, C context);
public int getWindowSize();
@ -36,10 +36,5 @@ public interface ISession extends Session
public int write(ByteBuffer buffer, Handler<T> handler, T context);
public void close(boolean onlyOutput);
public interface Handler<C>
{
public void complete(C context);
}
}
}

View File

@ -22,9 +22,9 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.spdy.api.ResultHandler;
import org.eclipse.jetty.spdy.api.Handler;
public class Promise<T> extends ResultHandler<T> implements Future<T>
public class Promise<T> implements Handler<T>, Future<T>
{
private final CountDownLatch latch = new CountDownLatch(1);
private boolean cancelled;

View File

@ -32,7 +32,6 @@ import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.PingInfo;
import org.eclipse.jetty.spdy.api.ResultHandler;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.SPDYException;
import org.eclipse.jetty.spdy.api.Session;
@ -59,7 +58,7 @@ import org.eclipse.jetty.spdy.parser.Parser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StandardSession implements ISession, Parser.Listener, ISession.Controller.Handler<StandardSession.FrameBytes>
public class StandardSession implements ISession, Parser.Listener, Handler<StandardSession.FrameBytes>
{
private static final Logger logger = LoggerFactory.getLogger(Session.class);
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
@ -112,7 +111,7 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
}
@Override
public void syn(SynInfo synInfo, StreamFrameListener listener, final ResultHandler<Stream> handler)
public void syn(SynInfo synInfo, StreamFrameListener listener, final Handler<Stream> handler)
{
// Synchronization is necessary.
// SPEC v3, 2.3.1 requires that the stream creation be monotonically crescent
@ -135,27 +134,13 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
try
{
// May throw if wrong version or headers too big
control(stream, synStream, new Handler()
{
@Override
public void completed()
{
handler.completed(stream);
}
@Override
public void failed(Throwable x)
{
handler.failed(x);
}
});
control(stream, synStream, handler, stream);
flush();
}
catch (StreamException x)
{
removeStream(stream);
handler.failed(x);
throw new SPDYException(x);
}
}
}
@ -170,19 +155,19 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
}
@Override
public void rst(RstInfo rstInfo, Handler handler)
public void rst(RstInfo rstInfo, Handler<Void> handler)
{
try
{
// SPEC v3, 2.2.2
if (goAwaySent.get())
{
handler.completed();
handler.completed(null);
}
else
{
RstStreamFrame frame = new RstStreamFrame(version, rstInfo.getStreamId(), rstInfo.getStreamStatus().getCode(version));
control(null, frame, handler);
control(null, frame, handler, null);
flush();
}
}
@ -202,19 +187,17 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
}
@Override
public void settings(SettingsInfo settingsInfo, Handler handler)
public void settings(SettingsInfo settingsInfo, Handler<Void> handler)
{
try
{
SettingsFrame frame = new SettingsFrame(version, settingsInfo.getFlags(), settingsInfo.getSettings());
control(null, frame, handler);
control(null, frame, handler, null);
flush();
}
catch (StreamException x)
{
// Should never happen, but just in case we rethrow
handler.failed(x);
throw new SPDYException(x);
}
}
@ -227,34 +210,19 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
}
@Override
public void ping(final ResultHandler<PingInfo> handler)
public void ping(final Handler<PingInfo> handler)
{
try
{
int pingId = pingIds.getAndAdd(2);
final PingInfo pingInfo = new PingInfo(pingId);
PingFrame frame = new PingFrame(version, pingId);
control(null, frame, new Handler()
{
@Override
public void completed()
{
handler.completed(pingInfo);
}
@Override
public void failed(Throwable x)
{
handler.failed(x);
}
});
control(null, frame, handler, pingInfo);
flush();
}
catch (StreamException x)
{
// Should never happen, but just in case we rethrow
handler.failed(x);
throw new SPDYException(x);
}
}
@ -267,7 +235,7 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
}
@Override
public void goAway(Handler handler)
public void goAway(Handler<Void> handler)
{
if (goAwaySent.compareAndSet(false, true))
{
@ -276,19 +244,17 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
try
{
GoAwayFrame frame = new GoAwayFrame(version, lastStreamId.get(), SessionStatus.OK.getCode());
control(null, frame, handler);
control(null, frame, handler, null);
flush();
return;
}
catch (StreamException x)
{
// Should never happen, but just in case we rethrow
handler.failed(x);
throw new SPDYException(x);
}
}
}
handler.completed();
handler.completed(null);
}
@Override
@ -535,7 +501,7 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
if (pingId % 2 == pingIds.get() % 2)
notifyOnPing(frame);
else
control(null, frame, new Promise<>());
control(null, frame, new Promise<>(), null);
flush();
}
catch (StreamException x)
@ -664,13 +630,13 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
}
@Override
public void control(IStream stream, ControlFrame frame, Handler handler) throws StreamException
public <C> void control(IStream stream, ControlFrame frame, Handler<C> handler, C context) throws StreamException
{
if (stream != null)
updateLastStreamId(stream);
ByteBuffer buffer = generator.control(frame);
logger.debug("Posting {} on {}", frame, stream);
enqueueLast(new ControlFrameBytes(frame, buffer, handler));
enqueueLast(new ControlFrameBytes<>(frame, buffer, handler, context));
}
private void updateLastStreamId(IStream stream)
@ -690,10 +656,10 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
}
@Override
public void data(IStream stream, DataInfo dataInfo, Handler handler)
public <C> void data(IStream stream, DataInfo dataInfo, Handler<C> handler, C context)
{
logger.debug("Posting {} on {}", dataInfo, stream);
enqueueLast(new DataFrameBytes(stream, dataInfo, handler));
enqueueLast(new DataFrameBytes<>(stream, dataInfo, handler, context));
flush();
}
@ -752,7 +718,7 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
}
@Override
public void complete(FrameBytes frameBytes)
public void completed(FrameBytes frameBytes)
{
synchronized (queue)
{
@ -763,7 +729,13 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
flush();
}
protected void write(final ByteBuffer buffer, Controller.Handler<FrameBytes> handler, FrameBytes frameBytes)
@Override
public void failed(Throwable x)
{
throw new SPDYException(x);
}
protected void write(final ByteBuffer buffer, Handler<FrameBytes> handler, FrameBytes frameBytes)
{
controller.write(buffer, handler, frameBytes);
}
@ -775,17 +747,19 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
public abstract void complete();
}
private class ControlFrameBytes implements FrameBytes
private class ControlFrameBytes<C> implements FrameBytes
{
private final ControlFrame frame;
private final ByteBuffer buffer;
private final Handler handler;
private final Handler<C> handler;
private final C context;
private ControlFrameBytes(ControlFrame frame, ByteBuffer buffer, Handler handler)
private ControlFrameBytes(ControlFrame frame, ByteBuffer buffer, Handler<C> handler, C context)
{
this.frame = frame;
this.buffer = buffer;
this.handler = handler;
this.context = context;
}
@Override
@ -803,7 +777,7 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
// Recipients will know the last good stream id and act accordingly.
controller.close(false);
}
handler.completed();
handler.completed(context);
}
@Override
@ -813,18 +787,20 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
}
}
private class DataFrameBytes implements FrameBytes
private class DataFrameBytes<C> implements FrameBytes
{
private final IStream stream;
private final DataInfo data;
private final Handler handler;
private final Handler<C> handler;
private final C context;
private int dataLength;
private DataFrameBytes(IStream stream, DataInfo data, Handler handler)
private DataFrameBytes(IStream stream, DataInfo data, Handler<C> handler, C context)
{
this.stream = stream;
this.data = data;
this.handler = handler;
this.context = context;
}
@Override
@ -856,7 +832,7 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
stream.updateCloseState(data.isClose());
if (stream.isClosed())
removeStream(stream);
handler.completed();
handler.completed(context);
}
}

View File

@ -205,7 +205,7 @@ public class StandardStream implements IStream
// we will send many window update frames... perhaps we can delay
// window update frames until we have a bigger delta to send
WindowUpdateFrame windowUpdateFrame = new WindowUpdateFrame(session.getVersion(), getId(), delta);
session.control(this, windowUpdateFrame, new Promise<>());
session.control(this, windowUpdateFrame, new Promise<>(), null);
}
}
catch (StreamException x)
@ -275,13 +275,13 @@ public class StandardStream implements IStream
}
@Override
public void reply(ReplyInfo replyInfo, Handler handler)
public void reply(ReplyInfo replyInfo, Handler<Void> handler)
{
try
{
updateCloseState(replyInfo.isClose());
SynReplyFrame frame = new SynReplyFrame(session.getVersion(), replyInfo.getFlags(), getId(), replyInfo.getHeaders());
session.control(this, frame, handler);
session.control(this, frame, handler, null);
}
catch (StreamException x)
{
@ -300,11 +300,11 @@ public class StandardStream implements IStream
}
@Override
public void data(DataInfo dataInfo, Handler handler)
public void data(DataInfo dataInfo, Handler<Void> handler)
{
// Cannot update the close state here, because the data that we send may
// be flow controlled, so we need the stream to update the window size.
session.data(this, dataInfo, handler);
session.data(this, dataInfo, handler, null);
}
@Override
@ -316,13 +316,13 @@ public class StandardStream implements IStream
}
@Override
public void headers(HeadersInfo headersInfo, Handler handler)
public void headers(HeadersInfo headersInfo, Handler<Void> handler)
{
try
{
updateCloseState(headersInfo.isClose());
HeadersFrame frame = new HeadersFrame(session.getVersion(), headersInfo.getFlags(), getId(), headersInfo.getHeaders());
session.control(this, frame, handler);
session.control(this, frame, handler, null);
}
catch (StreamException x)
{

View File

@ -16,28 +16,23 @@
package org.eclipse.jetty.spdy.api;
import java.util.concurrent.TimeUnit;
public abstract class Handler
public interface Handler<C>
{
private final int timeout;
private final TimeUnit timeUnit;
public abstract void completed(C context);
protected Handler()
public void failed(Throwable x);
public static class Adapter<C> implements Handler<C>
{
this(0, TimeUnit.MILLISECONDS);
}
@Override
public void completed(C context)
{
}
protected Handler(int timeout, TimeUnit timeUnit)
{
this.timeout = timeout;
this.timeUnit = timeUnit;
}
public abstract void completed();
public void failed(Throwable x)
{
throw new SPDYException(x);
@Override
public void failed(Throwable x)
{
throw new SPDYException(x);
}
}
}

View File

@ -1,28 +0,0 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy.api;
public abstract class ResultHandler<R> extends Handler
{
@Override
public final void completed()
{
completed(null);
}
public abstract void completed(R result);
}

View File

@ -71,7 +71,7 @@ public interface Session
* @param synInfo the metadata to send on stream creation
* @param listener the listener to invoke when events happen on the stream just created
* @return a future for the stream that will be created
* @see #syn(SynInfo, StreamFrameListener, ResultHandler)
* @see #syn(SynInfo, StreamFrameListener, Handler)
*/
public Future<Stream> syn(SynInfo synInfo, StreamFrameListener listener);
@ -85,7 +85,7 @@ public interface Session
* @param handler the completion handler that gets notified of stream creation
* @see #syn(SynInfo, StreamFrameListener)
*/
public void syn(SynInfo synInfo, StreamFrameListener listener, ResultHandler<Stream> handler);
public void syn(SynInfo synInfo, StreamFrameListener listener, Handler<Stream> handler);
/**
* <p>Sends asynchronously a RST_STREAM to abort a stream.</p>
@ -104,7 +104,7 @@ public interface Session
* @param rstInfo the metadata to reset the stream
* @param handler the completion handler that gets notified of reset's send
*/
public void rst(RstInfo rstInfo, Handler handler);
public void rst(RstInfo rstInfo, Handler<Void> handler);
/**
* <p>Sends asynchronously a SETTINGS to configure the SPDY connection.</p>
@ -123,7 +123,7 @@ public interface Session
* @param settingsInfo the metadata to send
* @param handler the completion handler that gets notified of settings' send
*/
public void settings(SettingsInfo settingsInfo, Handler handler);
public void settings(SettingsInfo settingsInfo, Handler<Void> handler);
/**
* <p>Sends asynchronously a PING, normally to measure round-trip time.</p>
@ -140,7 +140,7 @@ public interface Session
*
* @param handler the completion handler that gets notified of ping's send
*/
public void ping(ResultHandler<PingInfo> handler);
public void ping(Handler<PingInfo> handler);
/**
* <p>Closes gracefully this session, sending a GO_AWAY frame and then closing the TCP connection.</p>
@ -157,7 +157,7 @@ public interface Session
*
* @param handler the completion handler that gets notified of go away's send
*/
public void goAway(Handler handler);
public void goAway(Handler<Void> handler);
/**
* <p>Initiates the flush of data to the other peer.</p>
@ -213,27 +213,4 @@ public interface Session
}
}
}
/*
public static abstract class SynHandler extends Promise<Stream>
{
@Override
public final void completed()
{
// Applications should not override this method, but the one below
}
public abstract void completed(Stream stream);
}
public static abstract class PingHandler extends Promise<PingInfo>
{
@Override
public final void completed()
{
// Applications should not override this method, but the one below
}
public abstract void completed(PingInfo stream);
}
*/
}

View File

@ -69,7 +69,7 @@ public interface Stream
* @param replyInfo the metadata to send
* @param handler the completion handler that gets notified of reply sent
*/
public void reply(ReplyInfo replyInfo, Handler handler);
public void reply(ReplyInfo replyInfo, Handler<Void> handler);
/**
* <p>Sends asynchronously a DATA frame on this stream.</p>
@ -91,7 +91,7 @@ public interface Stream
* @param dataInfo the metadata to send
* @param handler the completion handler that gets notified of data sent
*/
public void data(DataInfo dataInfo, Handler handler);
public void data(DataInfo dataInfo, Handler<Void> handler);
/**
* <p>Sends asynchronously a HEADER frame on this stream.</p>
@ -113,7 +113,7 @@ public interface Stream
* @param headersInfo the metadata to send
* @param handler the completion handler that gets notified of headers sent
*/
public void headers(HeadersInfo headersInfo, Handler handler);
public void headers(HeadersInfo headersInfo, Handler<Void> handler);
/**
* @return whether this stream has been closed by both parties

View File

@ -82,7 +82,7 @@ public class ClientUsageTest
// Then issue another similar request
stream.getSession().syn(new SynInfo(true), this);
}
}, new ResultHandler<Stream>()
}, new Handler.Adapter<Stream>()
{
@Override
public void completed(Stream stream)
@ -137,7 +137,7 @@ public class ClientUsageTest
}
}
}, new ResultHandler<Stream>()
}, new Handler.Adapter<Stream>()
{
@Override
public void completed(Stream stream)

View File

@ -71,7 +71,7 @@ public class ServerUsageTest
//
// However, the API may allow to initiate the stream
session.syn(new SynInfo(false), null, new ResultHandler<Stream>()
session.syn(new SynInfo(false), null, new Handler.Adapter<Stream>()
{
@Override
public void completed(Stream stream)
@ -101,7 +101,7 @@ public class ServerUsageTest
Session session = stream.getSession();
// Since it's unidirectional, no need to pass the listener
session.syn(new SynInfo(new Headers(), false, true, stream.getId(), (byte)0), null, new ResultHandler<Stream>()
session.syn(new SynInfo(new Headers(), false, true, stream.getId(), (byte)0), null, new Handler.Adapter<Stream>()
{
@Override
public void completed(Stream pushStream)

View File

@ -19,8 +19,7 @@
<version>2.11</version>
<configuration>
<argLine>
-Xbootclasspath/a:${settings.localRepository}/org/eclipse/jetty/jetty-npn-boot/${npn.version}/jetty-npn-boot-${npn.version}.jar
-javaagent:${settings.localRepository}/org/eclipse/jetty/jetty-npn-agent/${npn.version}/jetty-npn-agent-${npn.version}.jar
-Xbootclasspath/p:${settings.localRepository}/org/eclipse/jetty/npn-boot/${npn.version}/npn-boot-${npn.version}.jar
</argLine>
</configuration>
</plugin>

View File

@ -28,7 +28,7 @@ import org.eclipse.jetty.io.nio.DirectNIOBuffer;
import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
import org.eclipse.jetty.io.nio.NIOBuffer;
import org.eclipse.jetty.spdy.ISession.Controller;
import org.eclipse.jetty.spdy.api.SPDYException;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.parser.Parser;
import org.slf4j.Logger;
@ -121,7 +121,7 @@ public class SPDYAsyncConnection extends AbstractConnection implements AsyncConn
}
@Override
public int write(ByteBuffer buffer, ISession.Controller.Handler<StandardSession.FrameBytes> handler, StandardSession.FrameBytes context)
public int write(ByteBuffer buffer, Handler<StandardSession.FrameBytes> handler, StandardSession.FrameBytes context)
{
int remaining = buffer.remaining();
Buffer jettyBuffer = buffer.isDirect() ? new DirectNIOBuffer(buffer, false) : new IndirectNIOBuffer(buffer, false);
@ -131,10 +131,10 @@ public class SPDYAsyncConnection extends AbstractConnection implements AsyncConn
int written = endPoint.flush(jettyBuffer);
logger.debug("Written {} bytes, {} remaining", written, jettyBuffer.length());
}
catch (IOException x)
catch (Exception x)
{
close(false);
throw new SPDYException(x);
handler.failed(x);
}
finally
{
@ -162,7 +162,7 @@ public class SPDYAsyncConnection extends AbstractConnection implements AsyncConn
// Volatile write to ensure visibility of write fields
writePending = false;
}
handler.complete(context);
handler.completed(context);
}
return remaining - buffer.remaining();

View File

@ -20,8 +20,8 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.PingInfo;
import org.eclipse.jetty.spdy.api.ResultHandler;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
@ -65,7 +65,7 @@ public class PingTest extends AbstractTest
@Override
public void onConnect(Session session)
{
session.ping(new ResultHandler<PingInfo>()
session.ping(new Handler.Adapter<PingInfo>()
{
@Override
public void completed(PingInfo pingInfo)

View File

@ -27,9 +27,9 @@ import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.ResultHandler;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
@ -280,7 +280,7 @@ public class SynReplyTest extends AbstractTest
Assert.assertEquals(clientData, data);
clientDataLatch.countDown();
}
}, new ResultHandler<Stream>()
}, new Handler.Adapter<Stream>()
{
@Override
public void completed(Stream stream)