Merge remote-tracking branch 'origin/master' into jetty-9.1
Conflicts: jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java
This commit is contained in:
commit
59540abeda
|
@ -737,6 +737,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
|
|||
@Override
|
||||
public void failed(Throwable failure)
|
||||
{
|
||||
super.failed(failure);
|
||||
anyToFailure(failure);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -800,6 +800,7 @@ write completed - - - ASYNC READY->owp
|
|||
@Override
|
||||
public void failed(Throwable e)
|
||||
{
|
||||
super.failed(e);
|
||||
_onError=e;
|
||||
_channel.getState().onWritePossible();
|
||||
}
|
||||
|
@ -818,8 +819,9 @@ write completed - - - ASYNC READY->owp
|
|||
*/
|
||||
private class InputStreamWritingCB extends IteratingNestedCallback
|
||||
{
|
||||
final InputStream _in;
|
||||
final ByteBuffer _buffer;
|
||||
private final InputStream _in;
|
||||
private final ByteBuffer _buffer;
|
||||
private boolean _eof;
|
||||
|
||||
public InputStreamWritingCB(InputStream in, Callback callback)
|
||||
{
|
||||
|
@ -831,12 +833,19 @@ write completed - - - ASYNC READY->owp
|
|||
@Override
|
||||
protected boolean process() throws Exception
|
||||
{
|
||||
boolean eof=false;
|
||||
if (_eof)
|
||||
{
|
||||
// Handle EOF
|
||||
closed();
|
||||
_channel.getByteBufferPool().release(_buffer);
|
||||
return true;
|
||||
}
|
||||
|
||||
int len=_in.read(_buffer.array(),0,_buffer.capacity());
|
||||
|
||||
if (len<0)
|
||||
{
|
||||
eof=true;
|
||||
_eof=true;
|
||||
len=0;
|
||||
_in.close();
|
||||
}
|
||||
|
@ -845,7 +854,7 @@ write completed - - - ASYNC READY->owp
|
|||
// read ahead for EOF to try for single commit
|
||||
int len2=_in.read(_buffer.array(),len,_buffer.capacity()-len);
|
||||
if (len2<0)
|
||||
eof=true;
|
||||
_eof=true;
|
||||
else
|
||||
len+=len2;
|
||||
}
|
||||
|
@ -853,15 +862,7 @@ write completed - - - ASYNC READY->owp
|
|||
// write what we have
|
||||
_buffer.position(0);
|
||||
_buffer.limit(len);
|
||||
_channel.write(_buffer,eof,this);
|
||||
|
||||
// Handle EOF
|
||||
if (eof)
|
||||
{
|
||||
closed();
|
||||
_channel.getByteBufferPool().release(_buffer);
|
||||
return true;
|
||||
}
|
||||
_channel.write(_buffer,_eof,this);
|
||||
|
||||
return false;
|
||||
}
|
||||
|
@ -894,8 +895,9 @@ write completed - - - ASYNC READY->owp
|
|||
*/
|
||||
private class ReadableByteChannelWritingCB extends IteratingNestedCallback
|
||||
{
|
||||
final ReadableByteChannel _in;
|
||||
final ByteBuffer _buffer;
|
||||
private final ReadableByteChannel _in;
|
||||
private final ByteBuffer _buffer;
|
||||
private boolean _eof;
|
||||
|
||||
public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback)
|
||||
{
|
||||
|
@ -907,13 +909,19 @@ write completed - - - ASYNC READY->owp
|
|||
@Override
|
||||
protected boolean process() throws Exception
|
||||
{
|
||||
if (_eof)
|
||||
{
|
||||
closed();
|
||||
_channel.getByteBufferPool().release(_buffer);
|
||||
return true;
|
||||
}
|
||||
|
||||
_buffer.clear();
|
||||
boolean eof=false;
|
||||
int len=_in.read(_buffer);
|
||||
|
||||
if (len<0)
|
||||
{
|
||||
eof=true;
|
||||
_eof=true;
|
||||
len=0;
|
||||
_in.close();
|
||||
}
|
||||
|
@ -922,22 +930,14 @@ write completed - - - ASYNC READY->owp
|
|||
// read ahead for EOF to try for single commit
|
||||
int len2=_in.read(_buffer);
|
||||
if (len2<0)
|
||||
eof=true;
|
||||
_eof=true;
|
||||
else
|
||||
len+=len2;
|
||||
}
|
||||
|
||||
// write what we have
|
||||
_buffer.flip();
|
||||
_channel.write(_buffer,eof,this);
|
||||
|
||||
// Handle EOF
|
||||
if (eof)
|
||||
{
|
||||
closed();
|
||||
_channel.getByteBufferPool().release(_buffer);
|
||||
return true;
|
||||
}
|
||||
_channel.write(_buffer,_eof,this);
|
||||
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -522,6 +522,8 @@ public class ResourceHandler extends HandlerWrapper
|
|||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
LOG.warn(x.toString());
|
||||
LOG.debug(x);
|
||||
async.complete();
|
||||
}
|
||||
};
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
|
||||
package org.eclipse.jetty.util;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
|
@ -29,7 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
* if that task completes quickly and uses the calling thread to callback
|
||||
* the success notification, this can result in a growing stack depth.
|
||||
* </p>
|
||||
* <p>To avoid this issue, this callback uses an AtomicBoolean to note
|
||||
* <p>To avoid this issue, this callback uses an AtomicReference to note
|
||||
* if the success callback has been called during the processing of a
|
||||
* sub task, and if so then the processing iterates rather than recurses.
|
||||
* </p>
|
||||
|
@ -41,7 +41,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
*/
|
||||
public abstract class IteratingCallback implements Callback
|
||||
{
|
||||
private final AtomicBoolean _iterating = new AtomicBoolean();
|
||||
private enum State { WAITING, ITERATING, SUCCEEDED, FAILED };
|
||||
private final AtomicReference<State> _state = new AtomicReference<>(State.WAITING);
|
||||
|
||||
public IteratingCallback()
|
||||
{
|
||||
|
@ -71,32 +72,93 @@ public abstract class IteratingCallback implements Callback
|
|||
try
|
||||
{
|
||||
// Keep iterating as long as succeeded() is called during process()
|
||||
while(_iterating.compareAndSet(false,true))
|
||||
// If we are in WAITING state, either this is the first iteration or
|
||||
// succeeded()/failed() were called already.
|
||||
while(_state.compareAndSet(State.WAITING,State.ITERATING))
|
||||
{
|
||||
// process and test if we are complete
|
||||
// Make some progress by calling process()
|
||||
if (process())
|
||||
{
|
||||
completed();
|
||||
// A true return indicates we are finished a no further callbacks
|
||||
// are scheduled. So we must still be ITERATING.
|
||||
if (_state.compareAndSet(State.ITERATING,State.SUCCEEDED))
|
||||
completed();
|
||||
else
|
||||
throw new IllegalStateException("Already "+_state.get());
|
||||
return;
|
||||
}
|
||||
// else a callback has been scheduled. If it has not happened yet,
|
||||
// we will still be ITERATING
|
||||
else if (_state.compareAndSet(State.ITERATING,State.WAITING))
|
||||
// no callback yet, so break the loop and wait for it
|
||||
break;
|
||||
|
||||
// The callback must have happened and we are either WAITING already or FAILED
|
||||
// the loop test will work out which
|
||||
}
|
||||
}
|
||||
catch(Exception e)
|
||||
{
|
||||
_iterating.set(false);
|
||||
failed(e);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_iterating.set(false);
|
||||
}
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
if (!_iterating.compareAndSet(true,false))
|
||||
iterate();
|
||||
// Try a short cut for the fast method. If we are still iterating
|
||||
if (_state.compareAndSet(State.ITERATING,State.WAITING))
|
||||
// then next loop will continue processing, so nothing to do here
|
||||
return;
|
||||
|
||||
// OK do it properly
|
||||
loop: while(true)
|
||||
{
|
||||
switch(_state.get())
|
||||
{
|
||||
case ITERATING:
|
||||
if (_state.compareAndSet(State.ITERATING,State.WAITING))
|
||||
break loop;
|
||||
continue;
|
||||
|
||||
case WAITING:
|
||||
// we are really waiting, so use this callback thread to iterate some more
|
||||
iterate();
|
||||
break loop;
|
||||
|
||||
default:
|
||||
throw new IllegalStateException("Already "+_state.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* Derivations of this method should always call super.failed(x)
|
||||
* to check the state before handling the failure.
|
||||
* @see org.eclipse.jetty.util.Callback#failed(java.lang.Throwable)
|
||||
*/
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
loop: while(true)
|
||||
{
|
||||
switch(_state.get())
|
||||
{
|
||||
case ITERATING:
|
||||
if (_state.compareAndSet(State.ITERATING,State.FAILED))
|
||||
break loop;
|
||||
continue;
|
||||
|
||||
case WAITING:
|
||||
if (_state.compareAndSet(State.WAITING,State.FAILED))
|
||||
break loop;
|
||||
continue;
|
||||
|
||||
default:
|
||||
throw new IllegalStateException("Already "+_state.get(),x);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -56,6 +56,7 @@ public abstract class IteratingNestedCallback extends IteratingCallback
|
|||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
super.failed(x);
|
||||
_callback.failed(x);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue