Revised exception handling.

This commit is contained in:
Simone Bordet 2012-02-29 00:43:17 +01:00
parent 1238de8a28
commit 3b8c6dfd4c
2 changed files with 42 additions and 117 deletions

View File

@ -142,16 +142,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
int streamId = streamIds.getAndAdd(2); int streamId = streamIds.getAndAdd(2);
SynStreamFrame synStream = new SynStreamFrame(version, synInfo.getFlags(), streamId, 0, synInfo.getPriority(), synInfo.getHeaders()); SynStreamFrame synStream = new SynStreamFrame(version, synInfo.getFlags(), streamId, 0, synInfo.getPriority(), synInfo.getHeaders());
final IStream stream = createStream(synStream, listener); final IStream stream = createStream(synStream, listener);
try control(stream, synStream, timeout, unit, handler, stream);
{
// May throw if wrong version or headers too big
control(stream, synStream, timeout, unit, handler, stream);
}
catch (StreamException x)
{
removeStream(stream);
handler.failed(x);
}
} }
} }
} }
@ -167,23 +158,15 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
@Override @Override
public void rst(RstInfo rstInfo, long timeout, TimeUnit unit, Handler<Void> handler) public void rst(RstInfo rstInfo, long timeout, TimeUnit unit, Handler<Void> handler)
{ {
try // SPEC v3, 2.2.2
if (goAwaySent.get())
{ {
// SPEC v3, 2.2.2 handler.completed(null);
if (goAwaySent.get())
{
handler.completed(null);
}
else
{
RstStreamFrame frame = new RstStreamFrame(version, rstInfo.getStreamId(), rstInfo.getStreamStatus().getCode(version));
control(null, frame, timeout, unit, handler, null);
}
} }
catch (StreamException x) else
{ {
logger.info("Could not send reset on stream " + rstInfo.getStreamId(), x); RstStreamFrame frame = new RstStreamFrame(version, rstInfo.getStreamId(), rstInfo.getStreamStatus().getCode(version));
handler.failed(x); control(null, frame, timeout, unit, handler, null);
} }
} }
@ -198,15 +181,8 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
@Override @Override
public void settings(SettingsInfo settingsInfo, long timeout, TimeUnit unit, Handler<Void> handler) public void settings(SettingsInfo settingsInfo, long timeout, TimeUnit unit, Handler<Void> handler)
{ {
try SettingsFrame frame = new SettingsFrame(version, settingsInfo.getFlags(), settingsInfo.getSettings());
{ control(null, frame, timeout, unit, handler, null);
SettingsFrame frame = new SettingsFrame(version, settingsInfo.getFlags(), settingsInfo.getSettings());
control(null, frame, timeout, unit, handler, null);
}
catch (StreamException x)
{
handler.failed(x);
}
} }
@Override @Override
@ -222,15 +198,8 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
{ {
int pingId = pingIds.getAndAdd(2); int pingId = pingIds.getAndAdd(2);
PingInfo pingInfo = new PingInfo(pingId); PingInfo pingInfo = new PingInfo(pingId);
try PingFrame frame = new PingFrame(version, pingId);
{ control(null, frame, timeout, unit, handler, pingInfo);
PingFrame frame = new PingFrame(version, pingId);
control(null, frame, timeout, unit, handler, pingInfo);
}
catch (StreamException x)
{
handler.failed(x);
}
} }
@Override @Override
@ -248,16 +217,9 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
{ {
if (!goAwayReceived.get()) if (!goAwayReceived.get())
{ {
try GoAwayFrame frame = new GoAwayFrame(version, lastStreamId.get(), SessionStatus.OK.getCode());
{ control(null, frame, timeout, unit, handler, null);
GoAwayFrame frame = new GoAwayFrame(version, lastStreamId.get(), SessionStatus.OK.getCode()); return;
control(null, frame, timeout, unit, handler, null);
return;
}
catch (StreamException x)
{
handler.failed(x);
}
} }
} }
handler.completed(null); handler.completed(null);
@ -555,30 +517,23 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private void onPing(final PingFrame frame) private void onPing(final PingFrame frame)
{ {
try int pingId = frame.getPingId();
if (pingId % 2 == pingIds.get() % 2)
{ {
int pingId = frame.getPingId(); execute(new Runnable()
if (pingId % 2 == pingIds.get() % 2)
{ {
execute(new Runnable() @Override
public void run()
{ {
@Override PingInfo pingInfo = new PingInfo(frame.getPingId());
public void run() notifyOnPing(pingInfo);
{ flush();
PingInfo pingInfo = new PingInfo(frame.getPingId()); }
notifyOnPing(pingInfo); });
flush();
}
});
}
else
{
control(null, frame, 0, TimeUnit.MILLISECONDS, new Promise<>(), null);
}
} }
catch (StreamException x) else
{ {
throw new SPDYException(x); control(null, frame, 0, TimeUnit.MILLISECONDS, new Promise<>(), null);
} }
} }

