Merge branch 'jetty-9' of ssh://git.eclipse.org/gitroot/jetty/org.eclipse.jetty.project into jetty-9

Conflicts:
	jetty-io/src/main/java/org/eclipse/jetty/io/AsyncConnection.java
This commit is contained in:
Greg Wilkins 2012-05-09 11:19:52 +02:00
commit 2b9ff87e02
45 changed files with 385 additions and 391 deletions

View File

@ -17,12 +17,16 @@ public abstract class AbstractAsyncConnection implements AsyncConnection
}
public abstract void onReadable();
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.io.AsyncConnection#onClose()
*/
@Override
public abstract void onClose();
public void onOpen()
{
}
@Override
public void onClose()
{
}
/* ------------------------------------------------------------ */
/**
@ -34,7 +38,7 @@ public abstract class AbstractAsyncConnection implements AsyncConnection
return _endp;
}
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.io.AsyncConnection#onIdleExpired(long)

View File

@ -2,10 +2,12 @@ package org.eclipse.jetty.io;
public interface AsyncConnection
{
public abstract void onClose();
public abstract AsyncEndPoint getEndPoint();
void onClose();
public abstract void onIdleExpired(long idleForMs);
void onOpen();
}
AsyncEndPoint getEndPoint();
void onIdleExpired(long idleForMs);
}

View File

@ -120,4 +120,7 @@ public interface AsyncEndPoint extends EndPoint
*/
long getIdleTimestamp();
AsyncConnection getAsyncConnection();
void setAsyncConnection(AsyncConnection connection);
}

View File

@ -328,19 +328,19 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
/**
* @param endpoint
*/
protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
protected abstract void endPointClosed(AsyncEndPoint endpoint);
/* ------------------------------------------------------------ */
/**
* @param endpoint
*/
protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
protected abstract void endPointOpened(AsyncEndPoint endpoint);
/* ------------------------------------------------------------ */
protected abstract void endPointUpgraded(SelectChannelEndPoint endpoint,AsyncConnection oldConnection);
protected abstract void endPointUpgraded(AsyncEndPoint endpoint,AsyncConnection oldConnection);
/* ------------------------------------------------------------------------------- */
public abstract AsyncConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint, Object attachment);
public abstract AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment);
/* ------------------------------------------------------------ */
/**
@ -351,7 +351,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
* @return the new endpoint {@link SelectChannelEndPoint}
* @throws IOException
*/
protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
protected abstract AsyncEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
/* ------------------------------------------------------------------------------- */
protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
@ -392,7 +392,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
private boolean _pausing;
private boolean _paused;
private volatile long _idleTick;
private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>();
private ConcurrentMap<AsyncEndPoint,Object> _endPoints = new ConcurrentHashMap<>();
/* ------------------------------------------------------------ */
SelectSet(int acceptorID) throws Exception
@ -468,7 +468,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
{
SelectionKey key = channel.register(selector,SelectionKey.OP_READ,att);
SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
AsyncEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
key.attach(endpoint);
}
else if (channel.isOpen())
@ -482,7 +482,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
final SocketChannel channel=(SocketChannel)change;
ch=channel;
SelectionKey key = channel.register(selector,SelectionKey.OP_READ,null);
SelectChannelEndPoint endpoint = createEndPoint(channel,key);
AsyncEndPoint endpoint = createEndPoint(channel,key);
key.attach(endpoint);
}
else if (change instanceof ChangeTask)
@ -624,9 +624,10 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
if (connected)
{
key.interestOps(SelectionKey.OP_READ);
SelectChannelEndPoint endpoint = createEndPoint(channel,key);
AsyncEndPoint endpoint = createEndPoint(channel, key);
key.attach(endpoint);
endpoint.onSelected();
// TODO: remove the cast
((SelectChannelEndPoint)endpoint).onSelected();
}
else
{
@ -638,10 +639,13 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
{
// Wrap readable registered channel in an endpoint
channel = (SocketChannel)key.channel();
SelectChannelEndPoint endpoint = createEndPoint(channel,key);
AsyncEndPoint endpoint = createEndPoint(channel, key);
key.attach(endpoint);
if (key.isReadable())
endpoint.onSelected();
{
// TODO: remove the cast
((SelectChannelEndPoint)endpoint).onSelected();
}
}
key = null;
}
@ -697,9 +701,10 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
{
public void run()
{
for (SelectChannelEndPoint endp:_endPoints.keySet())
for (AsyncEndPoint endp:_endPoints.keySet())
{
endp.checkForIdleOrReadWriteTimeout(idle_now);
// TODO: remove the cast
((SelectChannelEndPoint)endp).checkForIdleOrReadWriteTimeout(idle_now);
}
}
public String toString() {return "Idle-"+super.toString();}
@ -824,9 +829,9 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
}
/* ------------------------------------------------------------ */
private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
private AsyncEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
{
SelectChannelEndPoint endp = newEndPoint(channel,this,sKey);
AsyncEndPoint endp = newEndPoint(channel, this, sKey);
LOG.debug("created {}",endp);
endPointOpened(endp);
_endPoints.put(endp,this);

View File

@ -120,12 +120,6 @@ public class SslConnection extends AbstractAsyncConnection
_appEndPoint = newAppEndPoint();
}
/* ------------------------------------------------------------ */
public void setAppConnection(AsyncConnection connection)
{
_appConnection=connection;
}
/* ------------------------------------------------------------ */
protected AppEndPoint newAppEndPoint()
{
@ -815,5 +809,17 @@ public class SslConnection extends AbstractAsyncConnection
_lock.unlock();
}
}
@Override
public AsyncConnection getAsyncConnection()
{
return _appConnection;
}
@Override
public void setAsyncConnection(AsyncConnection connection)
{
_appConnection = connection;
}
}
}

View File

@ -1,5 +1,6 @@
package org.eclipse.jetty.io;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -14,6 +15,7 @@ import static org.hamcrest.number.OrderingComparison.lessThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@ -163,145 +165,206 @@ public class IOFutureTest
@Test
public void testReady() throws Exception
public void testBlockWaitsWhenNotCompleted() throws Exception
{
DispatchingIOFuture future = new DispatchingIOFuture();
assertFalse(future.isDone());
assertFalse(future.isComplete());
final AtomicBoolean ready = new AtomicBoolean(false);
final AtomicReference<Throwable> fail = new AtomicReference<>();
final AtomicBoolean completed = new AtomicBoolean(false);
final AtomicReference<Throwable> failure = new AtomicReference<>();
future.setCallback(new Callback<Object>()
{
@Override
public void completed(Object context)
{
ready.set(true);
completed.set(true);
}
@Override
public void failed(Object context, Throwable cause)
{
fail.set(cause);
failure.set(cause);
}
}, null);
long start=System.currentTimeMillis();
assertFalse(future.block(100,TimeUnit.MILLISECONDS));
assertThat(System.currentTimeMillis()-start,greaterThan(10L));
long sleep = 1000;
long start = System.nanoTime();
assertFalse(future.block(sleep, TimeUnit.MILLISECONDS));
assertThat(System.nanoTime() - start, greaterThan(TimeUnit.MILLISECONDS.toNanos(sleep / 2)));
assertFalse(ready.get());
assertNull(fail.get());
assertFalse(completed.get());
assertNull(failure.get());
}
@Test
public void testTimedBlockWokenUpWhenCompleted() throws Exception
{
final DispatchingIOFuture future = new DispatchingIOFuture();
start=System.currentTimeMillis();
final DispatchingIOFuture f0=future;
final CountDownLatch completed = new CountDownLatch(1);
final AtomicReference<Throwable> failure = new AtomicReference<>();
future.setCallback(new Callback<Object>()
{
@Override
public void completed(Object context)
{
completed.countDown();
}
@Override
public void failed(Object context, Throwable cause)
{
failure.set(cause);
}
}, null);
long start = System.nanoTime();
final long delay = 500;
new Thread()
{
@Override
public void run()
{
try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();}
f0.complete();
try
{
// Want the call to block() below to happen before the call to complete() here
TimeUnit.MILLISECONDS.sleep(delay);
future.complete();
}
catch (InterruptedException x)
{
Assert.fail();
}
}
}.start();
assertTrue(future.block(1000,TimeUnit.MILLISECONDS));
Assert.assertThat(System.currentTimeMillis()-start,greaterThan(10L));
Assert.assertThat(System.currentTimeMillis()-start,lessThan(1000L));
assertTrue(future.block(delay * 4, TimeUnit.MILLISECONDS));
long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
Assert.assertThat(elapsed, greaterThan(delay / 2));
Assert.assertThat(elapsed, lessThan(delay * 2));
assertTrue(future.isDone());
assertTrue(future.isComplete());
assertTrue(ready.get());
assertNull(fail.get());
assertTrue(completed.await(delay * 4, TimeUnit.MILLISECONDS));
assertNull(failure.get());
}
ready.set(false);
@Test
public void testBlockWokenUpWhenCompleted() throws Exception
{
final DispatchingIOFuture future = new DispatchingIOFuture();
final CountDownLatch completed = new CountDownLatch(1);
final AtomicReference<Throwable> failure = new AtomicReference<>();
future = new DispatchingIOFuture();
assertFalse(future.isDone());
assertFalse(future.isComplete());
start=System.currentTimeMillis();
final DispatchingIOFuture f1=future;
future.setCallback(new Callback<Object>()
{
@Override
public void completed(Object context)
{
completed.countDown();
}
@Override
public void failed(Object context, Throwable cause)
{
failure.set(cause);
}
}, null);
final long delay = 500;
long start = System.nanoTime();
new Thread()
{
@Override
public void run()
{
try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();}
f1.complete();
try
{
// Want the call to block() below to happen before the call to complete() here
TimeUnit.MILLISECONDS.sleep(delay);
future.complete();
}
catch (InterruptedException x)
{
Assert.fail();
}
}
}.start();
future.block();
Assert.assertThat(System.currentTimeMillis()-start,greaterThan(10L));
long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
Assert.assertThat(elapsed, greaterThan(delay / 2));
Assert.assertThat(elapsed, lessThan(delay * 2));
assertTrue(future.isDone());
assertTrue(future.isComplete());
assertFalse(ready.get()); // no callback set
assertNull(fail.get());
assertTrue(completed.await(delay * 4, TimeUnit.MILLISECONDS));
assertNull(failure.get());
}
@Test
public void testFail() throws Exception
public void testTimedBlockWokenUpOnFailure() throws Exception
{
DispatchingIOFuture future = new DispatchingIOFuture();
final Exception ex=new Exception("failed");
assertFalse(future.isDone());
assertFalse(future.isComplete());
final AtomicBoolean ready = new AtomicBoolean(false);
final AtomicReference<Throwable> fail = new AtomicReference<>();
final DispatchingIOFuture future = new DispatchingIOFuture();
final Exception ex = new Exception("failed");
final AtomicBoolean completed = new AtomicBoolean(false);
final AtomicReference<Throwable> failure = new AtomicReference<>();
final CountDownLatch failureLatch = new CountDownLatch(1);
future.setCallback(new Callback<Object>()
{
@Override
public void completed(Object context)
{
ready.set(true);
completed.set(true);
}
@Override
public void failed(Object context, Throwable cause)
public void failed(Object context, Throwable x)
{
fail.set(cause);
failure.set(x);
failureLatch.countDown();
}
}, null);
long start=System.currentTimeMillis();
assertFalse(future.block(100,TimeUnit.MILLISECONDS));
assertThat(System.currentTimeMillis()-start,greaterThan(10L));
assertFalse(ready.get());
assertNull(fail.get());
start=System.currentTimeMillis();
final DispatchingIOFuture f0=future;
final long delay = 500;
long start = System.nanoTime();
new Thread()
{
@Override
public void run()
{
try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();}
f0.fail(ex);
try
{
// Want the call to block() below to happen before the call to fail() here
TimeUnit.MILLISECONDS.sleep(delay);
future.fail(ex);
}
catch (InterruptedException x)
{
Assert.fail();
}
}
}.start();
try
{
future.block(1000,TimeUnit.MILLISECONDS);
future.block(delay * 4, TimeUnit.MILLISECONDS);
Assert.fail();
}
catch(ExecutionException e)
catch (ExecutionException e)
{
Assert.assertEquals(ex,e.getCause());
Assert.assertSame(ex, e.getCause());
}
Assert.assertThat(System.currentTimeMillis()-start,greaterThan(10L));
Assert.assertThat(System.currentTimeMillis()-start,lessThan(1000L));
long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
Assert.assertThat(elapsed, greaterThan(delay / 2));
Assert.assertThat(elapsed, lessThan(delay * 2));
assertTrue(future.isDone());
try
@ -309,28 +372,58 @@ public class IOFutureTest
future.isComplete();
Assert.fail();
}
catch(ExecutionException e)
catch (ExecutionException e)
{
Assert.assertEquals(ex,e.getCause());
assertSame(ex, e.getCause());
}
assertFalse(ready.get());
assertEquals(ex,fail.get());
future=new DispatchingIOFuture();
ready.set(false);
fail.set(null);
assertFalse(completed.get());
assertTrue(failureLatch.await(delay * 4, TimeUnit.MILLISECONDS));
assertSame(ex, failure.get());
}
assertFalse(future.isDone());
assertFalse(future.isComplete());
start=System.currentTimeMillis();
final DispatchingIOFuture f1=future;
@Test
public void testBlockWokenUpOnFailure() throws Exception
{
final DispatchingIOFuture future = new DispatchingIOFuture();
final Exception ex = new Exception("failed");
final AtomicBoolean completed = new AtomicBoolean(false);
final AtomicReference<Throwable> failure = new AtomicReference<>();
final CountDownLatch failureLatch = new CountDownLatch(1);
future.setCallback(new Callback<Object>()
{
@Override
public void completed(Object context)
{
completed.set(true);
}
@Override
public void failed(Object context, Throwable x)
{
failure.set(x);
failureLatch.countDown();
}
}, null);
final long delay = 500;
long start = System.nanoTime();
new Thread()
{
@Override
public void run()
{
try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();}
f1.fail(ex);
try
{
// Want the call to block() below to happen before the call to fail() here
TimeUnit.MILLISECONDS.sleep(delay);
future.fail(ex);
}
catch (InterruptedException x)
{
Assert.fail();
}
}
}.start();
@ -339,11 +432,14 @@ public class IOFutureTest
future.block();
Assert.fail();
}
catch(ExecutionException e)
catch (ExecutionException e)
{
Assert.assertEquals(ex,e.getCause());
Assert.assertSame(ex, e.getCause());
}
Assert.assertThat(System.currentTimeMillis()-start,greaterThan(10L));
long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
Assert.assertThat(elapsed, greaterThan(delay / 2));
Assert.assertThat(elapsed, lessThan(delay * 2));
assertTrue(future.isDone());
try
@ -351,11 +447,13 @@ public class IOFutureTest
future.isComplete();
Assert.fail();
}
catch(ExecutionException e)
catch (ExecutionException e)
{
Assert.assertEquals(ex,e.getCause());
assertSame(ex, e.getCause());
}
assertFalse(ready.get()); // no callback set
assertNull(fail.get()); // no callback set
assertFalse(completed.get());
assertTrue(failureLatch.await(delay * 4, TimeUnit.MILLISECONDS));
assertSame(ex, failure.get());
}
}

