jetty-9 more renaming and signature tuning

This commit is contained in:
Greg Wilkins 2012-06-07 17:17:37 +02:00
parent 0b56e3ae7c
commit ad689a6a57
7 changed files with 45 additions and 98 deletions

View File

@ -25,7 +25,7 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn
private final ReadInterest _readInterest = new ReadInterest() private final ReadInterest _readInterest = new ReadInterest()
{ {
@Override @Override
protected boolean registerReadInterest() throws IOException protected boolean needsFill() throws IOException
{ {
if (_closed) if (_closed)
throw new ClosedChannelException(); throw new ClosedChannelException();
@ -36,9 +36,9 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn
private final WriteFlusher _writeFlusher = new WriteFlusher(this) private final WriteFlusher _writeFlusher = new WriteFlusher(this)
{ {
@Override @Override
protected boolean registerFlushInterest() protected void onIncompleteFlushed()
{ {
return false; // Don't need to do anything here as takeOutput does the signalling.
} }
}; };

View File

@ -17,7 +17,7 @@ import org.eclipse.jetty.util.Callback;
public abstract class ReadInterest public abstract class ReadInterest
{ {
private final AtomicBoolean _interested = new AtomicBoolean(false); private final AtomicBoolean _interested = new AtomicBoolean(false);
private volatile Callback _callback; private volatile Callback<Object> _callback;
private Object _context; private Object _context;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -27,7 +27,7 @@ public abstract class ReadInterest
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** Call to register interest in a callback when a read is possible. /** Call to register interest in a callback when a read is possible.
* The callback will be called either immediately if {@link #registerReadInterest()} * The callback will be called either immediately if {@link #needsFill()}
* returns true or eventually once {@link #readable()} is called. * returns true or eventually once {@link #readable()} is called.
* @param context * @param context
* @param callback * @param callback
@ -38,10 +38,10 @@ public abstract class ReadInterest
if (!_interested.compareAndSet(false,true)) if (!_interested.compareAndSet(false,true))
throw new ReadPendingException(); throw new ReadPendingException();
_context=context; _context=context;
_callback=callback; _callback=(Callback<Object>)callback;
try try
{ {
if (registerReadInterest()) if (needsFill())
readable(); readable();
} }
catch(IOException e) catch(IOException e)
@ -57,7 +57,7 @@ public abstract class ReadInterest
{ {
if (_interested.compareAndSet(true,false)) if (_interested.compareAndSet(true,false))
{ {
Callback callback=_callback; Callback<Object> callback=_callback;
Object context=_context; Object context=_context;
_callback=null; _callback=null;
_context=null; _context=null;
@ -81,7 +81,7 @@ public abstract class ReadInterest
{ {
if (_interested.compareAndSet(true,false)) if (_interested.compareAndSet(true,false))
{ {
Callback callback=_callback; Callback<Object> callback=_callback;
Object context=_context; Object context=_context;
_callback=null; _callback=null;
_context=null; _context=null;
@ -94,7 +94,7 @@ public abstract class ReadInterest
{ {
if (_interested.compareAndSet(true,false)) if (_interested.compareAndSet(true,false))
{ {
Callback callback=_callback; Callback<Object> callback=_callback;
Object context=_context; Object context=_context;
_callback=null; _callback=null;
_context=null; _context=null;
@ -103,6 +103,7 @@ public abstract class ReadInterest
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override
public String toString() public String toString()
{ {
return String.format("ReadInterest@%x{%b,%s,%s}",hashCode(),_interested.get(),_callback,_context); return String.format("ReadInterest@%x{%b,%s,%s}",hashCode(),_interested.get(),_callback,_context);
@ -116,7 +117,7 @@ public abstract class ReadInterest
* @return true if a read is possible * @return true if a read is possible
* @throws IOException * @throws IOException
*/ */
abstract protected boolean registerReadInterest() throws IOException; abstract protected boolean needsFill() throws IOException;
} }

View File

@ -53,7 +53,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
private final ReadInterest _readInterest = new ReadInterest() private final ReadInterest _readInterest = new ReadInterest()
{ {
@Override @Override
protected boolean registerReadInterest() protected boolean needsFill()
{ {
_interestOps=_interestOps | SelectionKey.OP_READ; _interestOps=_interestOps | SelectionKey.OP_READ;
updateKey(); updateKey();
@ -65,11 +65,10 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
private final WriteFlusher _writeFlusher = new WriteFlusher(this) private final WriteFlusher _writeFlusher = new WriteFlusher(this)
{ {
@Override @Override
protected boolean registerFlushInterest() protected void onIncompleteFlushed()
{ {
_interestOps = _interestOps | SelectionKey.OP_WRITE; _interestOps = _interestOps | SelectionKey.OP_WRITE;
updateKey(); updateKey();
return false;
} }
}; };

View File

@ -1,43 +0,0 @@
// ========================================================================
// Copyright (c) 2009-2009 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.io;
/* ------------------------------------------------------------ */
/**
* Subclass of {@link java.lang.RuntimeException} used to signal that there
* was an {@link java.io.IOException} thrown by underlying {@link UncheckedPrintWriter}
*/
public class UncheckedIOException extends RuntimeException
{
public UncheckedIOException()
{
super();
}
public UncheckedIOException(String message)
{
super(message);
}
public UncheckedIOException(Throwable cause)
{
super(cause);
}
public UncheckedIOException(String message, Throwable cause)
{
super(message,cause);
}
}

View File

@ -15,7 +15,7 @@ import org.eclipse.jetty.util.Callback;
/** /**
* A Utility class to help implement {@link AsyncEndPoint#write(Object, Callback, ByteBuffer...)} * A Utility class to help implement {@link AsyncEndPoint#write(Object, Callback, ByteBuffer...)}
* by calling {@link EndPoint#flush(ByteBuffer...)} until all content is written. * by calling {@link EndPoint#flush(ByteBuffer...)} until all content is written.
* The abstract method {@link #registerFlushInterest()} is called when not all content has been * The abstract method {@link #onIncompleteFlushed()} is called when not all content has been
* written after a call to flush and should organise for the {@link #completeWrite()} * written after a call to flush and should organise for the {@link #completeWrite()}
* method to be called when a subsequent call to flush should be able to make more progress. * method to be called when a subsequent call to flush should be able to make more progress.
* *
@ -28,7 +28,7 @@ abstract public class WriteFlusher
private ByteBuffer[] _buffers; private ByteBuffer[] _buffers;
private Object _context; private Object _context;
private Callback _callback; private Callback<Object> _callback;
protected WriteFlusher(EndPoint endp) protected WriteFlusher(EndPoint endp)
{ {
@ -44,9 +44,6 @@ abstract public class WriteFlusher
throw new WritePendingException(); throw new WritePendingException();
try try
{ {
_buffers=buffers;
_context=context;
_callback=callback;
_endp.flush(buffers); _endp.flush(buffers);
@ -55,28 +52,21 @@ abstract public class WriteFlusher
{ {
if (b.hasRemaining()) if (b.hasRemaining())
{ {
if(registerFlushInterest()) _buffers=buffers;
completeWrite(); _context=context;
else _callback=(Callback<Object>)callback;
_writing.set(true); // Needed as memory barrier _writing.set(true); // Needed as memory barrier
onIncompleteFlushed();
return; return;
} }
} }
_buffers=null;
_context=null;
_callback=null;
if (!_writing.compareAndSet(true,false)) if (!_writing.compareAndSet(true,false))
throw new ConcurrentModificationException(); throw new ConcurrentModificationException();
callback.completed(context); callback.completed(context);
} }
catch (IOException e) catch (IOException e)
{ {
_buffers=null;
_context=null;
_callback=null;
if (!_writing.compareAndSet(true,false)) if (!_writing.compareAndSet(true,false))
throw new ConcurrentModificationException(e); throw new ConcurrentModificationException(e);
callback.failed(context,e); callback.failed(context,e);
@ -85,12 +75,12 @@ abstract public class WriteFlusher
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
* Abstract call to be implemented by specific WriteFlushers. Will return true if a * Abstract call to be implemented by specific WriteFlushers.
* flush is immediately possible, otherwise it will schedule a call to {@link #completeWrite()} or * It should schedule a call to {@link #completeWrite()} or
* {@link #failed(Throwable)} when appropriate. * {@link #failed(Throwable)} when appropriate.
* @return true if a flush can proceed. * @return true if a flush can proceed.
*/ */
abstract protected boolean registerFlushInterest(); abstract protected void onIncompleteFlushed();
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -116,19 +106,18 @@ abstract public class WriteFlusher
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
* Complete a write that has not completed and that called * Complete a write that has not completed and that called
* {@link #registerFlushInterest()} to request a call to this * {@link #onIncompleteFlushed()} to request a call to this
* method when a call to {@link EndPoint#flush(ByteBuffer...)} * method when a call to {@link EndPoint#flush(ByteBuffer...)}
* is likely to be able to progress. * is likely to be able to progress.
* @return true if a write was in progress
*/ */
public boolean completeWrite() public void completeWrite()
{ {
if (!isWriting()) if (!isWriting())
return false; return; // TODO throw?
try try
{ {
retry: while(true) while(true)
{ {
_buffers=compact(_buffers); _buffers=compact(_buffers);
_endp.flush(_buffers); _endp.flush(_buffers);
@ -138,15 +127,14 @@ abstract public class WriteFlusher
{ {
if (b.hasRemaining()) if (b.hasRemaining())
{ {
if (registerFlushInterest()) onIncompleteFlushed();
continue retry; return;
return true;
} }
} }
break; break;
} }
// we are complete and ready // we are complete and ready
Callback callback=_callback; Callback<Object> callback=_callback;
Object context=_context; Object context=_context;
_buffers=null; _buffers=null;
_callback=null; _callback=null;
@ -157,7 +145,7 @@ abstract public class WriteFlusher
} }
catch (IOException e) catch (IOException e)
{ {
Callback callback=_callback; Callback<Object> callback=_callback;
Object context=_context; Object context=_context;
_buffers=null; _buffers=null;
_callback=null; _callback=null;
@ -166,7 +154,7 @@ abstract public class WriteFlusher
throw new ConcurrentModificationException(); throw new ConcurrentModificationException();
callback.failed(context,e); callback.failed(context,e);
} }
return true; return;
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -179,7 +167,7 @@ abstract public class WriteFlusher
{ {
if (!_writing.compareAndSet(true,false)) if (!_writing.compareAndSet(true,false))
return false; return false;
Callback callback=_callback; Callback<Object> callback=_callback;
Object context=_context; Object context=_context;
_buffers=null; _buffers=null;
_callback=null; _callback=null;
@ -199,7 +187,7 @@ abstract public class WriteFlusher
{ {
if (!_writing.compareAndSet(true,false)) if (!_writing.compareAndSet(true,false))
return false; return false;
Callback callback=_callback; Callback<Object> callback=_callback;
Object context=_context; Object context=_context;
_buffers=null; _buffers=null;
_callback=null; _callback=null;
@ -215,6 +203,7 @@ abstract public class WriteFlusher
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override
public String toString() public String toString()
{ {
return String.format("WriteFlusher@%x{%b,%s,%s}",hashCode(),isWriting(),_callback,_context); return String.format("WriteFlusher@%x{%b,%s,%s}",hashCode(),isWriting(),_callback,_context);

View File

@ -228,7 +228,7 @@ public class SslConnection extends AbstractAsyncConnection
private final ReadInterest _readInterest = new ReadInterest() private final ReadInterest _readInterest = new ReadInterest()
{ {
@Override @Override
protected boolean registerReadInterest() throws IOException protected boolean needsFill() throws IOException
{ {
synchronized (SslEndPoint.this) synchronized (SslEndPoint.this)
{ {
@ -271,7 +271,7 @@ public class SslConnection extends AbstractAsyncConnection
private final WriteFlusher _writeFlusher = new WriteFlusher(this) private final WriteFlusher _writeFlusher = new WriteFlusher(this)
{ {
@Override @Override
protected boolean registerFlushInterest() protected void onIncompleteFlushed()
{ {
synchronized (SslEndPoint.this) synchronized (SslEndPoint.this)
{ {
@ -282,15 +282,13 @@ public class SslConnection extends AbstractAsyncConnection
_netWriting=true; _netWriting=true;
getEndPoint().write(null,_writeCallback,_netOut); getEndPoint().write(null,_writeCallback,_netOut);
} }
// TODO test this with _flushInwrap
else if (_sslEngine.getHandshakeStatus()==HandshakeStatus.NEED_UNWRAP ) else if (_sslEngine.getHandshakeStatus()==HandshakeStatus.NEED_UNWRAP )
// we are actually read blocked in order to write // we are actually read blocked in order to write
SslConnection.this.fillInterested(); SslConnection.this.fillInterested();
else else
{
// try the flush again // try the flush again
return true; completeWrite();
}
return false;
} }
} }
}; };
@ -495,7 +493,9 @@ public class SslConnection extends AbstractAsyncConnection
if (BufferUtil.hasContent(_netOut)) if (BufferUtil.hasContent(_netOut))
{ {
_netWriting=true; _netWriting=true;
getEndPoint().write(null,_writeCallback,_netOut); getEndPoint().flush(_netOut);
if (BufferUtil.hasContent(_netOut))
return 0;
} }
if (_fillWrap) if (_fillWrap)
return 0; return 0;

View File

@ -278,6 +278,7 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
byte[] bytes=REQUEST2.getBytes(); byte[] bytes=REQUEST2.getBytes();
final int pointCount=2; final int pointCount=2;
// TODO random unit tests suck!
Random random=new Random(System.currentTimeMillis()); Random random=new Random(System.currentTimeMillis());
for (int i=0; i<LOOPS; i++) for (int i=0; i<LOOPS; i++)
{ {
@ -291,7 +292,7 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
{ {
points[j]=random.nextInt(bytes.length); points[j]=random.nextInt(bytes.length);
} }
System.err.println("points "+points[0]+" "+points[1]); // System.err.println("points "+points[0]+" "+points[1]);
// Sort the list // Sort the list
Arrays.sort(points); Arrays.sort(points);