Updated idle timeout checking.

Now that applications are notified in a different thread, idleness becomes an event
and therefore a new IdleListener interface has been introduced and implemented.
This commit is contained in:
Simone Bordet 2012-03-01 14:59:30 +01:00
parent 49bb4f8e8b
commit e8d09fad53
10 changed files with 114 additions and 51 deletions

View File

@ -0,0 +1,28 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.api.Handler;
public interface Controller<T>
{
public int write(ByteBuffer buffer, Handler<T> handler, T context);
public void close(boolean onlyOutput);
}

View File

@ -16,7 +16,6 @@
package org.eclipse.jetty.spdy;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.spdy.api.DataInfo;
@ -41,10 +40,4 @@ public interface ISession extends Session
public int getWindowSize();
public interface Controller<T>
{
public int write(ByteBuffer buffer, Handler<T> handler, T context);
public void close(boolean onlyOutput);
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy;
public interface IdleListener
{
public void onIdle(boolean idle);
}

View File

@ -74,6 +74,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private final ScheduledExecutorService scheduler;
private final short version;
private final Controller<FrameBytes> controller;
private final IdleListener idleListener;
private final AtomicInteger streamIds;
private final AtomicInteger pingIds;
private final SessionFrameListener listener;
@ -84,12 +85,13 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private boolean flushing;
private volatile int windowSize = 65536;
public StandardSession(short version, Executor threadPool, ScheduledExecutorService scheduler, Controller<FrameBytes> controller, int initialStreamId, SessionFrameListener listener, Generator generator)
public StandardSession(short version, Executor threadPool, ScheduledExecutorService scheduler, Controller<FrameBytes> controller, IdleListener idleListener, int initialStreamId, SessionFrameListener listener, Generator generator)
{
this.version = version;
this.threadPool = threadPool;
this.scheduler = scheduler;
this.controller = controller;
this.idleListener = idleListener;
this.streamIds = new AtomicInteger(initialStreamId);
this.pingIds = new AtomicInteger(initialStreamId);
this.listener = listener;
@ -741,9 +743,24 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
@Override
public void execute(Runnable task)
public void execute(final Runnable task)
{
threadPool.execute(task);
idleListener.onIdle(false);
threadPool.execute(new Runnable()
{
@Override
public void run()
{
try
{
task.run();
}
finally
{
idleListener.onIdle(true);
}
}
});
}
@Override

View File

@ -246,11 +246,17 @@ public class StandardStream implements IStream
@Override
public void run()
{
logger.debug("Executing task {}", task);
task.run();
logger.debug("Completing task {}", task);
dispatched = false;
dispatch();
try
{
logger.debug("Executing task {}", task);
task.run();
}
finally
{
logger.debug("Completing task {}", task);
dispatched = false;
dispatch();
}
}
});
}

View File

@ -46,7 +46,7 @@ public class AsyncTimeoutTest
Executor threadPool = Executors.newCachedThreadPool();
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
Generator generator = new Generator(new StandardCompressionFactory.StandardCompressor());
Session session = new StandardSession(SPDY.V2, threadPool, scheduler, new TestController(), 1, null, generator)
Session session = new StandardSession(SPDY.V2, threadPool, scheduler, new TestController(), null, 1, null, generator)
{
@Override
public void flush()
@ -90,7 +90,7 @@ public class AsyncTimeoutTest
Executor threadPool = Executors.newCachedThreadPool();
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
Generator generator = new Generator(new StandardCompressionFactory.StandardCompressor());
Session session = new StandardSession(SPDY.V2, threadPool, scheduler, new TestController(), 1, null, generator)
Session session = new StandardSession(SPDY.V2, threadPool, scheduler, new TestController(), null, 1, null, generator)
{
private final AtomicInteger flushes = new AtomicInteger();
@ -130,7 +130,7 @@ public class AsyncTimeoutTest
Assert.assertTrue(failedLatch.await(2 * timeout, unit));
}
private static class TestController implements ISession.Controller<StandardSession.FrameBytes>
private static class TestController implements Controller<StandardSession.FrameBytes>
{
@Override
public int write(ByteBuffer buffer, Handler<StandardSession.FrameBytes> handler, StandardSession.FrameBytes context)

View File

@ -29,7 +29,7 @@ public class ClientUsageTest
@Test
public void testClientRequestResponseNoBody() throws Exception
{
Session session = new StandardSession(SPDY.V2, null, null, null, 1, null, null);
Session session = new StandardSession(SPDY.V2, null, null, null, null, 1, null, null);
session.syn(new SynInfo(true), new StreamFrameListener.Adapter()
{
@ -48,7 +48,7 @@ public class ClientUsageTest
@Test
public void testClientRequestWithBodyResponseNoBody() throws Exception
{
Session session = new StandardSession(SPDY.V2, null, null, null, 1, null, null);
Session session = new StandardSession(SPDY.V2, null, null, null, null, 1, null, null);
Stream stream = session.syn(new SynInfo(false), new StreamFrameListener.Adapter()
{
@ -69,7 +69,7 @@ public class ClientUsageTest
@Test
public void testAsyncClientRequestWithBodyResponseNoBody() throws Exception
{
Session session = new StandardSession(SPDY.V2, null, null, null, 1, null, null);
Session session = new StandardSession(SPDY.V2, null, null, null, null, 1, null, null);
final String context = "context";
session.syn(new SynInfo(false), new StreamFrameListener.Adapter()
@ -104,7 +104,7 @@ public class ClientUsageTest
@Test
public void testAsyncClientRequestWithBodyAndResponseWithBody() throws Exception
{
Session session = new StandardSession(SPDY.V2, null, null, null, 1, null, null);
Session session = new StandardSession(SPDY.V2, null, null, null, null, 1, null, null);
session.syn(new SynInfo(false), new StreamFrameListener.Adapter()
{

View File

@ -27,14 +27,13 @@ import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.io.nio.DirectNIOBuffer;
import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
import org.eclipse.jetty.io.nio.NIOBuffer;
import org.eclipse.jetty.spdy.ISession.Controller;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.parser.Parser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SPDYAsyncConnection extends AbstractConnection implements AsyncConnection, Controller<StandardSession.FrameBytes>
public class SPDYAsyncConnection extends AbstractConnection implements AsyncConnection, Controller<StandardSession.FrameBytes>, IdleListener
{
private static final Logger logger = LoggerFactory.getLogger(SPDYAsyncConnection.class);
private final Parser parser;
@ -48,41 +47,33 @@ public class SPDYAsyncConnection extends AbstractConnection implements AsyncConn
{
super(endPoint);
this.parser = parser;
endPoint.setCheckForIdle(true);
onIdle(true);
}
@Override
public Connection handle() throws IOException
{
AsyncEndPoint endPoint = getEndPoint();
try
boolean progress = true;
while (endPoint.isOpen() && progress)
{
endPoint.setCheckForIdle(false);
boolean progress = true;
while (endPoint.isOpen() && progress)
int filled = fill();
progress = filled > 0;
int flushed = flush();
progress |= flushed > 0;
endPoint.flush();
progress |= endPoint.hasProgressed();
if (!progress && filled < 0)
{
int filled = fill();
progress = filled > 0;
int flushed = flush();
progress |= flushed > 0;
endPoint.flush();
progress |= endPoint.hasProgressed();
if (!progress && filled < 0)
{
onInputShutdown();
close(false);
}
onInputShutdown();
close(false);
}
return this;
}
finally
{
endPoint.setCheckForIdle(true);
}
return this;
}
public int fill() throws IOException
@ -189,6 +180,12 @@ public class SPDYAsyncConnection extends AbstractConnection implements AsyncConn
}
}
@Override
public void onIdle(boolean idle)
{
getEndPoint().setCheckForIdle(idle);
}
@Override
public AsyncEndPoint getEndPoint()
{

View File

@ -422,7 +422,7 @@ public class SPDYClient
SPDYAsyncConnection connection = new ClientSPDYAsyncConnection(endPoint, parser, factory);
endPoint.setConnection(connection);
StandardSession session = new StandardSession(sessionPromise.client.version, factory.threadPool, factory.scheduler, connection, 1, sessionPromise.listener, generator);
StandardSession session = new StandardSession(sessionPromise.client.version, factory.threadPool, factory.scheduler, connection, connection, 1, sessionPromise.listener, generator);
parser.addListener(session);
sessionPromise.completed(session);
connection.setSession(session);

View File

@ -64,7 +64,7 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory
SPDYAsyncConnection connection = new ServerSPDYAsyncConnection(endPoint, parser, listener, connector);
endPoint.setConnection(connection);
final StandardSession session = new StandardSession(version, threadPool, scheduler, connection, 2, listener, generator);
final StandardSession session = new StandardSession(version, threadPool, scheduler, connection, connection, 2, listener, generator);
parser.addListener(session);
connection.setSession(session);