From 1737669df49f486761be7fe77e73722697fa7fd3 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Mon, 28 Jan 2013 12:32:03 +0100 Subject: [PATCH 1/2] 399242 - Reduce/eliminate false sharing in BlockingArrayQueue. Now head and tail are stored in an array, spaced by empty elements to avoid false sharing. --- .../jetty/util/BlockingArrayQueue.java | 118 +++++++++--------- 1 file changed, 58 insertions(+), 60 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/BlockingArrayQueue.java b/jetty-util/src/main/java/org/eclipse/jetty/util/BlockingArrayQueue.java index 1585dbc9180..36b7e46d136 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/BlockingArrayQueue.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/BlockingArrayQueue.java @@ -48,34 +48,37 @@ import java.util.concurrent.locks.ReentrantLock; */ public class BlockingArrayQueue extends AbstractList implements BlockingQueue { + /** + * The head offset in the {@link #_indexes} array, displaced + * by 15 slots to avoid false sharing with the array length + * (stored before the first element of the array itself). + */ + private static final int HEAD_OFFSET = 15; + /** + * The tail offset in the {@link #_indexes} array, displaced + * by 16 slots from the head to avoid false sharing with it. + */ + private static final int TAIL_OFFSET = 31; /** * Default initial capacity, 128. */ - public final int DEFAULT_CAPACITY = 128; + public static final int DEFAULT_CAPACITY = 128; /** * Default growth factor, 64. */ - public final int DEFAULT_GROWTH = 64; + public static final int DEFAULT_GROWTH = 64; private final int _maxCapacity; - private final AtomicInteger _size = new AtomicInteger(); private final int _growCapacity; - private Object[] _elements; + /** + * Array that holds the head and tail indexes, separated by a cache line to avoid false sharing + */ + private final int[] _indexes = new int[TAIL_OFFSET + 1]; + private final Lock _tailLock = new ReentrantLock(); + private final AtomicInteger _size = new AtomicInteger(); private final Lock _headLock = new ReentrantLock(); private final Condition _notEmpty = _headLock.newCondition(); - private int _head; - // Spacers created to prevent false sharing between head and tail http://en.wikipedia.org/wiki/False_sharing - // TODO verify these spacers really prevent false sharing - private long _space0; - private long _space1; - private long _space2; - private long _space3; - private long _space4; - private long _space5; - private long _space6; - private long _space7; - private final Lock _tailLock = new ReentrantLock(); - private int _tail; + private Object[] _elements; /** * Creates an unbounded {@link BlockingArrayQueue} with default initial capacity and grow factor. @@ -147,8 +150,8 @@ public class BlockingArrayQueue extends AbstractList implements BlockingQu headLock.lock(); try { - _head = 0; - _tail = 0; + _indexes[HEAD_OFFSET] = 0; + _indexes[TAIL_OFFSET] = 0; _size.set(0); } finally @@ -192,10 +195,10 @@ public class BlockingArrayQueue extends AbstractList implements BlockingQu { if (_size.get() > 0) { - final int head = _head; + final int head = _indexes[HEAD_OFFSET]; e = (E)_elements[head]; _elements[head] = null; - _head = (head + 1) % _elements.length; + _indexes[HEAD_OFFSET] = (head + 1) % _elements.length; if (_size.decrementAndGet() > 0) _notEmpty.signal(); } @@ -220,7 +223,7 @@ public class BlockingArrayQueue extends AbstractList implements BlockingQu try { if (_size.get() > 0) - e = (E)_elements[_head]; + e = (E)_elements[_indexes[HEAD_OFFSET]]; } finally { @@ -281,11 +284,10 @@ public class BlockingArrayQueue extends AbstractList implements BlockingQu } } - // Must re-read fields since there may have been a grow - // Add the element - int tail = _tail; + // Re-read head and tail after a possible grow + int tail = _indexes[TAIL_OFFSET]; _elements[tail] = e; - _tail = (tail + 1) % _elements.length; + _indexes[TAIL_OFFSET] = (tail + 1) % _elements.length; notEmpty = _size.getAndIncrement() == 0; } finally @@ -354,10 +356,10 @@ public class BlockingArrayQueue extends AbstractList implements BlockingQu throw ie; } - final int head = _head; + final int head = _indexes[HEAD_OFFSET]; e = (E)_elements[head]; _elements[head] = null; - _head = (head + 1) % _elements.length; + _indexes[HEAD_OFFSET] = (head + 1) % _elements.length; if (_size.decrementAndGet() > 0) _notEmpty.signal(); @@ -394,10 +396,10 @@ public class BlockingArrayQueue extends AbstractList implements BlockingQu throw x; } - int head = _head; + int head = _indexes[HEAD_OFFSET]; e = (E)_elements[head]; _elements[head] = null; - _head = (head + 1) % _elements.length; + _indexes[HEAD_OFFSET] = (head + 1) % _elements.length; if (_size.decrementAndGet() > 0) _notEmpty.signal(); @@ -423,8 +425,8 @@ public class BlockingArrayQueue extends AbstractList implements BlockingQu if (isEmpty()) return false; - final int head = _head; - final int tail = _tail; + final int head = _indexes[HEAD_OFFSET]; + final int tail = _indexes[TAIL_OFFSET]; final int capacity = _elements.length; int i = head; @@ -507,7 +509,7 @@ public class BlockingArrayQueue extends AbstractList implements BlockingQu { if (index < 0 || index >= _size.get()) throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")"); - int i = _head + index; + int i = _indexes[HEAD_OFFSET] + index; int capacity = _elements.length; if (i >= capacity) i -= capacity; @@ -549,19 +551,20 @@ public class BlockingArrayQueue extends AbstractList implements BlockingQu } else { - if (_tail == _head) + if (_indexes[TAIL_OFFSET] == _indexes[HEAD_OFFSET]) if (!grow()) throw new IllegalStateException("full"); - int i = _head + index; + // Re-read head and tail after a possible grow + int i = _indexes[HEAD_OFFSET] + index; int capacity = _elements.length; if (i >= capacity) i -= capacity; _size.incrementAndGet(); - int tail = _tail; - _tail = tail = (tail + 1) % capacity; + int tail = _indexes[TAIL_OFFSET]; + _indexes[TAIL_OFFSET] = tail = (tail + 1) % capacity; if (i < tail) { @@ -609,7 +612,7 @@ public class BlockingArrayQueue extends AbstractList implements BlockingQu if (index < 0 || index >= _size.get()) throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")"); - int i = _head + index; + int i = _indexes[HEAD_OFFSET] + index; int capacity = _elements.length; if (i >= capacity) i -= capacity; @@ -643,17 +646,17 @@ public class BlockingArrayQueue extends AbstractList implements BlockingQu if (index < 0 || index >= _size.get()) throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")"); - int i = _head + index; + int i = _indexes[HEAD_OFFSET] + index; int capacity = _elements.length; if (i >= capacity) i -= capacity; E old = (E)_elements[i]; - int tail = _tail; + int tail = _indexes[TAIL_OFFSET]; if (i < tail) { System.arraycopy(_elements, i + 1, _elements, i, tail - i); - --_tail; + --_indexes[TAIL_OFFSET]; } else { @@ -662,13 +665,13 @@ public class BlockingArrayQueue extends AbstractList implements BlockingQu if (tail > 0) { System.arraycopy(_elements, 1, _elements, 0, tail); - --_tail; + --_indexes[TAIL_OFFSET]; } else { - _tail = capacity - 1; + _indexes[TAIL_OFFSET] = capacity - 1; } - _elements[_tail] = null; + _elements[_indexes[TAIL_OFFSET]] = null; } _size.decrementAndGet(); @@ -700,15 +703,17 @@ public class BlockingArrayQueue extends AbstractList implements BlockingQu Object[] elements = new Object[size()]; if (size() > 0) { - if (_head < _tail) + int head = _indexes[HEAD_OFFSET]; + int tail = _indexes[TAIL_OFFSET]; + if (head < tail) { - System.arraycopy(_elements, _head, elements, 0, _tail - _head); + System.arraycopy(_elements, head, elements, 0, tail - head); } else { - int chunk = _elements.length - _head; - System.arraycopy(_elements, _head, elements, 0, chunk); - System.arraycopy(_elements, 0, elements, chunk, _tail); + int chunk = _elements.length - head; + System.arraycopy(_elements, head, elements, 0, chunk); + System.arraycopy(_elements, 0, elements, chunk, tail); } } return new Itr(elements, index); @@ -761,8 +766,8 @@ public class BlockingArrayQueue extends AbstractList implements BlockingQu headLock.lock(); try { - final int head = _head; - final int tail = _tail; + final int head = _indexes[HEAD_OFFSET]; + final int tail = _indexes[TAIL_OFFSET]; final int newTail; final int capacity = _elements.length; @@ -786,8 +791,8 @@ public class BlockingArrayQueue extends AbstractList implements BlockingQu } _elements = elements; - _head = 0; - _tail = newTail; + _indexes[HEAD_OFFSET] = 0; + _indexes[TAIL_OFFSET] = newTail; return true; } finally @@ -801,13 +806,6 @@ public class BlockingArrayQueue extends AbstractList implements BlockingQu } } - // TODO: verify this is not optimized away by the JIT - long sumOfSpace() - { - // this method exists to stop clever optimisers removing the spacers - return _space0++ + _space1++ + _space2++ + _space3++ + _space4++ + _space5++ + _space6++ + _space7++; - } - private class Itr implements ListIterator { private final Object[] _elements; From 7c53c317ae830d4e518ebe50428028a26aef733c Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Mon, 28 Jan 2013 17:32:22 +0100 Subject: [PATCH 2/2] Fixed DeferredContentProvider race condition. HttpSender was setting the listener for asynchronous content before its own state was properly setup. This was causing race conditions, where a thread could notify HttpSender and find null data members causing later NPEs. Now the listener is set after the state is setup, removing the race condition. --- .../jetty/client/AsyncContentProvider.java | 4 +- .../org/eclipse/jetty/client/HttpSender.java | 34 +++-- .../client/util/DeferredContentProvider.java | 68 +++++---- .../jetty/client/HttpClientStreamTest.java | 132 ++++++++++++++++++ 4 files changed, 198 insertions(+), 40 deletions(-) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AsyncContentProvider.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AsyncContentProvider.java index 4c0c6c3345c..a45c881978c 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/AsyncContentProvider.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AsyncContentProvider.java @@ -39,9 +39,7 @@ public interface AsyncContentProvider extends ContentProvider { /** * Callback method invoked when content is available - * - * @param last whether it is the last notification of content availability */ - public void onContent(boolean last); + public void onContent(); } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java index 67115fdb81b..1575d20c37b 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java @@ -64,7 +64,7 @@ public class HttpSender implements AsyncContentProvider.Listener } @Override - public void onContent(boolean last) + public void onContent() { while (true) { @@ -134,13 +134,16 @@ public class HttpSender implements AsyncContentProvider.Listener requestNotifier.notifyBegin(request); ContentProvider content = request.getContent(); - if (content instanceof AsyncContentProvider) - ((AsyncContentProvider)content).setListener(this); - this.contentIterator = content == null ? Collections.emptyIterator() : content.iterator(); boolean updated = updateSendState(SendState.IDLE, SendState.EXECUTE); assert updated; + + // Setting the listener may trigger calls to onContent() by other + // threads so we must set it only after the state has been updated + if (content instanceof AsyncContentProvider) + ((AsyncContentProvider)content).setListener(this); + send(); } } @@ -308,8 +311,7 @@ public class HttpSender implements AsyncContentProvider.Listener { if (updateSendState(currentSendState, SendState.EXECUTE)) { - // TODO: reload the chunk ? - LOG.debug("??? content for {}", request); + LOG.debug("Deferred content available for {}", request); break out; } break; @@ -338,8 +340,24 @@ public class HttpSender implements AsyncContentProvider.Listener { if (generator.isEnd()) { - if (!updateSendState(SendState.EXECUTE, SendState.IDLE)) - throw new IllegalStateException(); + out: while (true) + { + currentSendState = sendState.get(); + switch (currentSendState) + { + case EXECUTE: + case SCHEDULE: + { + if (!updateSendState(currentSendState, SendState.IDLE)) + throw new IllegalStateException(); + break out; + } + default: + { + throw new IllegalStateException(); + } + } + } success(); } return; diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/DeferredContentProvider.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/DeferredContentProvider.java index e43452147da..64d0e77c504 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/util/DeferredContentProvider.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/util/DeferredContentProvider.java @@ -20,8 +20,10 @@ package org.eclipse.jetty.client.util; import java.nio.ByteBuffer; import java.util.Iterator; +import java.util.NoSuchElementException; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.client.AsyncContentProvider; import org.eclipse.jetty.client.api.ContentProvider; @@ -72,9 +74,11 @@ import org.eclipse.jetty.client.api.Response; */ public class DeferredContentProvider implements AsyncContentProvider, AutoCloseable { - private final Queue queue = new ConcurrentLinkedQueue<>(); - private volatile Listener listener; - private volatile boolean closed; + private static final ByteBuffer CLOSE = ByteBuffer.allocate(0); + + private final Queue chunks = new ConcurrentLinkedQueue<>(); + private final AtomicReference listener = new AtomicReference<>(); + private final Iterator iterator = new DeferredContentProviderIterator(); /** * Creates a new {@link DeferredContentProvider} with the given initial content @@ -84,13 +88,14 @@ public class DeferredContentProvider implements AsyncContentProvider, AutoClosea public DeferredContentProvider(ByteBuffer... buffers) { for (ByteBuffer buffer : buffers) - queue.offer(buffer); + chunks.offer(buffer); } @Override public void setListener(Listener listener) { - this.listener = listener; + if (!this.listener.compareAndSet(null, listener)) + throw new IllegalStateException(); } @Override @@ -108,8 +113,8 @@ public class DeferredContentProvider implements AsyncContentProvider, AutoClosea */ public boolean offer(ByteBuffer buffer) { - boolean result = queue.offer(buffer); - notifyListener(false); + boolean result = chunks.offer(buffer); + notifyListener(); return result; } @@ -119,39 +124,44 @@ public class DeferredContentProvider implements AsyncContentProvider, AutoClosea */ public void close() { - closed = true; - notifyListener(true); + chunks.offer(CLOSE); + notifyListener(); } - private void notifyListener(boolean last) + private void notifyListener() { - Listener listener = this.listener; + Listener listener = this.listener.get(); if (listener != null) - listener.onContent(last); + listener.onContent(); } @Override public Iterator iterator() { - return new Iterator() + return iterator; + } + + private class DeferredContentProviderIterator implements Iterator + { + @Override + public boolean hasNext() { - @Override - public boolean hasNext() - { - return !queue.isEmpty() || !closed; - } + return chunks.peek() != CLOSE; + } - @Override - public ByteBuffer next() - { - return queue.poll(); - } + @Override + public ByteBuffer next() + { + ByteBuffer element = chunks.poll(); + if (element == CLOSE) + throw new NoSuchElementException(); + return element; + } - @Override - public void remove() - { - throw new UnsupportedOperationException(); - } - }; + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } } } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java index 3bd46cf3273..856172a20b8 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java @@ -30,9 +30,12 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; +import java.util.Iterator; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -41,11 +44,13 @@ import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.client.util.BufferingResponseListener; import org.eclipse.jetty.client.util.DeferredContentProvider; import org.eclipse.jetty.client.util.InputStreamResponseListener; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.toolchain.test.MavenTestingUtils; import org.eclipse.jetty.toolchain.test.annotation.Slow; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.junit.Assert; @@ -373,4 +378,131 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest } Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); } + + @Test + public void testUploadWithDeferredContentProviderRacingWithSend() throws Exception + { + start(new AbstractHandler() + { + @Override + public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + IO.copy(request.getInputStream(), response.getOutputStream()); + } + }); + + final CountDownLatch latch = new CountDownLatch(1); + final byte[] data = new byte[512]; + final DeferredContentProvider content = new DeferredContentProvider() + { + @Override + public void setListener(Listener listener) + { + super.setListener(listener); + // Simulate a concurrent call + offer(ByteBuffer.wrap(data)); + close(); + } + }; + + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scheme) + .content(content) + .send(new BufferingResponseListener() + { + @Override + public void onComplete(Result result) + { + if (result.isSucceeded() && + result.getResponse().getStatus() == 200 && + Arrays.equals(data, getContent())) + latch.countDown(); + } + }); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testUploadWithDeferredContentProviderRacingWithIterator() throws Exception + { + start(new AbstractHandler() + { + @Override + public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + IO.copy(request.getInputStream(), response.getOutputStream()); + } + }); + + final CountDownLatch latch = new CountDownLatch(1); + final byte[] data = new byte[512]; + final AtomicReference contentRef = new AtomicReference<>(); + final DeferredContentProvider content = new DeferredContentProvider() + { + @Override + public Iterator iterator() + { + return new Iterator() + { + // Data for the deferred content iterator: + // [0] => deferred + // [1] => deferred + // [2] => data + private final byte[][] iteratorData = new byte[3][]; + private final AtomicInteger index = new AtomicInteger(); + + { + iteratorData[0] = null; + iteratorData[1] = null; + iteratorData[2] = data; + } + + @Override + public boolean hasNext() + { + return index.get() < iteratorData.length; + } + + @Override + public ByteBuffer next() + { + byte[] chunk = iteratorData[index.getAndIncrement()]; + ByteBuffer result = chunk == null ? null : ByteBuffer.wrap(chunk); + if (index.get() == 2) + { + contentRef.get().offer(result == null ? BufferUtil.EMPTY_BUFFER : result); + contentRef.get().close(); + } + return result; + } + + @Override + public void remove() + { + } + }; + } + }; + contentRef.set(content); + + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scheme) + .content(content) + .send(new BufferingResponseListener() + { + @Override + public void onComplete(Result result) + { + if (result.isSucceeded() && + result.getResponse().getStatus() == 200 && + Arrays.equals(data, getContent())) + latch.countDown(); + } + }); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + } }