Fixed DeferredContentProvider race condition.

HttpSender was setting the listener for asynchronous content before its own state was properly setup.
This was causing race conditions, where a thread could notify HttpSender and find null data members causing later NPEs.

Now the listener is set after the state is setup, removing the race condition.
This commit is contained in:
Simone Bordet 2013-01-28 17:32:22 +01:00
parent 1737669df4
commit 7c53c317ae
4 changed files with 198 additions and 40 deletions

View File

@ -39,9 +39,7 @@ public interface AsyncContentProvider extends ContentProvider
{
/**
* Callback method invoked when content is available
*
* @param last whether it is the last notification of content availability
*/
public void onContent(boolean last);
public void onContent();
}
}

View File

@ -64,7 +64,7 @@ public class HttpSender implements AsyncContentProvider.Listener
}
@Override
public void onContent(boolean last)
public void onContent()
{
while (true)
{
@ -134,13 +134,16 @@ public class HttpSender implements AsyncContentProvider.Listener
requestNotifier.notifyBegin(request);
ContentProvider content = request.getContent();
if (content instanceof AsyncContentProvider)
((AsyncContentProvider)content).setListener(this);
this.contentIterator = content == null ? Collections.<ByteBuffer>emptyIterator() : content.iterator();
boolean updated = updateSendState(SendState.IDLE, SendState.EXECUTE);
assert updated;
// Setting the listener may trigger calls to onContent() by other
// threads so we must set it only after the state has been updated
if (content instanceof AsyncContentProvider)
((AsyncContentProvider)content).setListener(this);
send();
}
}
@ -308,8 +311,7 @@ public class HttpSender implements AsyncContentProvider.Listener
{
if (updateSendState(currentSendState, SendState.EXECUTE))
{
// TODO: reload the chunk ?
LOG.debug("??? content for {}", request);
LOG.debug("Deferred content available for {}", request);
break out;
}
break;
@ -338,8 +340,24 @@ public class HttpSender implements AsyncContentProvider.Listener
{
if (generator.isEnd())
{
if (!updateSendState(SendState.EXECUTE, SendState.IDLE))
throw new IllegalStateException();
out: while (true)
{
currentSendState = sendState.get();
switch (currentSendState)
{
case EXECUTE:
case SCHEDULE:
{
if (!updateSendState(currentSendState, SendState.IDLE))
throw new IllegalStateException();
break out;
}
default:
{
throw new IllegalStateException();
}
}
}
success();
}
return;

View File

@ -20,8 +20,10 @@ package org.eclipse.jetty.client.util;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.AsyncContentProvider;
import org.eclipse.jetty.client.api.ContentProvider;
@ -72,9 +74,11 @@ import org.eclipse.jetty.client.api.Response;
*/
public class DeferredContentProvider implements AsyncContentProvider, AutoCloseable
{
private final Queue<ByteBuffer> queue = new ConcurrentLinkedQueue<>();
private volatile Listener listener;
private volatile boolean closed;
private static final ByteBuffer CLOSE = ByteBuffer.allocate(0);
private final Queue<ByteBuffer> chunks = new ConcurrentLinkedQueue<>();
private final AtomicReference<Listener> listener = new AtomicReference<>();
private final Iterator<ByteBuffer> iterator = new DeferredContentProviderIterator();
/**
* Creates a new {@link DeferredContentProvider} with the given initial content
@ -84,13 +88,14 @@ public class DeferredContentProvider implements AsyncContentProvider, AutoClosea
public DeferredContentProvider(ByteBuffer... buffers)
{
for (ByteBuffer buffer : buffers)
queue.offer(buffer);
chunks.offer(buffer);
}
@Override
public void setListener(Listener listener)
{
this.listener = listener;
if (!this.listener.compareAndSet(null, listener))
throw new IllegalStateException();
}
@Override
@ -108,8 +113,8 @@ public class DeferredContentProvider implements AsyncContentProvider, AutoClosea
*/
public boolean offer(ByteBuffer buffer)
{
boolean result = queue.offer(buffer);
notifyListener(false);
boolean result = chunks.offer(buffer);
notifyListener();
return result;
}
@ -119,39 +124,44 @@ public class DeferredContentProvider implements AsyncContentProvider, AutoClosea
*/
public void close()
{
closed = true;
notifyListener(true);
chunks.offer(CLOSE);
notifyListener();
}
private void notifyListener(boolean last)
private void notifyListener()
{
Listener listener = this.listener;
Listener listener = this.listener.get();
if (listener != null)
listener.onContent(last);
listener.onContent();
}
@Override
public Iterator<ByteBuffer> iterator()
{
return new Iterator<ByteBuffer>()
return iterator;
}
private class DeferredContentProviderIterator implements Iterator<ByteBuffer>
{
@Override
public boolean hasNext()
{
@Override
public boolean hasNext()
{
return !queue.isEmpty() || !closed;
}
return chunks.peek() != CLOSE;
}
@Override
public ByteBuffer next()
{
return queue.poll();
}
@Override
public ByteBuffer next()
{
ByteBuffer element = chunks.poll();
if (element == CLOSE)
throw new NoSuchElementException();
return element;
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
};
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
}
}

View File

@ -30,9 +30,12 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -41,11 +44,13 @@ import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.client.util.InputStreamResponseListener;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.Assert;
@ -373,4 +378,131 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
}
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testUploadWithDeferredContentProviderRacingWithSend() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
IO.copy(request.getInputStream(), response.getOutputStream());
}
});
final CountDownLatch latch = new CountDownLatch(1);
final byte[] data = new byte[512];
final DeferredContentProvider content = new DeferredContentProvider()
{
@Override
public void setListener(Listener listener)
{
super.setListener(listener);
// Simulate a concurrent call
offer(ByteBuffer.wrap(data));
close();
}
};
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.content(content)
.send(new BufferingResponseListener()
{
@Override
public void onComplete(Result result)
{
if (result.isSucceeded() &&
result.getResponse().getStatus() == 200 &&
Arrays.equals(data, getContent()))
latch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testUploadWithDeferredContentProviderRacingWithIterator() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
IO.copy(request.getInputStream(), response.getOutputStream());
}
});
final CountDownLatch latch = new CountDownLatch(1);
final byte[] data = new byte[512];
final AtomicReference<DeferredContentProvider> contentRef = new AtomicReference<>();
final DeferredContentProvider content = new DeferredContentProvider()
{
@Override
public Iterator<ByteBuffer> iterator()
{
return new Iterator<ByteBuffer>()
{
// Data for the deferred content iterator:
// [0] => deferred
// [1] => deferred
// [2] => data
private final byte[][] iteratorData = new byte[3][];
private final AtomicInteger index = new AtomicInteger();
{
iteratorData[0] = null;
iteratorData[1] = null;
iteratorData[2] = data;
}
@Override
public boolean hasNext()
{
return index.get() < iteratorData.length;
}
@Override
public ByteBuffer next()
{
byte[] chunk = iteratorData[index.getAndIncrement()];
ByteBuffer result = chunk == null ? null : ByteBuffer.wrap(chunk);
if (index.get() == 2)
{
contentRef.get().offer(result == null ? BufferUtil.EMPTY_BUFFER : result);
contentRef.get().close();
}
return result;
}
@Override
public void remove()
{
}
};
}
};
contentRef.set(content);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.content(content)
.send(new BufferingResponseListener()
{
@Override
public void onComplete(Result result)
{
if (result.isSucceeded() &&
result.getResponse().getStatus() == 200 &&
Arrays.equals(data, getContent()))
latch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
}