View File

@ -203,14 +203,13 @@ public class StandardStream implements IStream
super.consume(delta); super.consume(delta);
// This is the algorithm for flow control. // This is the algorithm for flow control.
// This method may be called multiple times // This method may be called multiple times with delta=1, but we only send a window
// with delta=1, but we only send a window // update when the whole dataInfo has been consumed.
// update when the whole dataInfo has been // Other policies may be to send window updates when consumed() is greater than
// consumed. // a certain threshold, etc. but for now the policy is not pluggable for simplicity.
// Other policies may be to send window // Note that the frequency of window updates depends on the read buffer, that
// updates when consumed() is greater than // should not be too smaller than the window size to avoid frequent window updates.
// a certain threshold, etc. but for now // Therefore, a pluggable policy should be able to modify the read buffer capacity.
// the policy is not pluggable for simplicity.
if (consumed() == length() && !isClosed()) if (consumed() == length() && !isClosed())
windowUpdate(length()); windowUpdate(length());
} }
@ -260,21 +259,10 @@ public class StandardStream implements IStream
private void windowUpdate(int delta) private void windowUpdate(int delta)
{ {
try if (delta > 0)
{ {
if (delta > 0) WindowUpdateFrame windowUpdateFrame = new WindowUpdateFrame(session.getVersion(), getId(), delta);
{ session.control(this, windowUpdateFrame, 0, TimeUnit.MILLISECONDS, new Promise<>(), null);
// TODO: if the read buffer is small, but the default window size is big,
// we will send many window update frames... perhaps we can delay
// window update frames until we have a bigger delta to send
WindowUpdateFrame windowUpdateFrame = new WindowUpdateFrame(session.getVersion(), getId(), delta);
session.control(this, windowUpdateFrame, 0, TimeUnit.MILLISECONDS, new Promise<>(), null);
}
}
catch (StreamException x)
{
logger.debug("Could not send window update on stream " + this, x);
session.rst(new RstInfo(getId(), x.getStreamStatus()));
} }
} }
@ -341,18 +329,9 @@ public class StandardStream implements IStream
@Override @Override
public void reply(ReplyInfo replyInfo, long timeout, TimeUnit unit, Handler<Void> handler) public void reply(ReplyInfo replyInfo, long timeout, TimeUnit unit, Handler<Void> handler)
{ {
try updateCloseState(replyInfo.isClose());
{ SynReplyFrame frame = new SynReplyFrame(session.getVersion(), replyInfo.getFlags(), getId(), replyInfo.getHeaders());
updateCloseState(replyInfo.isClose()); session.control(this, frame, timeout, unit, handler, null);
SynReplyFrame frame = new SynReplyFrame(session.getVersion(), replyInfo.getFlags(), getId(), replyInfo.getHeaders());
session.control(this, frame, timeout, unit, handler, null);
}
catch (StreamException x)
{
logger.debug("Could not send reply on stream " + this, x);
handler.failed(x);
session.rst(new RstInfo(getId(), x.getStreamStatus()));
}
} }
@Override @Override
@ -382,18 +361,9 @@ public class StandardStream implements IStream
@Override @Override
public void headers(HeadersInfo headersInfo, long timeout, TimeUnit unit, Handler<Void> handler) public void headers(HeadersInfo headersInfo, long timeout, TimeUnit unit, Handler<Void> handler)
{ {
try updateCloseState(headersInfo.isClose());
{ HeadersFrame frame = new HeadersFrame(session.getVersion(), headersInfo.getFlags(), getId(), headersInfo.getHeaders());
updateCloseState(headersInfo.isClose()); session.control(this, frame, timeout, unit, handler, null);
HeadersFrame frame = new HeadersFrame(session.getVersion(), headersInfo.getFlags(), getId(), headersInfo.getHeaders());
session.control(this, frame, timeout, unit, handler, null);
}
catch (StreamException x)
{
logger.debug("Could not send headers on stream " + this, x);
handler.failed(x);
session.rst(new RstInfo(getId(), x.getStreamStatus()));
}
} }
@Override @Override