Fixed handling of callbacks in case of nested EndPoints.

In the SSL case, the app is writing, and eventually a SCEP.write() is triggered.
If this call throws, the SCEP callback is failed, and this in turn should fail the
app callback, but at this point the app callback was not set yet in the app
flusher, causing a NPE.
This commit is contained in:
Simone Bordet 2012-06-07 14:02:29 +02:00
parent ec4b987b4d
commit 2d6a45bf81
1 changed files with 38 additions and 30 deletions

View File

@ -12,24 +12,24 @@ import org.eclipse.jetty.util.Callback;
/* ------------------------------------------------------------ */
/**
/**
* A Utility class to help implement {@link AsyncEndPoint#write(Object, Callback, ByteBuffer...)}
* by calling {@link EndPoint#flush(ByteBuffer...)} until all content is written.
* The abstract method {@link #registerFlushInterest()} is called when not all content has been
* The abstract method {@link #registerFlushInterest()} is called when not all content has been
* written after a call to flush and should organise for the {@link #completeWrite()}
* method to be called when a subsequent call to flush should be able to make more progress.
*
*
*/
abstract public class WriteFlusher
{
private final static ByteBuffer[] NO_BUFFERS= new ByteBuffer[0];
private final AtomicBoolean _writing = new AtomicBoolean(false);
private final EndPoint _endp;
private ByteBuffer[] _buffers;
private Object _context;
private Callback _callback;
protected WriteFlusher(EndPoint endp)
{
_endp=endp;
@ -44,6 +44,10 @@ abstract public class WriteFlusher
throw new WritePendingException();
try
{
_buffers=buffers;
_context=context;
_callback=callback;
_endp.flush(buffers);
// Are we complete?
@ -51,41 +55,44 @@ abstract public class WriteFlusher
{
if (b.hasRemaining())
{
_buffers=buffers;
_context=context;
_callback=callback;
if(registerFlushInterest())
completeWrite();
else
_writing.set(true); // Needed as memory barrier
return;
}
}
_buffers=null;
_context=null;
_callback=null;
if (!_writing.compareAndSet(true,false))
throw new ConcurrentModificationException();
callback.completed(context);
}
catch (IOException e)
{
_buffers=null;
_context=null;
_callback=null;
if (!_writing.compareAndSet(true,false))
throw new ConcurrentModificationException(e);
callback.failed(context,e);
return;
}
if (!_writing.compareAndSet(true,false))
throw new ConcurrentModificationException();
callback.completed(context);
}
/* ------------------------------------------------------------ */
/**
* Abstract call to be implemented by specific WriteFlushers. Will return true if a
* Abstract call to be implemented by specific WriteFlushers. Will return true if a
* flush is immediately possible, otherwise it will schedule a call to {@link #completeWrite()} or
* {@link #failed(Throwable)} when appropriate.
* @return true if a flush can proceed.
*/
abstract protected boolean registerFlushInterest();
/* ------------------------------------------------------------ */
/* Remove empty buffers from the start of a multi buffer array
*/
@ -100,25 +107,23 @@ abstract public class WriteFlusher
return buffers;
if (b==buffers.length)
return NO_BUFFERS;
ByteBuffer[] compact=new ByteBuffer[buffers.length-b];
for (int i=0;i<compact.length;i++)
compact[i]=buffers[b+i];
System.arraycopy(buffers,b,compact,0,compact.length);
return compact;
}
/* ------------------------------------------------------------ */
/**
* Complete a write that has not completed and that called
* Complete a write that has not completed and that called
* {@link #registerFlushInterest()} to request a call to this
* method when a call to {@link EndPoint#flush(ByteBuffer...)}
* method when a call to {@link EndPoint#flush(ByteBuffer...)}
* is likely to be able to progress.
* @return true if a write was in progress
*/
public boolean completeWrite()
{
if (!_writing.get())
if (!isWriting())
return false;
try
@ -156,6 +161,7 @@ abstract public class WriteFlusher
Object context=_context;
_buffers=null;
_callback=null;
_context=null;
if (!_writing.compareAndSet(true,false))
throw new ConcurrentModificationException();
callback.failed(context,e);
@ -164,7 +170,7 @@ abstract public class WriteFlusher
}
/* ------------------------------------------------------------ */
/**
/**
* Fail the write in progress and cause any calls to get to throw
* the cause wrapped as an execution exception.
* @return true if a write was in progress
@ -177,6 +183,7 @@ abstract public class WriteFlusher
Object context=_context;
_buffers=null;
_callback=null;
_context=null;
callback.failed(context,cause);
return true;
}
@ -184,7 +191,7 @@ abstract public class WriteFlusher
/* ------------------------------------------------------------ */
/**
* Fail the write with a {@link ClosedChannelException}. This is similar
* to a call to {@link #failed(Throwable)}, except that the exception is
* to a call to {@link #failed(Throwable)}, except that the exception is
* not instantiated unless a write was in progress.
* @return true if a write was in progress
*/
@ -196,10 +203,11 @@ abstract public class WriteFlusher
Object context=_context;
_buffers=null;
_callback=null;
_context=null;
callback.failed(context,new ClosedChannelException());
return true;
}
/* ------------------------------------------------------------ */
public boolean isWriting()
{
@ -209,6 +217,6 @@ abstract public class WriteFlusher
/* ------------------------------------------------------------ */
public String toString()
{
return String.format("WriteFlusher@%x{%b,%s,%s}",hashCode(),_writing.get(),_callback,_context);
return String.format("WriteFlusher@%x{%b,%s,%s}",hashCode(),isWriting(),_callback,_context);
}
}
}