415330 Avoid multiple callbacks at EOF

This commit is contained in:
Greg Wilkins 2013-08-22 21:31:06 +10:00
parent 16191da3ac
commit c4583918a3
3 changed files with 97 additions and 48 deletions

View File

@ -461,8 +461,9 @@ public class HttpOutput extends ServletOutputStream
*/ */
private class InputStreamWritingCB extends IteratingCallback private class InputStreamWritingCB extends IteratingCallback
{ {
final InputStream _in; private final InputStream _in;
final ByteBuffer _buffer; private final ByteBuffer _buffer;
private boolean _eof;
public InputStreamWritingCB(InputStream in, Callback callback) public InputStreamWritingCB(InputStream in, Callback callback)
{ {
@ -474,12 +475,19 @@ public class HttpOutput extends ServletOutputStream
@Override @Override
protected boolean process() throws Exception protected boolean process() throws Exception
{ {
boolean eof=false; if (_eof)
{
// Handle EOF
closed();
_channel.getByteBufferPool().release(_buffer);
return true;
}
int len=_in.read(_buffer.array(),0,_buffer.capacity()); int len=_in.read(_buffer.array(),0,_buffer.capacity());
if (len<0) if (len<0)
{ {
eof=true; _eof=true;
len=0; len=0;
_in.close(); _in.close();
} }
@ -488,7 +496,7 @@ public class HttpOutput extends ServletOutputStream
// read ahead for EOF to try for single commit // read ahead for EOF to try for single commit
int len2=_in.read(_buffer.array(),len,_buffer.capacity()-len); int len2=_in.read(_buffer.array(),len,_buffer.capacity()-len);
if (len2<0) if (len2<0)
eof=true; _eof=true;
else else
len+=len2; len+=len2;
} }
@ -496,16 +504,7 @@ public class HttpOutput extends ServletOutputStream
// write what we have // write what we have
_buffer.position(0); _buffer.position(0);
_buffer.limit(len); _buffer.limit(len);
_channel.write(_buffer,eof,this); _channel.write(_buffer,_eof,this);
// Handle EOF
if (eof)
{
closed();
_channel.getByteBufferPool().release(_buffer);
return true;
}
return false; return false;
} }
@ -537,8 +536,9 @@ public class HttpOutput extends ServletOutputStream
*/ */
private class ReadableByteChannelWritingCB extends IteratingCallback private class ReadableByteChannelWritingCB extends IteratingCallback
{ {
final ReadableByteChannel _in; private final ReadableByteChannel _in;
final ByteBuffer _buffer; private final ByteBuffer _buffer;
private boolean _eof;
public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback) public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback)
{ {
@ -550,13 +550,19 @@ public class HttpOutput extends ServletOutputStream
@Override @Override
protected boolean process() throws Exception protected boolean process() throws Exception
{ {
if (_eof)
{
closed();
_channel.getByteBufferPool().release(_buffer);
return true;
}
_buffer.clear(); _buffer.clear();
boolean eof=false;
int len=_in.read(_buffer); int len=_in.read(_buffer);
if (len<0) if (len<0)
{ {
eof=true; _eof=true;
len=0; len=0;
_in.close(); _in.close();
} }
@ -565,23 +571,14 @@ public class HttpOutput extends ServletOutputStream
// read ahead for EOF to try for single commit // read ahead for EOF to try for single commit
int len2=_in.read(_buffer); int len2=_in.read(_buffer);
if (len2<0) if (len2<0)
eof=true; _eof=true;
else else
len+=len2; len+=len2;
} }
// write what we have // write what we have
_buffer.flip(); _buffer.flip();
_channel.write(_buffer,eof,this); _channel.write(_buffer,_eof,this);
// Handle EOF
if (eof)
{
closed();
_channel.getByteBufferPool().release(_buffer);
return true;
}
return false; return false;
} }

View File

@ -522,6 +522,8 @@ public class ResourceHandler extends HandlerWrapper
@Override @Override
public void failed(Throwable x) public void failed(Throwable x)
{ {
LOG.warn(x.toString());
LOG.debug(x);
async.complete(); async.complete();
} }
}; };

View File

@ -18,7 +18,7 @@
package org.eclipse.jetty.util; package org.eclipse.jetty.util;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -29,7 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
* if that task completes quickly and uses the calling thread to callback * if that task completes quickly and uses the calling thread to callback
* the success notification, this can result in a growing stack depth. * the success notification, this can result in a growing stack depth.
* </p> * </p>
* <p>To avoid this issue, this callback uses an Atomicboolean to note * <p>To avoid this issue, this callback uses an AtomicReference to note
* if the success callback has been called during the processing of a * if the success callback has been called during the processing of a
* sub task, and if so then the processing iterates rather than recurses. * sub task, and if so then the processing iterates rather than recurses.
* </p> * </p>
@ -42,9 +42,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/ */
public abstract class IteratingCallback implements Callback public abstract class IteratingCallback implements Callback
{ {
final AtomicBoolean _iterating = new AtomicBoolean(); private enum State { WAITING, ITERATING, SUCCEEDED, FAILED };
final Callback _callback; private final AtomicReference<State> _state = new AtomicReference<>(State.WAITING);
private final Callback _callback;
public IteratingCallback(Callback callback) public IteratingCallback(Callback callback)
{ {
@ -73,38 +73,88 @@ public abstract class IteratingCallback implements Callback
try try
{ {
// Keep iterating as long as succeeded() is called during process() // Keep iterating as long as succeeded() is called during process()
while(_iterating.compareAndSet(false,true)) // If we are in WAITING state, either this is the first iteration or
// succeeded()/failed() were called already.
while(_state.compareAndSet(State.WAITING,State.ITERATING))
{ {
// process and test if we are complete // Make some progress by calling process()
if (process()) if (process())
{ {
// A true return indicates we are finished a no further callbacks
// are scheduled. So we must still be ITERATING.
if (_state.compareAndSet(State.ITERATING,State.SUCCEEDED))
_callback.succeeded(); _callback.succeeded();
else
throw new IllegalStateException("Already "+_state.get());
return; return;
} }
// else a callback has been scheduled. If it has not happened yet,
// we will still be ITERATING
else if (_state.compareAndSet(State.ITERATING,State.WAITING))
// no callback yet, so break the loop and wait for it
break;
// The callback must have happened and we are either WAITING already or FAILED
// the loop test will work out which
} }
} }
catch(Exception e) catch(Exception e)
{ {
_iterating.set(false); failed(e);
_callback.failed(e);
}
finally
{
_iterating.set(false);
} }
} }
@Override @Override
public void succeeded() public void succeeded()
{ {
if (!_iterating.compareAndSet(true,false)) // Try a short cut for the fast method. If we are still iterating
if (_state.compareAndSet(State.ITERATING,State.WAITING))
// then next loop will continue processing, so nothing to do here
return;
// OK do it properly
loop: while(true)
{
switch(_state.get())
{
case ITERATING:
if (_state.compareAndSet(State.ITERATING,State.WAITING))
break loop;
continue;
case WAITING:
// we are really waiting, so use this callback thread to iterate some more
iterate(); iterate();
break loop;
default:
throw new IllegalStateException("Already "+_state.get());
}
}
} }
@Override @Override
public void failed(Throwable x) public void failed(Throwable x)
{ {
loop: while(true)
{
switch(_state.get())
{
case ITERATING:
if (_state.compareAndSet(State.ITERATING,State.FAILED))
break loop;
continue;
case WAITING:
if (_state.compareAndSet(State.WAITING,State.FAILED))
break loop;
continue;
default:
throw new IllegalStateException("Already "+_state.get());
}
}
_callback.failed(x); _callback.failed(x);
} }