View File

@ -48,7 +48,7 @@ public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest
SslConnection connection = new SslConnection(engine,endpoint);
AsyncConnection delegate = super.newConnection(channel,connection.getAppEndPoint());
connection.setAppConnection(delegate);
connection.getAppEndPoint().setAsyncConnection(delegate);
return connection;
}

View File

@ -1,11 +1,5 @@
package org.eclipse.jetty.io;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
@ -29,6 +23,12 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class SelectChannelEndPointTest
{
protected volatile AsyncEndPoint _lastEndp;
@ -43,22 +43,22 @@ public class SelectChannelEndPointTest
}
@Override
protected void endPointClosed(SelectChannelEndPoint endpoint)
protected void endPointClosed(AsyncEndPoint endpoint)
{
}
@Override
protected void endPointOpened(SelectChannelEndPoint endpoint)
protected void endPointOpened(AsyncEndPoint endpoint)
{
}
@Override
protected void endPointUpgraded(SelectChannelEndPoint endpoint, AsyncConnection oldConnection)
protected void endPointUpgraded(AsyncEndPoint endpoint, AsyncConnection oldConnection)
{
}
@Override
public AsyncConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint, Object attachment)
public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)
{
return SelectChannelEndPointTest.this.newConnection(channel,endpoint);
}
@ -138,7 +138,7 @@ public class SelectChannelEndPointTest
int filled=_endp.fill(_in);
if (filled>0)
progress=true;
// If the tests wants to block, then block
while (_blockAt>0 && _endp.isOpen() && _in.remaining()<_blockAt)
{
@ -146,11 +146,11 @@ public class SelectChannelEndPointTest
filled=_endp.fill(_in);
progress|=filled>0;
}
// Copy to the out buffer
if (BufferUtil.hasContent(_in) && BufferUtil.append(_in,_out)>0)
progress=true;
// Blocking writes
if (BufferUtil.hasContent(_out))
{
@ -160,7 +160,7 @@ public class SelectChannelEndPointTest
_endp.write(out.asReadOnlyBuffer()).block();
progress=true;
}
// are we done?
if (_endp.isInputShutdown())
_endp.shutdownOutput();
@ -209,12 +209,7 @@ public class SelectChannelEndPointTest
*/
super.onIdleExpired(idleForMs);
}
@Override
public void onClose()
{
}
@Override
public String toString()
{
@ -223,7 +218,7 @@ public class SelectChannelEndPointTest
}
}
@Test
public void testEcho() throws Exception
{
@ -361,10 +356,10 @@ public class SelectChannelEndPointTest
clientOutputStream.flush();
while(_lastEndp==null);
_lastEndp.setMaxIdleTime(10*specifiedTimeout);
Thread.sleep(2 * specifiedTimeout);
// No echo as blocking for 10
long start=System.currentTimeMillis();
try
@ -390,7 +385,7 @@ public class SelectChannelEndPointTest
assertEquals(c,(char)b);
}
}
@Test
public void testIdle() throws Exception
{
@ -424,33 +419,33 @@ public class SelectChannelEndPointTest
long idle=System.currentTimeMillis()-start;
assertTrue(idle>400);
assertTrue(idle<2000);
// But endpoint is still open.
assertTrue(_lastEndp.isOpen());
// Wait for another idle callback
Thread.sleep(2000);
// endpoint is closed.
assertFalse(_lastEndp.isOpen());
}
@Test
public void testBlockedReadIdle() throws Exception
{
Socket client = newClient();
OutputStream clientOutputStream = client.getOutputStream();
client.setSoTimeout(5000);
SocketChannel server = _connector.accept();
server.configureBlocking(false);
_manager.register(server);
// Write client to server
clientOutputStream.write("HelloWorld".getBytes("UTF-8"));
@ -464,12 +459,12 @@ public class SelectChannelEndPointTest
// Set Max idle
_lastEndp.setMaxIdleTime(500);
// Write 8 and cause block waiting for 10
_blockAt=10;
clientOutputStream.write("12345678".getBytes("UTF-8"));
clientOutputStream.flush();
// read until idle shutdown received
long start=System.currentTimeMillis();
int b=client.getInputStream().read();
@ -484,14 +479,14 @@ public class SelectChannelEndPointTest
assertTrue(b>0);
assertEquals(c,(char)b);
}
// But endpoint is still open.
assertTrue(_lastEndp.isOpen());
// Wait for another idle callback
Thread.sleep(2000);
// endpoint is closed.
assertFalse(_lastEndp.isOpen());
}
@ -517,7 +512,7 @@ public class SelectChannelEndPointTest
out.write(bytes);
out.write(count);
out.flush();
while (_lastEndp==null)
Thread.sleep(10);
_lastEndp.setMaxIdleTime(5000);
@ -540,7 +535,7 @@ public class SelectChannelEndPointTest
Assert.assertThat(b,greaterThan(0));
assertEquals(0xff&b0,b);
}
count=0;
int b=in.read();
while(b>0 && b!='\n')
@ -549,7 +544,7 @@ public class SelectChannelEndPointTest
b=in.read();
}
last=System.currentTimeMillis();
latch.countDown();
}
}
@ -565,7 +560,7 @@ public class SelectChannelEndPointTest
}
}
}.start();
// Write client to server
for (int i=1;i<writes;i++)
{
@ -580,7 +575,7 @@ public class SelectChannelEndPointTest
assertTrue(latch.await(100,TimeUnit.SECONDS));
}
@Test
public void testWriteBlock() throws Exception
@ -615,7 +610,7 @@ public class SelectChannelEndPointTest
TimeUnit.MILLISECONDS.sleep(10);
}
client.close();
int i=0;

View File

@ -16,6 +16,11 @@
<artifactId>jetty-util</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-io</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>

View File

@ -38,14 +38,16 @@ public class Promise<T> implements Callback<T>, Future<T>
private T promise;
@Override
public void completed(T result)
public void completed(T context)
{
this.promise = result;
this.promise = context;
latch.countDown();
}
public void failed(Throwable x)
@Override
public void failed(T context, Throwable x)
{
this.promise = context;
this.failure = x;
latch.countDown();
}

View File

@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.PingInfo;
@ -756,7 +757,7 @@ public class StandardSession implements ISession, Parser.Listener, Callback<Stan
}
catch (Throwable x)
{
notifyHandlerFailed(callback, x);
notifyHandlerFailed(callback, context, x);
}
}
@ -884,7 +885,7 @@ public class StandardSession implements ISession, Parser.Listener, Callback<Stan
}
@Override
public void failed(Throwable x)
public void failed(FrameBytes context, Throwable x)
{
throw new SPDYException(x);
}
@ -946,12 +947,12 @@ public class StandardSession implements ISession, Parser.Listener, Callback<Stan
}
}
private <C> void notifyHandlerFailed(Callback<C> callback, Throwable x)
private <C> void notifyHandlerFailed(Callback<C> callback, C context, Throwable x)
{
try
{
if (callback != null)
callback.failed(x);
callback.failed(context, x);
}
catch (Exception xx)
{
@ -1008,7 +1009,7 @@ public class StandardSession implements ISession, Parser.Listener, Callback<Stan
public void fail(Throwable x)
{
cancelTask();
notifyHandlerFailed(callback,x);
notifyHandlerFailed(callback, context, x);
}
private void cancelTask()

View File

@ -345,11 +345,11 @@ public class StandardStream implements IStream
{
if (isClosed() || isReset())
{
callback.failed(new StreamException(getId(),StreamStatus.STREAM_ALREADY_CLOSED));
callback.failed(this, new StreamException(getId(), StreamStatus.STREAM_ALREADY_CLOSED));
return;
}
PushSynInfo pushSynInfo = new PushSynInfo(getId(),synInfo);
session.syn(pushSynInfo,null,timeout,unit, callback);
PushSynInfo pushSynInfo = new PushSynInfo(getId(), synInfo);
session.syn(pushSynInfo, null, timeout, unit, callback);
}
@Override

View File

@ -18,7 +18,7 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.spdy.frames.ControlFrame;
public abstract class ControlFrameGenerator

View File

@ -18,7 +18,7 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.frames.DataFrame;

View File

@ -19,7 +19,7 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import java.util.EnumMap;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.spdy.CompressionFactory;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.frames.ControlFrame;

View File

@ -18,7 +18,7 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.frames.ControlFrame;
import org.eclipse.jetty.spdy.frames.GoAwayFrame;

View File

@ -18,7 +18,7 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.spdy.SessionException;
import org.eclipse.jetty.spdy.api.SessionStatus;
import org.eclipse.jetty.spdy.frames.ControlFrame;

View File

@ -18,7 +18,7 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.spdy.frames.ControlFrame;
import org.eclipse.jetty.spdy.frames.NoOpFrame;

View File

@ -18,7 +18,7 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.spdy.frames.ControlFrame;
import org.eclipse.jetty.spdy.frames.PingFrame;

View File

@ -18,7 +18,7 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.spdy.frames.ControlFrame;
import org.eclipse.jetty.spdy.frames.RstStreamFrame;

View File

@ -18,7 +18,7 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Settings;
import org.eclipse.jetty.spdy.frames.ControlFrame;

View File

@ -18,7 +18,7 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.spdy.SessionException;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.SessionStatus;

View File

@ -18,7 +18,7 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.spdy.SessionException;
import org.eclipse.jetty.spdy.StreamException;
import org.eclipse.jetty.spdy.api.SPDY;

View File

@ -18,7 +18,7 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.spdy.frames.ControlFrame;
import org.eclipse.jetty.spdy.frames.WindowUpdateFrame;

View File

@ -23,6 +23,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.SPDYException;
import org.eclipse.jetty.spdy.api.Session;
@ -72,7 +74,7 @@ public class AsyncTimeoutTest
}
@Override
public void failed(Throwable x)
public void failed(Stream context, Throwable x)
{
failedLatch.countDown();
}
@ -120,7 +122,7 @@ public class AsyncTimeoutTest
}
@Override
public void failed(Throwable x)
public void failed(Void context, Throwable x)
{
failedLatch.countDown();
}

View File

@ -25,6 +25,8 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.HeadersInfo;
@ -175,7 +177,7 @@ public class StandardSessionTest
}
@Override
public void failed(Throwable x)
public void failed(Stream context, Throwable x)
{
failedLatch.countDown();
}

View File

@ -101,7 +101,7 @@ public class StandardStreamTest
stream.syn(new SynInfo(false),1,TimeUnit.SECONDS,new Callback.Adapter<Stream>()
{
@Override
public void failed(Throwable x)
public void failed(Stream context, Throwable x)
{
failedLatch.countDown();
}

View File

@ -18,7 +18,7 @@ package org.eclipse.jetty.spdy.frames;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.StringDataInfo;

View File

@ -18,7 +18,7 @@ package org.eclipse.jetty.spdy.frames;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.generator.Generator;

View File

@ -18,7 +18,7 @@ package org.eclipse.jetty.spdy.frames;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.HeadersInfo;

View File

@ -18,7 +18,7 @@ package org.eclipse.jetty.spdy.frames;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.generator.Generator;
import org.eclipse.jetty.spdy.parser.Parser;

View File

@ -18,7 +18,7 @@ package org.eclipse.jetty.spdy.frames;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.generator.Generator;

View File

@ -18,7 +18,7 @@ package org.eclipse.jetty.spdy.frames;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.StreamStatus;

View File

@ -18,7 +18,7 @@ package org.eclipse.jetty.spdy.frames;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Settings;

View File

@ -18,7 +18,7 @@ package org.eclipse.jetty.spdy.frames;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.ReplyInfo;

View File

@ -18,7 +18,7 @@ package org.eclipse.jetty.spdy.frames;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.SPDY;

View File

@ -18,7 +18,7 @@ package org.eclipse.jetty.spdy.frames;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.generator.Generator;

View File

@ -4,8 +4,8 @@ import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.spdy.SessionException;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.StreamException;
import org.eclipse.jetty.spdy.api.Headers;

View File

@ -18,8 +18,8 @@ package org.eclipse.jetty.spdy;
import java.nio.channels.SocketChannel;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.nio.AsyncConnection;
public interface AsyncConnectionFactory
{

View File

@ -16,50 +16,18 @@
package org.eclipse.jetty.spdy;
import java.io.IOException;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.AbstractAsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.nio.AsyncConnection;
public class EmptyAsyncConnection extends AbstractConnection implements AsyncConnection
public class EmptyAsyncConnection extends AbstractAsyncConnection
{
public EmptyAsyncConnection(AsyncEndPoint endPoint)
{
super(endPoint);
}
public Connection handle() throws IOException
{
return this;
}
@Override
public AsyncEndPoint getEndPoint()
{
return (AsyncEndPoint)super.getEndPoint();
}
@Override
public boolean isIdle()
{
return false;
}
@Override
public boolean isSuspended()
{
return false;
}
@Override
public void onClose()
{
}
@Override
public void onInputShutdown() throws IOException
public void onReadable()
{
}
}

View File

@ -17,39 +17,31 @@
package org.eclipse.jetty.spdy;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.util.thread.Timeout;
import org.eclipse.jetty.io.IOFuture;
public class EmptyAsyncEndPoint implements AsyncEndPoint
{
private boolean checkForIdle;
private Connection connection;
private AsyncConnection connection;
private boolean oshut;
private boolean ishut;
private boolean closed;
private int maxIdleTime;
@Override
public void dispatch()
{
}
@Override
public void asyncDispatch()
public long getCreatedTimeStamp()
{
return 0;
}
@Override
public void scheduleWrite()
{
}
@Override
public void onIdleExpired(long idleForMs)
public long getIdleTimestamp()
{
return 0;
}
@Override
@ -65,41 +57,19 @@ public class EmptyAsyncEndPoint implements AsyncEndPoint
}
@Override
public boolean isWritable()
{
return false;
}
@Override
public boolean hasProgressed()
{
return false;
}
@Override
public void scheduleTimeout(Timeout.Task task, long timeoutMs)
{
}
@Override
public void cancelTimeout(Timeout.Task task)
{
}
@Override
public Connection getConnection()
public AsyncConnection getAsyncConnection()
{
return connection;
}
@Override
public void setConnection(Connection connection)
public void setAsyncConnection(AsyncConnection connection)
{
this.connection = connection;
}
@Override
public void shutdownOutput() throws IOException
public void shutdownOutput()
{
oshut = true;
}
@ -110,96 +80,54 @@ public class EmptyAsyncEndPoint implements AsyncEndPoint
return oshut;
}
@Override
public void shutdownInput() throws IOException
{
ishut = true;
}
@Override
public boolean isInputShutdown()
{
return ishut;
return false;
}
@Override
public void close() throws IOException
public void close()
{
closed = true;
}
@Override
public int fill(Buffer buffer) throws IOException
public int fill(ByteBuffer buffer) throws IOException
{
return 0;
}
@Override
public int flush(Buffer buffer) throws IOException
public IOFuture readable() throws IllegalStateException
{
return null;
}
@Override
public int flush(ByteBuffer... buffer) throws IOException
{
return 0;
}
@Override
public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
{
return 0;
}
@Override
public String getLocalAddr()
public IOFuture write(ByteBuffer... buffers) throws IllegalStateException
{
return null;
}
@Override
public String getLocalHost()
public InetSocketAddress getLocalAddress()
{
return null;
}
@Override
public int getLocalPort()
{
return -1;
}
@Override
public String getRemoteAddr()
public InetSocketAddress getRemoteAddress()
{
return null;
}
@Override
public String getRemoteHost()
{
return null;
}
@Override
public int getRemotePort()
{
return -1;
}
@Override
public boolean isBlocking()
{
return false;
}
@Override
public boolean blockReadable(long millisecs) throws IOException
{
return false;
}
@Override
public boolean blockWritable(long millisecs) throws IOException
{
return false;
}
@Override
public boolean isOpen()
{
@ -212,11 +140,6 @@ public class EmptyAsyncEndPoint implements AsyncEndPoint
return null;
}
@Override
public void flush() throws IOException
{
}
@Override
public int getMaxIdleTime()
{

View File

@ -95,20 +95,13 @@ public class SPDYAsyncConnection extends AbstractAsyncConnection implements Cont
public void close(boolean onlyOutput)
{
AsyncEndPoint endPoint = getEndPoint();
try
{
// We need to gently close first, to allow
// SSL close alerts to be sent by Jetty
logger.debug("Shutting down output {}", endPoint);
endPoint.shutdownOutput();
if (!onlyOutput)
{
logger.debug("Closing {}", endPoint);
endPoint.close();
}
}
catch (IOException x)
// We need to gently close first, to allow
// SSL close alerts to be sent by Jetty
logger.debug("Shutting down output {}", endPoint);
endPoint.shutdownOutput();
if (!onlyOutput)
{
logger.debug("Closing {}", endPoint);
endPoint.close();
}
}
@ -119,11 +112,6 @@ public class SPDYAsyncConnection extends AbstractAsyncConnection implements Cont
getEndPoint().setCheckForIdle(idle);
}
@Override
public void onClose()
{
}
@Override
public void onIdleExpired(long idleForMs)
{

View File

@ -42,15 +42,13 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.io.SslConnection;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.io.nio.SelectorManager;
import org.eclipse.jetty.io.nio.SslConnection;
import org.eclipse.jetty.npn.NextProtoNego;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
@ -294,25 +292,25 @@ public class SPDYClient
SelectChannelEndPoint result = new SelectChannelEndPoint(channel, selectSet, key, (int)maxIdleTime);
AsyncConnection connection = newConnection(channel, result, attachment);
result.setConnection(connection);
result.setAsyncConnection(connection);
return result;
}
@Override
protected void endPointOpened(SelectChannelEndPoint endpoint)
protected void endPointOpened(AsyncEndPoint endpoint)
{
}
@Override
protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
protected void endPointUpgraded(AsyncEndPoint endpoint, AsyncConnection oldConnection)
{
}
@Override
protected void endPointClosed(SelectChannelEndPoint endpoint)
protected void endPointClosed(AsyncEndPoint endpoint)
{
endpoint.getConnection().onClose();
endpoint.getAsyncConnection().onClose();
}
@Override
@ -338,8 +336,8 @@ public class SPDYClient
super.onClose();
}
};
endPoint.setConnection(sslConnection);
AsyncEndPoint sslEndPoint = sslConnection.getSslEndPoint();
endPoint.setAsyncConnection(sslConnection);
AsyncEndPoint sslEndPoint = sslConnection.getAppEndPoint();
sslEndPointRef.set(sslEndPoint);
// Instances of the ClientProvider inner class strong reference the
@ -365,7 +363,7 @@ public class SPDYClient
ClientSPDYAsyncConnectionFactory connectionFactory = new ClientSPDYAsyncConnectionFactory();
AsyncEndPoint sslEndPoint = sslEndPointRef.get();
AsyncConnection connection = connectionFactory.newAsyncConnection(channel, sslEndPoint, attachmentRef.get());
sslEndPoint.setConnection(connection);
sslEndPoint.setAsyncConnection(connection);
}
@Override
@ -378,13 +376,13 @@ public class SPDYClient
AsyncConnectionFactory connectionFactory = client.getAsyncConnectionFactory(protocol);
AsyncEndPoint sslEndPoint = sslEndPointRef.get();
AsyncConnection connection = connectionFactory.newAsyncConnection(channel, sslEndPoint, attachmentRef.get());
sslEndPoint.setConnection(connection);
sslEndPoint.setAsyncConnection(connection);
return protocol;
}
});
AsyncConnection connection = new EmptyAsyncConnection(sslEndPoint);
sslEndPoint.setConnection(connection);
sslEndPoint.setAsyncConnection(connection);
startHandshake(engine);
@ -394,13 +392,13 @@ public class SPDYClient
{
AsyncConnectionFactory connectionFactory = new ClientSPDYAsyncConnectionFactory();
AsyncConnection connection = connectionFactory.newAsyncConnection(channel, endPoint, attachment);
endPoint.setConnection(connection);
endPoint.setAsyncConnection(connection);
return connection;
}
}
catch (RuntimeException x)
{
sessionPromise.failed(x);
sessionPromise.failed(null, x);
throw x;
}
}
@ -444,7 +442,7 @@ public class SPDYClient
Generator generator = new Generator(factory.bufferPool, compressionFactory.newCompressor());
SPDYAsyncConnection connection = new ClientSPDYAsyncConnection(endPoint, factory.bufferPool, parser, factory);
endPoint.setConnection(connection);
endPoint.setAsyncConnection(connection);
StandardSession session = new StandardSession(sessionPromise.client.version, factory.bufferPool, factory.threadPool, factory.scheduler, connection, connection, 1, sessionPromise.listener, generator);
parser.addListener(session);

View File

@ -33,11 +33,11 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.SslConnection;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.io.nio.SslConnection;
import org.eclipse.jetty.npn.NextProtoNego;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.spdy.api.SPDY;
@ -189,8 +189,8 @@ public class SPDYServerConnector extends SelectChannelConnector
super.onClose();
}
};
endPoint.setConnection(sslConnection);
AsyncEndPoint sslEndPoint = sslConnection.getSslEndPoint();
endPoint.setAsyncConnection(sslConnection);
AsyncEndPoint sslEndPoint = sslConnection.getAppEndPoint();
sslEndPointRef.set(sslEndPoint);
// Instances of the ServerProvider inner class strong reference the
@ -209,7 +209,7 @@ public class SPDYServerConnector extends SelectChannelConnector
AsyncConnectionFactory connectionFactory = getDefaultAsyncConnectionFactory();
AsyncEndPoint sslEndPoint = sslEndPointRef.get();
AsyncConnection connection = connectionFactory.newAsyncConnection(channel, sslEndPoint, SPDYServerConnector.this);
sslEndPoint.setConnection(connection);
sslEndPoint.setAsyncConnection(connection);
}
@Override
@ -224,12 +224,12 @@ public class SPDYServerConnector extends SelectChannelConnector
AsyncConnectionFactory connectionFactory = getAsyncConnectionFactory(protocol);
AsyncEndPoint sslEndPoint = sslEndPointRef.get();
AsyncConnection connection = connectionFactory.newAsyncConnection(channel, sslEndPoint, SPDYServerConnector.this);
sslEndPoint.setConnection(connection);
sslEndPoint.setAsyncConnection(connection);
}
});
AsyncConnection connection = new EmptyAsyncConnection(sslEndPoint);
sslEndPoint.setConnection(connection);
sslEndPoint.setAsyncConnection(connection);
startHandshake(engine);
@ -239,7 +239,7 @@ public class SPDYServerConnector extends SelectChannelConnector
{
AsyncConnectionFactory connectionFactory = getDefaultAsyncConnectionFactory();
AsyncConnection connection = connectionFactory.newAsyncConnection(channel, endPoint, this);
endPoint.setConnection(connection);
endPoint.setAsyncConnection(connection);
return connection;
}
}

View File

@ -16,15 +16,13 @@
package org.eclipse.jetty.spdy;
import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.spdy.generator.Generator;
import org.eclipse.jetty.spdy.parser.Parser;
@ -65,7 +63,7 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory
SPDYServerConnector connector = (SPDYServerConnector)attachment;
SPDYAsyncConnection connection = new ServerSPDYAsyncConnection(endPoint, bufferPool, parser, listener, connector);
endPoint.setConnection(connection);
endPoint.setAsyncConnection(connection);
final StandardSession session = new StandardSession(version, bufferPool, threadPool, scheduler, connection, connection, 2, listener, generator);
parser.addListener(session);
@ -85,7 +83,6 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory
{
private final ServerSessionFrameListener listener;
private final SPDYServerConnector connector;
private volatile boolean connected;
private ServerSPDYAsyncConnection(AsyncEndPoint endPoint, ByteBufferPool bufferPool, Parser parser, ServerSessionFrameListener listener, SPDYServerConnector connector)
{
@ -95,16 +92,11 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory
}
@Override
public Connection handle() throws IOException
public void onOpen()
{
if (!connected)
{
// NPE guard to support tests
if (listener != null)
listener.onConnect(getSession());
connected = true;
}
return super.handle();
// NPE guard to support tests
if (listener != null)
listener.onConnect(getSession());
}
@Override