This commit is contained in:
Thomas Becker 2013-01-29 09:17:32 +01:00
commit cd30ac104d
5 changed files with 258 additions and 102 deletions

View File

@ -39,9 +39,7 @@ public interface AsyncContentProvider extends ContentProvider
{
/**
* Callback method invoked when content is available
*
* @param last whether it is the last notification of content availability
*/
public void onContent(boolean last);
public void onContent();
}
}

View File

@ -64,7 +64,7 @@ public class HttpSender implements AsyncContentProvider.Listener
}
@Override
public void onContent(boolean last)
public void onContent()
{
while (true)
{
@ -134,14 +134,17 @@ public class HttpSender implements AsyncContentProvider.Listener
requestNotifier.notifyBegin(request);
ContentProvider content = request.getContent();
if (content instanceof AsyncContentProvider)
((AsyncContentProvider)content).setListener(this);
this.contentIterator = content == null ? Collections.<ByteBuffer>emptyIterator() : content.iterator();
boolean updated = updateSendState(SendState.IDLE, SendState.EXECUTE);
if (updated)
send();
assert updated;
// Setting the listener may trigger calls to onContent() by other
// threads so we must set it only after the state has been updated
if (content instanceof AsyncContentProvider)
((AsyncContentProvider)content).setListener(this);
send();
}
}
@ -310,8 +313,7 @@ public class HttpSender implements AsyncContentProvider.Listener
{
if (updateSendState(currentSendState, SendState.EXECUTE))
{
// TODO: reload the chunk ?
LOG.debug("??? content for {}", request);
LOG.debug("Deferred content available for {}", request);
break out;
}
break;
@ -340,8 +342,24 @@ public class HttpSender implements AsyncContentProvider.Listener
{
if (generator.isEnd())
{
if (!updateSendState(SendState.EXECUTE, SendState.IDLE))
throw new IllegalStateException();
out: while (true)
{
currentSendState = sendState.get();
switch (currentSendState)
{
case EXECUTE:
case SCHEDULE:
{
if (!updateSendState(currentSendState, SendState.IDLE))
throw new IllegalStateException();
break out;
}
default:
{
throw new IllegalStateException();
}
}
}
success();
}
return;

View File

@ -20,8 +20,10 @@ package org.eclipse.jetty.client.util;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.AsyncContentProvider;
import org.eclipse.jetty.client.api.ContentProvider;
@ -72,9 +74,11 @@ import org.eclipse.jetty.client.api.Response;
*/
public class DeferredContentProvider implements AsyncContentProvider, AutoCloseable
{
private final Queue<ByteBuffer> queue = new ConcurrentLinkedQueue<>();
private volatile Listener listener;
private volatile boolean closed;
private static final ByteBuffer CLOSE = ByteBuffer.allocate(0);
private final Queue<ByteBuffer> chunks = new ConcurrentLinkedQueue<>();
private final AtomicReference<Listener> listener = new AtomicReference<>();
private final Iterator<ByteBuffer> iterator = new DeferredContentProviderIterator();
/**
* Creates a new {@link DeferredContentProvider} with the given initial content
@ -84,13 +88,14 @@ public class DeferredContentProvider implements AsyncContentProvider, AutoClosea
public DeferredContentProvider(ByteBuffer... buffers)
{
for (ByteBuffer buffer : buffers)
queue.offer(buffer);
chunks.offer(buffer);
}
@Override
public void setListener(Listener listener)
{
this.listener = listener;
if (!this.listener.compareAndSet(null, listener))
throw new IllegalStateException();
}
@Override
@ -108,8 +113,8 @@ public class DeferredContentProvider implements AsyncContentProvider, AutoClosea
*/
public boolean offer(ByteBuffer buffer)
{
boolean result = queue.offer(buffer);
notifyListener(false);
boolean result = chunks.offer(buffer);
notifyListener();
return result;
}
@ -119,39 +124,44 @@ public class DeferredContentProvider implements AsyncContentProvider, AutoClosea
*/
public void close()
{
closed = true;
notifyListener(true);
chunks.offer(CLOSE);
notifyListener();
}
private void notifyListener(boolean last)
private void notifyListener()
{
Listener listener = this.listener;
Listener listener = this.listener.get();
if (listener != null)
listener.onContent(last);
listener.onContent();
}
@Override
public Iterator<ByteBuffer> iterator()
{
return new Iterator<ByteBuffer>()
return iterator;
}
private class DeferredContentProviderIterator implements Iterator<ByteBuffer>
{
@Override
public boolean hasNext()
{
@Override
public boolean hasNext()
{
return !queue.isEmpty() || !closed;
}
return chunks.peek() != CLOSE;
}
@Override
public ByteBuffer next()
{
return queue.poll();
}
@Override
public ByteBuffer next()
{
ByteBuffer element = chunks.poll();
if (element == CLOSE)
throw new NoSuchElementException();
return element;
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
};
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
}
}

View File

@ -30,9 +30,12 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -41,11 +44,13 @@ import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.client.util.InputStreamResponseListener;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.Assert;
@ -373,4 +378,131 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
}
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testUploadWithDeferredContentProviderRacingWithSend() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
IO.copy(request.getInputStream(), response.getOutputStream());
}
});
final CountDownLatch latch = new CountDownLatch(1);
final byte[] data = new byte[512];
final DeferredContentProvider content = new DeferredContentProvider()
{
@Override
public void setListener(Listener listener)
{
super.setListener(listener);
// Simulate a concurrent call
offer(ByteBuffer.wrap(data));
close();
}
};
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.content(content)
.send(new BufferingResponseListener()
{
@Override
public void onComplete(Result result)
{
if (result.isSucceeded() &&
result.getResponse().getStatus() == 200 &&
Arrays.equals(data, getContent()))
latch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testUploadWithDeferredContentProviderRacingWithIterator() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
IO.copy(request.getInputStream(), response.getOutputStream());
}
});
final CountDownLatch latch = new CountDownLatch(1);
final byte[] data = new byte[512];
final AtomicReference<DeferredContentProvider> contentRef = new AtomicReference<>();
final DeferredContentProvider content = new DeferredContentProvider()
{
@Override
public Iterator<ByteBuffer> iterator()
{
return new Iterator<ByteBuffer>()
{
// Data for the deferred content iterator:
// [0] => deferred
// [1] => deferred
// [2] => data
private final byte[][] iteratorData = new byte[3][];
private final AtomicInteger index = new AtomicInteger();
{
iteratorData[0] = null;
iteratorData[1] = null;
iteratorData[2] = data;
}
@Override
public boolean hasNext()
{
return index.get() < iteratorData.length;
}
@Override
public ByteBuffer next()
{
byte[] chunk = iteratorData[index.getAndIncrement()];
ByteBuffer result = chunk == null ? null : ByteBuffer.wrap(chunk);
if (index.get() == 2)
{
contentRef.get().offer(result == null ? BufferUtil.EMPTY_BUFFER : result);
contentRef.get().close();
}
return result;
}
@Override
public void remove()
{
}
};
}
};
contentRef.set(content);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.content(content)
.send(new BufferingResponseListener()
{
@Override
public void onComplete(Result result)
{
if (result.isSucceeded() &&
result.getResponse().getStatus() == 200 &&
Arrays.equals(data, getContent()))
latch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -48,34 +48,37 @@ import java.util.concurrent.locks.ReentrantLock;
*/
public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E>
{
/**
* The head offset in the {@link #_indexes} array, displaced
* by 15 slots to avoid false sharing with the array length
* (stored before the first element of the array itself).
*/
private static final int HEAD_OFFSET = 15;
/**
* The tail offset in the {@link #_indexes} array, displaced
* by 16 slots from the head to avoid false sharing with it.
*/
private static final int TAIL_OFFSET = 31;
/**
* Default initial capacity, 128.
*/
public final int DEFAULT_CAPACITY = 128;
public static final int DEFAULT_CAPACITY = 128;
/**
* Default growth factor, 64.
*/
public final int DEFAULT_GROWTH = 64;
public static final int DEFAULT_GROWTH = 64;
private final int _maxCapacity;
private final AtomicInteger _size = new AtomicInteger();
private final int _growCapacity;
private Object[] _elements;
/**
* Array that holds the head and tail indexes, separated by a cache line to avoid false sharing
*/
private final int[] _indexes = new int[TAIL_OFFSET + 1];
private final Lock _tailLock = new ReentrantLock();
private final AtomicInteger _size = new AtomicInteger();
private final Lock _headLock = new ReentrantLock();
private final Condition _notEmpty = _headLock.newCondition();
private int _head;
// Spacers created to prevent false sharing between head and tail http://en.wikipedia.org/wiki/False_sharing
// TODO verify these spacers really prevent false sharing
private long _space0;
private long _space1;
private long _space2;
private long _space3;
private long _space4;
private long _space5;
private long _space6;
private long _space7;
private final Lock _tailLock = new ReentrantLock();
private int _tail;
private Object[] _elements;
/**
* Creates an unbounded {@link BlockingArrayQueue} with default initial capacity and grow factor.
@ -147,8 +150,8 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
headLock.lock();
try
{
_head = 0;
_tail = 0;
_indexes[HEAD_OFFSET] = 0;
_indexes[TAIL_OFFSET] = 0;
_size.set(0);
}
finally
@ -192,10 +195,10 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
{
if (_size.get() > 0)
{
final int head = _head;
final int head = _indexes[HEAD_OFFSET];
e = (E)_elements[head];
_elements[head] = null;
_head = (head + 1) % _elements.length;
_indexes[HEAD_OFFSET] = (head + 1) % _elements.length;
if (_size.decrementAndGet() > 0)
_notEmpty.signal();
}
@ -220,7 +223,7 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
try
{
if (_size.get() > 0)
e = (E)_elements[_head];
e = (E)_elements[_indexes[HEAD_OFFSET]];
}
finally
{
@ -281,11 +284,10 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
}
}
// Must re-read fields since there may have been a grow
// Add the element
int tail = _tail;
// Re-read head and tail after a possible grow
int tail = _indexes[TAIL_OFFSET];
_elements[tail] = e;
_tail = (tail + 1) % _elements.length;
_indexes[TAIL_OFFSET] = (tail + 1) % _elements.length;
notEmpty = _size.getAndIncrement() == 0;
}
finally
@ -354,10 +356,10 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
throw ie;
}
final int head = _head;
final int head = _indexes[HEAD_OFFSET];
e = (E)_elements[head];
_elements[head] = null;
_head = (head + 1) % _elements.length;
_indexes[HEAD_OFFSET] = (head + 1) % _elements.length;
if (_size.decrementAndGet() > 0)
_notEmpty.signal();
@ -394,10 +396,10 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
throw x;
}
int head = _head;
int head = _indexes[HEAD_OFFSET];
e = (E)_elements[head];
_elements[head] = null;
_head = (head + 1) % _elements.length;
_indexes[HEAD_OFFSET] = (head + 1) % _elements.length;
if (_size.decrementAndGet() > 0)
_notEmpty.signal();
@ -423,8 +425,8 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
if (isEmpty())
return false;
final int head = _head;
final int tail = _tail;
final int head = _indexes[HEAD_OFFSET];
final int tail = _indexes[TAIL_OFFSET];
final int capacity = _elements.length;
int i = head;
@ -507,7 +509,7 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
{
if (index < 0 || index >= _size.get())
throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
int i = _head + index;
int i = _indexes[HEAD_OFFSET] + index;
int capacity = _elements.length;
if (i >= capacity)
i -= capacity;
@ -549,19 +551,20 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
}
else
{
if (_tail == _head)
if (_indexes[TAIL_OFFSET] == _indexes[HEAD_OFFSET])
if (!grow())
throw new IllegalStateException("full");
int i = _head + index;
// Re-read head and tail after a possible grow
int i = _indexes[HEAD_OFFSET] + index;
int capacity = _elements.length;
if (i >= capacity)
i -= capacity;
_size.incrementAndGet();
int tail = _tail;
_tail = tail = (tail + 1) % capacity;
int tail = _indexes[TAIL_OFFSET];
_indexes[TAIL_OFFSET] = tail = (tail + 1) % capacity;
if (i < tail)
{
@ -609,7 +612,7 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
if (index < 0 || index >= _size.get())
throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
int i = _head + index;
int i = _indexes[HEAD_OFFSET] + index;
int capacity = _elements.length;
if (i >= capacity)
i -= capacity;
@ -643,17 +646,17 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
if (index < 0 || index >= _size.get())
throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
int i = _head + index;
int i = _indexes[HEAD_OFFSET] + index;
int capacity = _elements.length;
if (i >= capacity)
i -= capacity;
E old = (E)_elements[i];
int tail = _tail;
int tail = _indexes[TAIL_OFFSET];
if (i < tail)
{
System.arraycopy(_elements, i + 1, _elements, i, tail - i);
--_tail;
--_indexes[TAIL_OFFSET];
}
else
{
@ -662,13 +665,13 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
if (tail > 0)
{
System.arraycopy(_elements, 1, _elements, 0, tail);
--_tail;
--_indexes[TAIL_OFFSET];
}
else
{
_tail = capacity - 1;
_indexes[TAIL_OFFSET] = capacity - 1;
}
_elements[_tail] = null;
_elements[_indexes[TAIL_OFFSET]] = null;
}
_size.decrementAndGet();
@ -700,15 +703,17 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
Object[] elements = new Object[size()];
if (size() > 0)
{
if (_head < _tail)
int head = _indexes[HEAD_OFFSET];
int tail = _indexes[TAIL_OFFSET];
if (head < tail)
{
System.arraycopy(_elements, _head, elements, 0, _tail - _head);
System.arraycopy(_elements, head, elements, 0, tail - head);
}
else
{
int chunk = _elements.length - _head;
System.arraycopy(_elements, _head, elements, 0, chunk);
System.arraycopy(_elements, 0, elements, chunk, _tail);
int chunk = _elements.length - head;
System.arraycopy(_elements, head, elements, 0, chunk);
System.arraycopy(_elements, 0, elements, chunk, tail);
}
}
return new Itr(elements, index);
@ -761,8 +766,8 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
headLock.lock();
try
{
final int head = _head;
final int tail = _tail;
final int head = _indexes[HEAD_OFFSET];
final int tail = _indexes[TAIL_OFFSET];
final int newTail;
final int capacity = _elements.length;
@ -786,8 +791,8 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
}
_elements = elements;
_head = 0;
_tail = newTail;
_indexes[HEAD_OFFSET] = 0;
_indexes[TAIL_OFFSET] = newTail;
return true;
}
finally
@ -801,13 +806,6 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
}
}
// TODO: verify this is not optimized away by the JIT
long sumOfSpace()
{
// this method exists to stop clever optimisers removing the spacers
return _space0++ + _space1++ + _space2++ + _space3++ + _space4++ + _space5++ + _space6++ + _space7++;
}
private class Itr implements ListIterator<E>
{
private final Object[] _elements;