jetty-9 AsyncByteArrayEndPoint test

This commit is contained in:
Greg Wilkins 2012-05-14 10:58:45 +02:00
parent 4b8f6b8413
commit fcfb5d704a
20 changed files with 262 additions and 262 deletions

View File

@ -9,7 +9,7 @@ public abstract class AbstractEndPoint implements EndPoint
private final InetSocketAddress _local;
private final InetSocketAddress _remote;
private volatile int _maxIdleTime;
private volatile long _idleTimestamp;
private volatile long _idleTimestamp=System.currentTimeMillis();
protected AbstractEndPoint(InetSocketAddress local,InetSocketAddress remote)

View File

@ -1,6 +1,7 @@
package org.eclipse.jetty.io;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.Timer;
@ -14,11 +15,14 @@ import org.eclipse.jetty.util.log.Logger;
public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEndPoint
{
private static final int TICK=500;
public static final Logger LOG=Log.getLogger(AsyncByteArrayEndPoint.class);
private static final Timer _timer = new Timer();
private static final Timer _timer = new Timer(true);
private boolean _checkForIdle;
private AsyncConnection _connection;
private final TimerTask _task=new TimeoutTask(this);
private final ReadInterest _readInterest = new ReadInterest()
{
@Override
@ -38,6 +42,10 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn
}
};
{
_timer.schedule(_task,TICK,TICK);
}
public AsyncByteArrayEndPoint()
{
super();
@ -99,23 +107,6 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn
public void setCheckForIdle(boolean check)
{
_checkForIdle=check;
if (check)
{
final TimerTask task=new TimerTask()
{
@Override
public void run()
{
checkForIdleOrReadWriteTimeout(System.currentTimeMillis());
if (_checkForIdle)
_timer.schedule(this,1000);
}
};
_timer.schedule(task,1000);
}
}
@Override
@ -153,7 +144,13 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn
notIdle();
if (_checkForIdle)
_connection.onIdleExpired(idleForMs);
{
AsyncConnection connection=_connection;
if (connection==null)
close();
else
connection.onIdleExpired(idleForMs);
}
TimeoutException timeout = new TimeoutException();
_readInterest.failed(timeout);
@ -167,8 +164,28 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn
@Override
public void onClose()
{
setCheckForIdle(false);
_task.cancel();
super.onClose();
}
private static class TimeoutTask extends TimerTask
{
final WeakReference<AsyncByteArrayEndPoint> _endp;
TimeoutTask(AsyncByteArrayEndPoint endp)
{
_endp=new WeakReference<AsyncByteArrayEndPoint>(endp);
}
@Override
public void run()
{
AsyncByteArrayEndPoint endp = _endp.get();
if (endp==null)
cancel();
else
endp.checkForIdleOrReadWriteTimeout(System.currentTimeMillis());
}
};
}

View File

@ -7,6 +7,8 @@ import java.nio.channels.WritePendingException;
import java.util.concurrent.Future;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ExecutorCallback;
import org.eclipse.jetty.util.FutureCallback;
/* ------------------------------------------------------------ */
/**Asynchronous End Point
@ -15,6 +17,57 @@ import org.eclipse.jetty.util.Callback;
* The design of these has been influenced by NIO.2 Futures and Completion
* handlers, but does not use those actual interfaces because: they have
* some inefficiencies.
* <p>
* This class will frequently be used in conjunction with some of the utility
* implementations of {@link Callback}, such as {@link FutureCallback} and
* {@link ExecutorCallback}. Examples are:
* <h3>Blocking Read</h3>
* A FutureCallback can be used to block until an endpoint is ready to be filled
* from:
* <blockquote><pre>
* FutureCallback<String> future = new FutureCallback<>();
* endpoint.readable("ContextObj",future);
* ...
* String context = future.get(); // This blocks
* int filled=endpoint.fill(mybuffer);</pre></blockquote>
* <h3>Dispatched Read</h3>
* By using a different callback, the read can be done asynchronously in its own dispatched thread:
* <blockquote><pre>
* endpoint.readable("ContextObj",new ExecutorCallback<String>(executor)
* {
* public void onCompleted(String context)
* {
* int filled=endpoint.fill(mybuffer);
* ...
* }
* public void onFailed(String context,Throwable cause) {...}
* });</pre></blockquote>
* The executor callback can also be customized to not dispatch in some circumstances when
* it knows it can use the callback thread and does not need to dispatch.
*
* <h3>Blocking Write</h3>
* The write contract is that the callback complete is not called until all data has been
* written or there is a failure. For blocking this looks like:
*
* <blockquote><pre>
* FutureCallback<String> future = new FutureCallback<>();
* endpoint.write("ContextObj",future,headerBuffer,contentBuffer);
* String context = future.get(); // This blocks
* </pre></blockquote>
*
* <h3>Dispatched Write</h3>
* Note also that multiple buffers may be passed in write so that gather writes
* can be done:
* <blockquote><pre>
* endpoint.write("ContextObj",new ExecutorCallback<String>(executor)
* {
* public void onCompleted(String context)
* {
* int filled=endpoint.fill(mybuffer);
* ...
* }
* public void onFailed(String context,Throwable cause) {...}
* },headerBuffer,contentBuffer);</pre></blockquote>
*
*/
public interface AsyncEndPoint extends EndPoint

View File

@ -246,7 +246,10 @@ public class ByteArrayEndPoint extends AbstractEndPoint
shutdownInput();
if (_ishut)
return -1;
return BufferUtil.append(_in,buffer);
int filled=BufferUtil.append(_in,buffer);
if (filled>0)
notIdle();
return filled;
}
/* ------------------------------------------------------------ */
@ -261,7 +264,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
if (_oshut)
throw new IOException("OSHUT");
int len=0;
int flushed=0;
for (ByteBuffer b : buffers)
{
@ -278,13 +281,15 @@ public class ByteArrayEndPoint extends AbstractEndPoint
}
}
len+=BufferUtil.append(b,_out);
flushed+=BufferUtil.append(b,_out);
if (BufferUtil.hasContent(b))
break;
}
}
return len;
if (flushed>0)
notIdle();
return flushed;
}
/* ------------------------------------------------------------ */

View File

@ -17,8 +17,8 @@ import org.eclipse.jetty.util.Callback;
public abstract class ReadInterest
{
private final AtomicBoolean _interested = new AtomicBoolean(false);
private volatile Callback _readCallback;
private Object _readContext;
private volatile Callback _callback;
private Object _context;
/* ------------------------------------------------------------ */
protected ReadInterest()
@ -30,8 +30,8 @@ public abstract class ReadInterest
{
if (!_interested.compareAndSet(false,true))
throw new ReadPendingException();
_readContext=context;
_readCallback=callback;
_context=context;
_callback=callback;
try
{
if (readIsPossible())
@ -49,10 +49,10 @@ public abstract class ReadInterest
{
if (_interested.compareAndSet(true,false))
{
Callback callback=_readCallback;
Object context=_readContext;
_readCallback=null;
_readContext=null;
Callback callback=_callback;
Object context=_context;
_callback=null;
_context=null;
callback.completed(context);
}
}
@ -68,10 +68,10 @@ public abstract class ReadInterest
{
if (_interested.compareAndSet(true,false))
{
Callback callback=_readCallback;
Object context=_readContext;
_readCallback=null;
_readContext=null;
Callback callback=_callback;
Object context=_context;
_callback=null;
_context=null;
callback.failed(context,cause);
}
}
@ -81,15 +81,22 @@ public abstract class ReadInterest
{
if (_interested.compareAndSet(true,false))
{
Callback callback=_readCallback;
Object context=_readContext;
_readCallback=null;
_readContext=null;
Callback callback=_callback;
Object context=_context;
_callback=null;
_context=null;
callback.failed(context,new ClosedChannelException());
}
}
/* ------------------------------------------------------------ */
public String toString()
{
return String.format("ReadInterest@%x{%b,%s,%s}",hashCode(),_interested.get(),_callback,_context);
}
/* ------------------------------------------------------------ */
abstract protected boolean readIsPossible() throws IOException;
}

View File

@ -186,7 +186,13 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
notIdle();
if (_idlecheck)
_connection.onIdleExpired(idleForMs);
{
AsyncConnection connection=_connection;
if (connection==null)
close();
else
connection.onIdleExpired(idleForMs);
}
TimeoutException timeout = new TimeoutException();
_readInterest.failed(timeout);

View File

@ -26,9 +26,9 @@ abstract public class WriteFlusher
private final AtomicBoolean _writing = new AtomicBoolean(false);
private final EndPoint _endp;
private ByteBuffer[] _writeBuffers;
private Object _writeContext;
private Callback _writeCallback;
private ByteBuffer[] _buffers;
private Object _context;
private Callback _callback;
protected WriteFlusher(EndPoint endp)
{
@ -49,9 +49,9 @@ abstract public class WriteFlusher
{
if (b.hasRemaining())
{
_writeBuffers=buffers;
_writeContext=context;
_writeCallback=callback;
_buffers=buffers;
_context=context;
_callback=callback;
scheduleCompleteWrite();
_writing.set(true); // Needed as memory barrier
return;
@ -111,11 +111,11 @@ abstract public class WriteFlusher
try
{
_writeBuffers=compact(_writeBuffers);
_endp.flush(_writeBuffers);
_buffers=compact(_buffers);
_endp.flush(_buffers);
// Are we complete?
for (ByteBuffer b : _writeBuffers)
for (ByteBuffer b : _buffers)
{
if (b.hasRemaining())
{
@ -125,21 +125,21 @@ abstract public class WriteFlusher
}
// we are complete and ready
Callback callback=_writeCallback;
Object context=_writeContext;
_writeBuffers=null;
_writeCallback=null;
_writeContext=null;
Callback callback=_callback;
Object context=_context;
_buffers=null;
_callback=null;
_context=null;
if (!_writing.compareAndSet(true,false))
throw new ConcurrentModificationException();
callback.completed(context);
}
catch (IOException e)
{
Callback callback=_writeCallback;
Object context=_writeContext;
_writeBuffers=null;
_writeCallback=null;
Callback callback=_callback;
Object context=_context;
_buffers=null;
_callback=null;
if (!_writing.compareAndSet(true,false))
throw new ConcurrentModificationException();
callback.failed(context,e);
@ -157,10 +157,10 @@ abstract public class WriteFlusher
{
if (!_writing.compareAndSet(true,false))
return false;
Callback callback=_writeCallback;
Object context=_writeContext;
_writeBuffers=null;
_writeCallback=null;
Callback callback=_callback;
Object context=_context;
_buffers=null;
_callback=null;
callback.failed(context,cause);
return true;
}
@ -176,10 +176,10 @@ abstract public class WriteFlusher
{
if (!_writing.compareAndSet(true,false))
return false;
Callback callback=_writeCallback;
Object context=_writeContext;
_writeBuffers=null;
_writeCallback=null;
Callback callback=_callback;
Object context=_context;
_buffers=null;
_callback=null;
callback.failed(context,new ClosedChannelException());
return true;
}
@ -190,4 +190,9 @@ abstract public class WriteFlusher
return _writing.get();
}
/* ------------------------------------------------------------ */
public String toString()
{
return String.format("WriteFlusher@%x{%b,%s,%s}",hashCode(),_writing.get(),_callback,_context);
}
}

View File

@ -10,9 +10,11 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FutureCallback;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
@ -103,6 +105,76 @@ public class AsyncByteArrayEndPointTest
assertTrue(fcb.isDone());
assertEquals("CTX",fcb.get());
assertEquals(" more.",endp.getOutputString());
}
@Test
public void testIdle() throws Exception
{
AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint();
endp.setMaxIdleTime(500);
endp.setInput("test");
endp.setGrowOutput(false);
endp.setOutput(BufferUtil.allocate(5));
// no idle check
endp.setCheckForIdle(false);
assertTrue(endp.isOpen());
Thread.sleep(1000);
assertTrue(endp.isOpen());
// normal read
ByteBuffer buffer = BufferUtil.allocate(1024);
FutureCallback<Void> fcb = new FutureCallback<>();
endp.readable(null,fcb);
assertTrue(fcb.isDone());
assertEquals(null,fcb.get());
assertEquals(4,endp.fill(buffer));
assertEquals("test",BufferUtil.toString(buffer));
// read timeout
fcb = new FutureCallback<>();
endp.readable(null,fcb);
long start=System.currentTimeMillis();
try
{
fcb.get();
fail();
}
catch(ExecutionException t)
{
assertThat(t.getCause(),Matchers.instanceOf(TimeoutException.class));
}
assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(100L));
assertTrue(endp.isOpen());
// write timeout
fcb = new FutureCallback<>();
start=System.currentTimeMillis();
endp.write(null,fcb,BufferUtil.toBuffer("This is too long"));
try
{
fcb.get();
fail();
}
catch(ExecutionException t)
{
assertThat(t.getCause(),Matchers.instanceOf(TimeoutException.class));
}
assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(100L));
assertTrue(endp.isOpen());
// Still no idle close
Thread.sleep(1000);
assertTrue(endp.isOpen());
// idle close
endp.setCheckForIdle(true);
Thread.sleep(1000);
assertFalse(endp.isOpen());
}
}

View File

@ -27,7 +27,7 @@ import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class LocalConnector extends AbstractConnector
public class LocalConnector extends AbstractHttpConnector
{
private static final Logger LOG = Log.getLogger(LocalConnector.class);
private final BlockingQueue<Request> _requests = new LinkedBlockingQueue<Request>();

View File

@ -1,28 +0,0 @@
// ========================================================================
// Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.server;
import org.junit.BeforeClass;
/**
* HttpServer Tester.
*/
public class BlockingChannelCloseTest extends HttpServerTestBase
{
@BeforeClass
public static void init() throws Exception
{
startServer(new BlockingChannelConnector());
}
}

View File

@ -1,28 +0,0 @@
// ========================================================================
// Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.server;
import org.junit.BeforeClass;
/**
* HttpServer Tester.
*/
public class BlockingChannelServerTest extends HttpServerTestBase
{
@BeforeClass
public static void init() throws Exception
{
startServer(new BlockingChannelConnector());
}
}

View File

@ -1,40 +0,0 @@
// ========================================================================
// Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.server;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.junit.BeforeClass;
import org.junit.Test;
public class BlockingChannelTimeoutTest extends ConnectorTimeoutTest
{
private static final Logger LOG = Log.getLogger(BlockingChannelTimeoutTest.class);
@BeforeClass
public static void init() throws Exception
{
BlockingChannelConnector connector = new BlockingChannelConnector();
connector.setMaxIdleTime(MAX_IDLE_TIME); //250 msec max idle
startServer(connector);
}
@Test
public void testMaxIdleWithWait() throws Exception
{
// TODO
LOG.warn("skipped BlockingChannelTimeoutTest#testMaxIdleWithWait");
}
}

View File

@ -123,7 +123,7 @@ public abstract class ConnectorTimeoutTest extends HttpServerTestFixture
{
try
{
endpoint.exchange(baseRequest.getHttpChannel().getEndPoint());
endpoint.exchange(baseRequest.getHttpChannel().getConnection().getEndPoint());
}
catch(Exception e)
{}
@ -196,7 +196,7 @@ public abstract class ConnectorTimeoutTest extends HttpServerTestFixture
{
try
{
endpoint.exchange(baseRequest.getHttpChannel().getEndPoint());
endpoint.exchange(baseRequest.getHttpChannel().getConnection().getEndPoint());
}
catch(Exception e)
{}

View File

@ -93,7 +93,7 @@ public class DumpHandler extends AbstractHandler
}
baseRequest.setHandled(true);
response.setHeader(HttpHeader.CONTENT_TYPE,MimeTypes.TEXT_HTML);
response.setHeader(HttpHeader.CONTENT_TYPE.asString(),MimeTypes.Type.TEXT_HTML.asString());
OutputStream out = response.getOutputStream();
ByteArrayOutputStream buf = new ByteArrayOutputStream(2048);

View File

@ -942,7 +942,7 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
Socket client=newSocket(HOST,_connector.getLocalPort());
try
{
((StdErrLog)Log.getLogger(AbstractHttpConnection.class)).setHideStacks(true);
((StdErrLog)Log.getLogger(HttpConnection.class)).setHideStacks(true);
OutputStream os=client.getOutputStream();
InputStream is=client.getInputStream();
@ -970,7 +970,7 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
}
finally
{
((StdErrLog)Log.getLogger(AbstractHttpConnection.class)).setHideStacks(false);
((StdErrLog)Log.getLogger(HttpConnection.class)).setHideStacks(false);
if (!client.isClosed())
client.close();
@ -983,7 +983,7 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
_endp=baseRequest.getHttpChannel().getEndPoint();
_endp=baseRequest.getHttpChannel().getConnection().getEndPoint();
response.setHeader("test","value");
response.setStatus(200);
response.setContentType("text/plain");

View File

@ -1,30 +0,0 @@
// ========================================================================
// Copyright (c) Webtide LLC
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.server;
import org.junit.Before;
/* ------------------------------------------------------------ */
public class SocketConnectorCloseTest extends ConnectorCloseTestBase
{
/* ------------------------------------------------------------ */
@Before
public void init() throws Exception
{
System.setProperty("org.eclipse.jetty.util.log.DEBUG","true");
startServer(new SocketConnector());
}
}

View File

@ -1,27 +0,0 @@
// ========================================================================
// Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.server;
import org.junit.BeforeClass;
/**
* HttpServer Tester.
*/
public class SocketServerTest extends HttpServerTestBase
{
@BeforeClass
public static void init() throws Exception
{
startServer(new SocketConnector());
}
}

View File

@ -1,28 +0,0 @@
// ========================================================================
// Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.server;
import org.junit.BeforeClass;
public class SocketTimeoutTest extends ConnectorTimeoutTest
{
@BeforeClass
public static void init() throws Exception
{
SocketConnector connector = new SocketConnector();
connector.setMaxIdleTime(MAX_IDLE_TIME); //250 msec max idle
startServer(connector);
}
}

View File

@ -3,7 +3,7 @@ package org.eclipse.jetty.util;
import java.util.concurrent.Executor;
public class ExecutorCallback<C> implements Callback<C>
public abstract class ExecutorCallback<C> implements Callback<C>
{
private final static Integer ZERO = new Integer(0);
private final static ThreadLocal<Integer> __calls = new ThreadLocal<Integer>()
@ -76,9 +76,7 @@ public class ExecutorCallback<C> implements Callback<C>
});
}
protected void onCompleted(C context)
{
}
protected abstract void onCompleted(C context);
@Override

View File

@ -12,13 +12,24 @@ import java.util.concurrent.atomic.AtomicReference;
public class FutureCallback<C> implements Future<C>,Callback<C>
{
// TODO investigate use of a phasor
private enum State {NOT_DONE,DOING,DONE};
private final AtomicReference<State> _state=new AtomicReference<>(State.NOT_DONE);
private final CountDownLatch _done= new CountDownLatch(1);
private CountDownLatch _done= new CountDownLatch(1);
private Throwable _cause;
private C _context;
private boolean _completed;
private void recycle()
{
// TODO make this public?
if (!isDone())
throw new IllegalStateException();
_cause=null;
_context=null;
_completed=false;
_done=new CountDownLatch(1);
}
@Override
public void completed(C context)
@ -109,4 +120,11 @@ public class FutureCallback<C> implements Future<C>,Callback<C>
throw (RuntimeException)cause;
throw new RuntimeException(cause);
}
/* ------------------------------------------------------------ */
public String toString()
{
return String.format("FutureCallback@%x{%s,%b,%s}",hashCode(),_state,_completed,_context);
}
}