diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index 147eca478ff..22248b3ed07 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -461,8 +461,9 @@ public class HttpOutput extends ServletOutputStream */ private class InputStreamWritingCB extends IteratingCallback { - final InputStream _in; - final ByteBuffer _buffer; + private final InputStream _in; + private final ByteBuffer _buffer; + private boolean _eof; public InputStreamWritingCB(InputStream in, Callback callback) { @@ -474,12 +475,19 @@ public class HttpOutput extends ServletOutputStream @Override protected boolean process() throws Exception { - boolean eof=false; + if (_eof) + { + // Handle EOF + closed(); + _channel.getByteBufferPool().release(_buffer); + return true; + } + int len=_in.read(_buffer.array(),0,_buffer.capacity()); if (len<0) { - eof=true; + _eof=true; len=0; _in.close(); } @@ -488,24 +496,15 @@ public class HttpOutput extends ServletOutputStream // read ahead for EOF to try for single commit int len2=_in.read(_buffer.array(),len,_buffer.capacity()-len); if (len2<0) - eof=true; + _eof=true; else len+=len2; } - + // write what we have _buffer.position(0); _buffer.limit(len); - _channel.write(_buffer,eof,this); - - // Handle EOF - if (eof) - { - closed(); - _channel.getByteBufferPool().release(_buffer); - return true; - } - + _channel.write(_buffer,_eof,this); return false; } @@ -537,8 +536,9 @@ public class HttpOutput extends ServletOutputStream */ private class ReadableByteChannelWritingCB extends IteratingCallback { - final ReadableByteChannel _in; - final ByteBuffer _buffer; + private final ReadableByteChannel _in; + private final ByteBuffer _buffer; + private boolean _eof; public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback) { @@ -550,13 +550,19 @@ public class HttpOutput extends ServletOutputStream @Override protected boolean process() throws Exception { + if (_eof) + { + closed(); + _channel.getByteBufferPool().release(_buffer); + return true; + } + _buffer.clear(); - boolean eof=false; int len=_in.read(_buffer); if (len<0) { - eof=true; + _eof=true; len=0; _in.close(); } @@ -565,23 +571,14 @@ public class HttpOutput extends ServletOutputStream // read ahead for EOF to try for single commit int len2=_in.read(_buffer); if (len2<0) - eof=true; + _eof=true; else len+=len2; } // write what we have _buffer.flip(); - _channel.write(_buffer,eof,this); - - // Handle EOF - if (eof) - { - closed(); - _channel.getByteBufferPool().release(_buffer); - return true; - } - + _channel.write(_buffer,_eof,this); return false; } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ResourceHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ResourceHandler.java index 21e4bc8371f..c2eb255dcee 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ResourceHandler.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ResourceHandler.java @@ -522,6 +522,8 @@ public class ResourceHandler extends HandlerWrapper @Override public void failed(Throwable x) { + LOG.warn(x.toString()); + LOG.debug(x); async.complete(); } }; diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java index 0c52fa6928b..f9b2527d569 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java @@ -18,7 +18,7 @@ package org.eclipse.jetty.util; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; /* ------------------------------------------------------------ */ @@ -29,7 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean; * if that task completes quickly and uses the calling thread to callback * the success notification, this can result in a growing stack depth. *

- *

To avoid this issue, this callback uses an Atomicboolean to note + *

To avoid this issue, this callback uses an AtomicReference to note * if the success callback has been called during the processing of a * sub task, and if so then the processing iterates rather than recurses. *

@@ -42,9 +42,9 @@ import java.util.concurrent.atomic.AtomicBoolean; */ public abstract class IteratingCallback implements Callback { - final AtomicBoolean _iterating = new AtomicBoolean(); - final Callback _callback; - + private enum State { WAITING, ITERATING, SUCCEEDED, FAILED }; + private final AtomicReference _state = new AtomicReference<>(State.WAITING); + private final Callback _callback; public IteratingCallback(Callback callback) { @@ -73,38 +73,88 @@ public abstract class IteratingCallback implements Callback try { // Keep iterating as long as succeeded() is called during process() - while(_iterating.compareAndSet(false,true)) + // If we are in WAITING state, either this is the first iteration or + // succeeded()/failed() were called already. + while(_state.compareAndSet(State.WAITING,State.ITERATING)) { - // process and test if we are complete + // Make some progress by calling process() if (process()) { - _callback.succeeded(); + // A true return indicates we are finished a no further callbacks + // are scheduled. So we must still be ITERATING. + if (_state.compareAndSet(State.ITERATING,State.SUCCEEDED)) + _callback.succeeded(); + else + throw new IllegalStateException("Already "+_state.get()); return; } + // else a callback has been scheduled. If it has not happened yet, + // we will still be ITERATING + else if (_state.compareAndSet(State.ITERATING,State.WAITING)) + // no callback yet, so break the loop and wait for it + break; + + // The callback must have happened and we are either WAITING already or FAILED + // the loop test will work out which } } catch(Exception e) { - _iterating.set(false); - _callback.failed(e); - } - finally - { - _iterating.set(false); + failed(e); } } - @Override public void succeeded() { - if (!_iterating.compareAndSet(true,false)) - iterate(); + // Try a short cut for the fast method. If we are still iterating + if (_state.compareAndSet(State.ITERATING,State.WAITING)) + // then next loop will continue processing, so nothing to do here + return; + + // OK do it properly + loop: while(true) + { + switch(_state.get()) + { + case ITERATING: + if (_state.compareAndSet(State.ITERATING,State.WAITING)) + break loop; + continue; + + case WAITING: + // we are really waiting, so use this callback thread to iterate some more + iterate(); + break loop; + + default: + throw new IllegalStateException("Already "+_state.get()); + } + } } @Override public void failed(Throwable x) { + loop: while(true) + { + switch(_state.get()) + { + case ITERATING: + if (_state.compareAndSet(State.ITERATING,State.FAILED)) + break loop; + continue; + + case WAITING: + if (_state.compareAndSet(State.WAITING,State.FAILED)) + break loop; + continue; + + default: + throw new IllegalStateException("Already "+_state.get()); + } + } + _callback.failed(x); }