From 714148335639d2592b9376dec722d3c7e06887c9 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Sat, 14 Dec 2013 09:14:12 +1100 Subject: [PATCH] 424043 - IteratingCallback Idle race --- .../org/eclipse/jetty/client/HttpSender.java | 14 +-- .../eclipse/jetty/server/HttpConnection.java | 12 +-- .../org/eclipse/jetty/server/HttpOutput.java | 34 +++--- .../jetty/servlets/gzip/GzipHttpOutput.java | 16 +-- .../java/org/eclipse/jetty/spdy/Flusher.java | 6 +- .../eclipse/jetty/util/IteratingCallback.java | 100 ++++++++++++------ .../jetty/util/IteratingCallbackTest.java | 44 ++++---- .../websocket/common/io/FrameFlusher.java | 6 +- 8 files changed, 133 insertions(+), 99 deletions(-) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java index 311c9b1068b..fc3b900418f 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java @@ -674,11 +674,11 @@ public abstract class HttpSender implements AsyncContentProvider.Listener private class ContentCallback extends IteratingCallback { @Override - protected State process() throws Exception + protected Next process() throws Exception { HttpExchange exchange = getHttpExchange(); if (exchange == null) - return State.IDLE; + return Next.IDLE; Request request = exchange.getRequest(); HttpContent content = HttpSender.this.content; @@ -687,21 +687,21 @@ public abstract class HttpSender implements AsyncContentProvider.Listener if (contentBuffer != null) { if (!someToContent(request, contentBuffer)) - return State.IDLE; + return Next.IDLE; } if (content.advance()) { // There is more content to send sendContent(exchange, content, this); - return State.SCHEDULED; + return Next.SCHEDULED; } if (content.isConsumed()) { sendContent(exchange, content, lastCallback); - return State.SCHEDULED; + return Next.SCHEDULED; } while (true) @@ -714,7 +714,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener if (updateSenderState(current, SenderState.IDLE)) { LOG.debug("Waiting for deferred content for {}", request); - return State.IDLE; + return Next.IDLE; } break; } @@ -725,7 +725,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener LOG.debug("Deferred content available for {}", request); // TODO: this case is not covered by tests sendContent(exchange, content, this); - return State.SCHEDULED; + return Next.SCHEDULED; } break; } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 4c41803fdce..c3f42f479e9 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -489,7 +489,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http } @Override - public State process() throws Exception + public Next process() throws Exception { ByteBuffer chunk = _chunk; while (true) @@ -569,7 +569,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http } else continue; - return State.SCHEDULED; + return Next.SCHEDULED; } case SHUTDOWN_OUT: { @@ -584,7 +584,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http if (!_lastContent || _content==null || !_content.hasArray() || !_header.hasArray() || _content.array()!=_header.array()) _bufferPool.release(_header); } - return State.SUCCEEDED; + return Next.SUCCEEDED; } case CONTINUE: { @@ -641,7 +641,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http } @Override - public State process() throws Exception + public Next process() throws Exception { ByteBuffer chunk = _chunk; while (true) @@ -686,7 +686,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http } else continue; - return State.SCHEDULED; + return Next.SCHEDULED; } case SHUTDOWN_OUT: { @@ -695,7 +695,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http } case DONE: { - return State.SUCCEEDED; + return Next.SUCCEEDED; } case CONTINUE: { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index 9d2e6faeb8d..44c5cd206d8 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -758,23 +758,23 @@ public class HttpOutput extends ServletOutputStream implements Runnable } @Override - protected State process() + protected Next process() { if (BufferUtil.hasContent(_aggregate)) { _flushed=true; write(_aggregate, false, this); - return State.SCHEDULED; + return Next.SCHEDULED; } if (!_flushed) { _flushed=true; write(BufferUtil.EMPTY_BUFFER,false,this); - return State.SCHEDULED; + return Next.SCHEDULED; } - return State.SUCCEEDED; + return Next.SUCCEEDED; } } @@ -807,21 +807,21 @@ public class HttpOutput extends ServletOutputStream implements Runnable } @Override - protected State process() + protected Next process() { // flush any content from the aggregate if (BufferUtil.hasContent(_aggregate)) { _completed=_len==0; write(_aggregate, _complete && _completed, this); - return State.SCHEDULED; + return Next.SCHEDULED; } // Can we just aggregate the remainder? if (!_complete && _len stalled = new HashSet<>(); @Override - protected State process() throws Exception + protected Next process() throws Exception { synchronized (lock) { @@ -194,7 +194,7 @@ public class Flusher } if (active.size() == 0) - return State.IDLE; + return Next.IDLE; // Get the bytes to write ByteBuffer[] buffers = new ByteBuffer[active.size()]; @@ -213,7 +213,7 @@ public class Flusher // MAX_GATHER parameter, and/or autotune the buffer returned // by FrameBytes.getByteBuffer() (see also comment there). - return State.SCHEDULED; + return Next.SCHEDULED; } @Override diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java index b89e4b2f4f6..591257b095f 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java @@ -44,7 +44,8 @@ import java.util.concurrent.atomic.AtomicReference; */ public abstract class IteratingCallback implements Callback { - protected enum State { IDLE, SCHEDULED, ITERATING, SUCCEEDED, FAILED }; + protected enum Next { IDLE, SCHEDULED, SUCCEEDED, FAILED }; + protected enum State { IDLE, SCHEDULED, ITERATING, ITERATE_AGAIN, SUCCEEDED, FAILED }; private final AtomicReference _state = new AtomicReference<>(State.IDLE); public IteratingCallback() @@ -66,7 +67,7 @@ public abstract class IteratingCallback implements Callback * * @throws Exception */ - abstract protected State process() throws Exception; + abstract protected Next process() throws Exception; /* ------------------------------------------------------------ */ @@ -81,42 +82,65 @@ public abstract class IteratingCallback implements Callback { try { - // Keep iterating as long as succeeded() is called during process() - // If we are in WAITING state, either this is the first iteration or - // succeeded()/failed() were called already. - while(_state.compareAndSet(State.IDLE,State.ITERATING)) + while(true) { - State next = process(); - switch (next) + switch (_state.get()) { - case SUCCEEDED: - // The task has complete, there should have been no callbacks - if (!_state.compareAndSet(State.ITERATING,State.SUCCEEDED)) - throw new IllegalStateException("state="+_state.get()); - completed(); - return; - - case SCHEDULED: - // This callback has been scheduled, so it may or may not have - // already been called back. Let's find out - if (_state.compareAndSet(State.ITERATING,State.SCHEDULED)) - // not called back yet, so lets wait for it - return; - // call back must have happened, so lets iterate - continue; - case IDLE: - // No more progress can be made. Wait for another call to iterate - if (!_state.compareAndSet(State.ITERATING,State.IDLE)) - throw new IllegalStateException("state="+_state.get()); - return; - - case FAILED: - _state.set(State.FAILED); - return; + // Keep iterating as long as succeeded() is called during process() + // If we are in WAITING state, either this is the first iteration or + // succeeded()/failed() were called already. + while(_state.compareAndSet(State.IDLE,State.ITERATING)) + { + Next next = process(); + switch (next) + { + case SUCCEEDED: + // The task has complete, there should have been no callbacks + // Can double CaS here because state never changes directly ITERATING_AGAIN --> ITERATE + if (!_state.compareAndSet(State.ITERATING,State.SUCCEEDED)&& + !_state.compareAndSet(State.ITERATE_AGAIN,State.SUCCEEDED)) + throw new IllegalStateException("state="+_state.get()); + completed(); + return; + + case SCHEDULED: + // This callback has been scheduled, so it may or may not have + // already been called back. Let's find out + // Can double CaS here because state never changes directly ITERATING_AGAIN --> ITERATE + if (_state.compareAndSet(State.ITERATING,State.SCHEDULED) || + _state.compareAndSet(State.ITERATE_AGAIN,State.SCHEDULED)) + // not called back yet, so lets wait for it + return; + // call back must have happened, so lets iterate + continue; + + case IDLE: + // No more progress can be made by this call to iterate + if (_state.compareAndSet(State.ITERATING,State.IDLE)) + return; + // was iterate called again since we already decided to go IDLE? + if (_state.compareAndSet(State.ITERATE_AGAIN,State.IDLE)) + continue; // Try another iteration as more work may have been added while previous process was returning + throw new IllegalStateException("state="+_state.get()); + + case FAILED: + _state.set(State.FAILED); + return; + + default: + throw new IllegalStateException("state="+_state.get()+" next="+next); + } + } + break; + + case ITERATING: + if (_state.compareAndSet(State.ITERATING,State.ITERATE_AGAIN)) + return; + break; default: - throw new IllegalStateException("state="+_state.get()+" next="+next); + return; } } } @@ -140,6 +164,11 @@ public abstract class IteratingCallback implements Callback { switch(_state.get()) { + case ITERATE_AGAIN: + if (_state.compareAndSet(State.ITERATE_AGAIN,State.IDLE)) + break loop; + continue; + case ITERATING: if (_state.compareAndSet(State.ITERATING,State.IDLE)) break loop; @@ -174,6 +203,11 @@ public abstract class IteratingCallback implements Callback { switch(_state.get()) { + case ITERATE_AGAIN: + if (_state.compareAndSet(State.ITERATE_AGAIN,State.FAILED)) + break loop; + continue; + case ITERATING: if (_state.compareAndSet(State.ITERATING,State.FAILED)) break loop; diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/IteratingCallbackTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/IteratingCallbackTest.java index 1896d8a0d9e..fe3319e8186 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/IteratingCallbackTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/IteratingCallbackTest.java @@ -54,15 +54,15 @@ public class IteratingCallbackTest int i=10; @Override - protected State process() throws Exception + protected Next process() throws Exception { processed++; if (i-->1) { succeeded(); // fake a completed IO operation - return State.SCHEDULED; + return Next.SCHEDULED; } - return State.SUCCEEDED; + return Next.SUCCEEDED; } }; @@ -81,15 +81,15 @@ public class IteratingCallbackTest int i=4; @Override - protected State process() throws Exception + protected Next process() throws Exception { processed++; if (i-->1) { scheduler.schedule(successTask,50,TimeUnit.MILLISECONDS); - return State.SCHEDULED; + return Next.SCHEDULED; } - return State.SUCCEEDED; + return Next.SUCCEEDED; } }; @@ -108,15 +108,15 @@ public class IteratingCallbackTest int i=4; @Override - protected State process() throws Exception + protected Next process() throws Exception { processed++; if (i-->1) { scheduler.schedule(successTask,50,TimeUnit.MILLISECONDS); - return State.SCHEDULED; + return Next.SCHEDULED; } - return State.SUCCEEDED; + return Next.SUCCEEDED; } }; @@ -145,7 +145,7 @@ public class IteratingCallbackTest int i=10; @Override - protected State process() throws Exception + protected Next process() throws Exception { processed++; if (i-->1) @@ -154,9 +154,9 @@ public class IteratingCallbackTest succeeded(); // fake a completed IO operation else failed(new Exception("testing")); - return State.SCHEDULED; + return Next.SCHEDULED; } - return State.SUCCEEDED; + return Next.SUCCEEDED; } }; @@ -173,15 +173,15 @@ public class IteratingCallbackTest int i=4; @Override - protected State process() throws Exception + protected Next process() throws Exception { processed++; if (i-->1) { scheduler.schedule(i>2?successTask:failTask,50,TimeUnit.MILLISECONDS); - return State.SCHEDULED; + return Next.SCHEDULED; } - return State.SUCCEEDED; + return Next.SUCCEEDED; } }; @@ -202,7 +202,7 @@ public class IteratingCallbackTest int i=5; @Override - protected State process() + protected Next process() { processed++; @@ -210,11 +210,11 @@ public class IteratingCallbackTest { case 5: succeeded(); - return State.SCHEDULED; + return Next.SCHEDULED; case 4: scheduler.schedule(successTask,5,TimeUnit.MILLISECONDS); - return State.SCHEDULED; + return Next.SCHEDULED; case 3: scheduler.schedule(new Runnable() @@ -225,18 +225,18 @@ public class IteratingCallbackTest idle.countDown(); } },5,TimeUnit.MILLISECONDS); - return State.IDLE; + return Next.IDLE; case 2: succeeded(); - return State.SCHEDULED; + return Next.SCHEDULED; case 1: scheduler.schedule(successTask,5,TimeUnit.MILLISECONDS); - return State.SCHEDULED; + return Next.SCHEDULED; case 0: - return State.SUCCEEDED; + return Next.SUCCEEDED; default: throw new IllegalStateException(); diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java index f8609da2b04..3f40792cc8c 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java @@ -217,7 +217,7 @@ public class FrameFlusher } @Override - protected State process() throws Exception + protected Next process() throws Exception { synchronized (lock) { @@ -241,11 +241,11 @@ public class FrameFlusher } if (buffers.size()==0) - return State.IDLE; + return Next.IDLE; endpoint.write(this,buffers.toArray(new ByteBuffer[buffers.size()])); buffers.clear(); - return State.SCHEDULED; + return Next.SCHEDULED; } @Override