Fixes #966 - Remove usages of ConcurrentArrayQueue.

This commit is contained in:
Simone Bordet 2016-09-28 11:39:21 +02:00
parent efe339e246
commit 0f97c3df5d
11 changed files with 225 additions and 67 deletions

View File

@ -19,10 +19,10 @@
package org.eclipse.jetty.fcgi.generator; package org.eclipse.jetty.fcgi.generator;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue; import java.util.Queue;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
@ -31,7 +31,7 @@ public class Flusher
{ {
private static final Logger LOG = Log.getLogger(Flusher.class); private static final Logger LOG = Log.getLogger(Flusher.class);
private final Queue<Generator.Result> queue = new ConcurrentArrayQueue<>(); private final Queue<Generator.Result> queue = new ArrayDeque<>();
private final IteratingCallback flushCallback = new FlushCallback(); private final IteratingCallback flushCallback = new FlushCallback();
private final EndPoint endPoint; private final EndPoint endPoint;
@ -43,10 +43,26 @@ public class Flusher
public void flush(Generator.Result... results) public void flush(Generator.Result... results)
{ {
for (Generator.Result result : results) for (Generator.Result result : results)
queue.offer(result); offer(result);
flushCallback.iterate(); flushCallback.iterate();
} }
private void offer(Generator.Result result)
{
synchronized (this)
{
queue.offer(result);
}
}
private Generator.Result poll()
{
synchronized (this)
{
return queue.poll();
}
}
public void shutdown() public void shutdown()
{ {
flush(new ShutdownResult()); flush(new ShutdownResult());
@ -60,7 +76,7 @@ public class Flusher
protected Action process() throws Exception protected Action process() throws Exception
{ {
// Look if other writes are needed. // Look if other writes are needed.
Generator.Result result = queue.poll(); Generator.Result result = poll();
if (result == null) if (result == null)
{ {
// No more writes to do, return. // No more writes to do, return.
@ -71,7 +87,7 @@ public class Flusher
// Most often there is another result in the // Most often there is another result in the
// queue so this is a real optimization because // queue so this is a real optimization because
// it sends both results in just one TCP packet. // it sends both results in just one TCP packet.
Generator.Result other = queue.poll(); Generator.Result other = poll();
if (other != null) if (other != null)
result = result.join(other); result = result.join(other);
@ -106,7 +122,7 @@ public class Flusher
while (true) while (true)
{ {
Generator.Result result = queue.poll(); Generator.Result result = poll();
if (result == null) if (result == null)
break; break;
result.failed(x); result.failed(x);

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.http2;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -30,7 +31,6 @@ import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.ExecutionStrategy;
@ -39,7 +39,7 @@ public class HTTP2Connection extends AbstractConnection
{ {
protected static final Logger LOG = Log.getLogger(HTTP2Connection.class); protected static final Logger LOG = Log.getLogger(HTTP2Connection.class);
private final Queue<Runnable> tasks = new ConcurrentArrayQueue<>(); private final Queue<Runnable> tasks = new ArrayDeque<>();
private final HTTP2Producer producer = new HTTP2Producer(); private final HTTP2Producer producer = new HTTP2Producer();
private final AtomicLong bytesIn = new AtomicLong(); private final AtomicLong bytesIn = new AtomicLong();
private final ByteBufferPool byteBufferPool; private final ByteBufferPool byteBufferPool;
@ -140,7 +140,7 @@ public class HTTP2Connection extends AbstractConnection
protected void offerTask(Runnable task, boolean dispatch) protected void offerTask(Runnable task, boolean dispatch)
{ {
tasks.offer(task); offerTask(task);
if (dispatch) if (dispatch)
executionStrategy.dispatch(); executionStrategy.dispatch();
else else
@ -155,6 +155,22 @@ public class HTTP2Connection extends AbstractConnection
session.close(ErrorCode.NO_ERROR.code, "close", Callback.NOOP); session.close(ErrorCode.NO_ERROR.code, "close", Callback.NOOP);
} }
private void offerTask(Runnable task)
{
synchronized (this)
{
tasks.offer(task);
}
}
private Runnable pollTask()
{
synchronized (this)
{
return tasks.poll();
}
}
protected class HTTP2Producer implements ExecutionStrategy.Producer protected class HTTP2Producer implements ExecutionStrategy.Producer
{ {
private final Callback fillCallback = new FillCallback(); private final Callback fillCallback = new FillCallback();
@ -163,7 +179,7 @@ public class HTTP2Connection extends AbstractConnection
@Override @Override
public Runnable produce() public Runnable produce()
{ {
Runnable task = tasks.poll(); Runnable task = pollTask();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Dequeued task {}", task); LOG.debug("Dequeued task {}", task);
if (task != null) if (task != null)
@ -182,7 +198,7 @@ public class HTTP2Connection extends AbstractConnection
while (buffer.hasRemaining()) while (buffer.hasRemaining())
parser.parse(buffer); parser.parse(buffer);
task = tasks.poll(); task = pollTask();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Dequeued new task {}", task); LOG.debug("Dequeued new task {}", task);
if (task != null) if (task != null)

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.http2.server; package org.eclipse.jetty.http2.server;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Queue; import java.util.Queue;
@ -53,13 +54,12 @@ import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.util.B64Code; import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.TypeUtil; import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.ExecutionStrategy;
public class HTTP2ServerConnection extends HTTP2Connection implements Connection.UpgradeTo public class HTTP2ServerConnection extends HTTP2Connection implements Connection.UpgradeTo
{ {
private final Queue<HttpChannelOverHTTP2> channels = new ConcurrentArrayQueue<>(); private final Queue<HttpChannelOverHTTP2> channels = new ArrayDeque<>();
private final List<Frame> upgradeFrames = new ArrayList<>(); private final List<Frame> upgradeFrames = new ArrayList<>();
private final AtomicLong totalRequests = new AtomicLong(); private final AtomicLong totalRequests = new AtomicLong();
private final AtomicLong totalResponses = new AtomicLong(); private final AtomicLong totalResponses = new AtomicLong();
@ -197,7 +197,7 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
private HttpChannelOverHTTP2 provideHttpChannel(Connector connector, IStream stream) private HttpChannelOverHTTP2 provideHttpChannel(Connector connector, IStream stream)
{ {
HttpChannelOverHTTP2 channel = channels.poll(); HttpChannelOverHTTP2 channel = pollChannel();
if (channel != null) if (channel != null)
{ {
channel.getHttpTransport().setStream(stream); channel.getHttpTransport().setStream(stream);
@ -216,6 +216,22 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
return channel; return channel;
} }
private void offerChannel(HttpChannelOverHTTP2 channel)
{
synchronized (this)
{
channels.offer(channel);
}
}
private HttpChannelOverHTTP2 pollChannel()
{
synchronized (this)
{
return channels.poll();
}
}
public boolean upgrade(Request request) public boolean upgrade(Request request)
{ {
if (HttpMethod.PRI.is(request.getMethod())) if (HttpMethod.PRI.is(request.getMethod()))
@ -278,7 +294,7 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
{ {
getStream().removeAttribute(IStream.CHANNEL_ATTRIBUTE); getStream().removeAttribute(IStream.CHANNEL_ATTRIBUTE);
super.recycle(); super.recycle();
channels.offer(this); offerChannel(this);
} }
@Override @Override

View File

@ -19,13 +19,15 @@
package org.eclipse.jetty.io; package org.eclipse.jetty.io;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
/** /**
* <p>A {@link ByteBuffer} pool.</p> * <p>A {@link ByteBuffer} pool.</p>
@ -126,10 +128,11 @@ public interface ByteBufferPool
class Bucket class Bucket
{ {
private final Lock _lock = new ReentrantLock();
private final Queue<ByteBuffer> _queue = new ArrayDeque<>();
private final ByteBufferPool _pool; private final ByteBufferPool _pool;
private final int _capacity; private final int _capacity;
private final AtomicInteger _space; private final AtomicInteger _space;
private final Queue<ByteBuffer> _queue = new ConcurrentArrayQueue<>();
public Bucket(ByteBufferPool pool, int bufferSize, int maxSize) public Bucket(ByteBufferPool pool, int bufferSize, int maxSize)
{ {
@ -138,20 +141,9 @@ public interface ByteBufferPool
_space = maxSize > 0 ? new AtomicInteger(maxSize) : null; _space = maxSize > 0 ? new AtomicInteger(maxSize) : null;
} }
public void release(ByteBuffer buffer)
{
BufferUtil.clear(buffer);
if (_space == null)
_queue.offer(buffer);
else if (_space.decrementAndGet() >= 0)
_queue.offer(buffer);
else
_space.incrementAndGet();
}
public ByteBuffer acquire(boolean direct) public ByteBuffer acquire(boolean direct)
{ {
ByteBuffer buffer = _queue.poll(); ByteBuffer buffer = queuePoll();
if (buffer == null) if (buffer == null)
return _pool.newByteBuffer(_capacity, direct); return _pool.newByteBuffer(_capacity, direct);
if (_space != null) if (_space != null)
@ -159,37 +151,108 @@ public interface ByteBufferPool
return buffer; return buffer;
} }
public void release(ByteBuffer buffer)
{
BufferUtil.clear(buffer);
if (_space == null)
queueOffer(buffer);
else if (_space.decrementAndGet() >= 0)
queueOffer(buffer);
else
_space.incrementAndGet();
}
public void clear() public void clear()
{ {
if (_space == null) if (_space == null)
{ {
_queue.clear(); queueClear();
} }
else else
{ {
int s = _space.getAndSet(0); int s = _space.getAndSet(0);
while (s-- > 0) while (s-- > 0)
{ {
if (_queue.poll() == null) if (queuePoll() == null)
_space.incrementAndGet(); _space.incrementAndGet();
} }
} }
} }
private void queueOffer(ByteBuffer buffer)
{
Lock lock = _lock;
lock.lock();
try
{
_queue.offer(buffer);
}
finally
{
lock.unlock();
}
}
private ByteBuffer queuePoll()
{
Lock lock = _lock;
lock.lock();
try
{
return _queue.poll();
}
finally
{
lock.unlock();
}
}
private void queueClear()
{
Lock lock = _lock;
lock.lock();
try
{
_queue.clear();
}
finally
{
lock.unlock();
}
}
boolean isEmpty() boolean isEmpty()
{ {
return _queue.isEmpty(); Lock lock = _lock;
lock.lock();
try
{
return _queue.isEmpty();
}
finally
{
lock.unlock();
}
} }
int size() int size()
{ {
return _queue.size(); Lock lock = _lock;
lock.lock();
try
{
return _queue.size();
}
finally
{
lock.unlock();
}
} }
@Override @Override
public String toString() public String toString()
{ {
return String.format("Bucket@%x{%d,%d}", hashCode(), _capacity, _queue.size()); return String.format("Bucket@%x{%d/%d}", hashCode(), size(), _capacity);
} }
} }
} }

View File

@ -21,12 +21,12 @@ package org.eclipse.jetty.server;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import javax.servlet.ReadListener; import javax.servlet.ReadListener;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
@ -35,16 +35,8 @@ import org.junit.Test;
public class HttpInputTest public class HttpInputTest
{ {
private final Queue<String> _history = new ConcurrentArrayQueue<String>() private final Queue<String> _history = new LinkedBlockingQueue<>();
{ private final Queue<String> _fillAndParseSimulate = new LinkedBlockingQueue<>();
@Override
public boolean add(String s)
{
//System.err.println("history: "+s);
return super.add(s);
}
};
private final Queue<String> _fillAndParseSimulate = new ConcurrentArrayQueue<>();
private final ReadListener _listener = new ReadListener() private final ReadListener _listener = new ReadListener()
{ {
@Override @Override

View File

@ -30,6 +30,7 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.net.ssl.SNIHostName; import javax.net.ssl.SNIHostName;
import javax.net.ssl.SNIServerName; import javax.net.ssl.SNIServerName;
@ -51,7 +52,6 @@ import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SocketCustomizationListener; import org.eclipse.jetty.server.SocketCustomizationListener;
import org.eclipse.jetty.server.SslConnectionFactory; import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.Utf8StringBuilder; import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
@ -363,7 +363,7 @@ public class SniSslConnectionFactoryTest
@Test @Test
public void testSocketCustomization() throws Exception public void testSocketCustomization() throws Exception
{ {
final Queue<String> history = new ConcurrentArrayQueue<>(); final Queue<String> history = new LinkedBlockingQueue<>();
_connector.addBean(new SocketCustomizationListener() _connector.addBean(new SocketCustomizationListener()
{ {

View File

@ -28,6 +28,7 @@ import java.security.cert.X509Certificate;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.net.ssl.SNIHostName; import javax.net.ssl.SNIHostName;
import javax.net.ssl.SNIServerName; import javax.net.ssl.SNIServerName;
@ -49,7 +50,6 @@ import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SocketCustomizationListener; import org.eclipse.jetty.server.SocketCustomizationListener;
import org.eclipse.jetty.server.SslConnectionFactory; import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
@ -196,7 +196,7 @@ public class SslConnectionFactoryTest
@Test @Test
public void testSocketCustomization() throws Exception public void testSocketCustomization() throws Exception
{ {
final Queue<String> history = new ConcurrentArrayQueue<>(); final Queue<String> history = new LinkedBlockingQueue<>();
_connector.addBean(new SocketCustomizationListener() _connector.addBean(new SocketCustomizationListener()
{ {

View File

@ -23,9 +23,8 @@ import java.util.Queue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.thread.ExecutionStrategy.Producer; import org.eclipse.jetty.util.thread.ExecutionStrategy.Producer;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.After; import org.junit.After;
@ -37,8 +36,8 @@ public class ExecuteProduceConsumeTest
{ {
private static final Runnable NULLTASK = () -> {}; private static final Runnable NULLTASK = () -> {};
private final BlockingQueue<Runnable> _produce = new BlockingArrayQueue<>(); private final BlockingQueue<Runnable> _produce = new LinkedBlockingQueue<>();
private final Queue<Runnable> _executions = new ConcurrentArrayQueue<>(); private final Queue<Runnable> _executions = new LinkedBlockingQueue<>();
private ExecuteProduceConsume _ewyk; private ExecuteProduceConsume _ewyk;
private volatile Thread _producer; private volatile Thread _producer;

View File

@ -19,12 +19,12 @@
package org.eclipse.jetty.websocket.common.extensions; package org.eclipse.jetty.websocket.common.extensions;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.ListIterator; import java.util.ListIterator;
import java.util.Queue; import java.util.Queue;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedObject;
@ -52,7 +52,7 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames
{ {
private static final Logger LOG = Log.getLogger(ExtensionStack.class); private static final Logger LOG = Log.getLogger(ExtensionStack.class);
private final Queue<FrameEntry> entries = new ConcurrentArrayQueue<>(); private final Queue<FrameEntry> entries = new ArrayDeque<>();
private final IteratingCallback flusher = new Flusher(); private final IteratingCallback flusher = new Flusher();
private final ExtensionFactory factory; private final ExtensionFactory factory;
private List<Extension> extensions; private List<Extension> extensions;
@ -292,7 +292,7 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames
FrameEntry entry = new FrameEntry(frame,callback,batchMode); FrameEntry entry = new FrameEntry(frame,callback,batchMode);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Queuing {}",entry); LOG.debug("Queuing {}",entry);
entries.offer(entry); offerEntry(entry);
flusher.iterate(); flusher.iterate();
} }
@ -317,12 +317,36 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames
} }
} }
private void offerEntry(FrameEntry entry)
{
synchronized (this)
{
entries.offer(entry);
}
}
private FrameEntry pollEntry()
{
synchronized (this)
{
return entries.poll();
}
}
private int getQueueSize()
{
synchronized (this)
{
return entries.size();
}
}
@Override @Override
public String toString() public String toString()
{ {
StringBuilder s = new StringBuilder(); StringBuilder s = new StringBuilder();
s.append("ExtensionStack["); s.append("ExtensionStack[");
s.append("queueSize=").append(entries.size()); s.append("queueSize=").append(getQueueSize());
s.append(",extensions="); s.append(",extensions=");
if (extensions == null) if (extensions == null)
{ {
@ -383,7 +407,7 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames
@Override @Override
protected Action process() throws Exception protected Action process() throws Exception
{ {
current = entries.poll(); current = pollEntry();
if (current == null) if (current == null)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.websocket.common.extensions.compress;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.DataFormatException; import java.util.zip.DataFormatException;
@ -28,7 +29,6 @@ import java.util.zip.Inflater;
import java.util.zip.ZipException; import java.util.zip.ZipException;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
@ -72,7 +72,7 @@ public abstract class CompressExtension extends AbstractExtension
private final static boolean NOWRAP = true; private final static boolean NOWRAP = true;
private final Queue<FrameEntry> entries = new ConcurrentArrayQueue<>(); private final Queue<FrameEntry> entries = new ArrayDeque<>();
private final IteratingCallback flusher = new Flusher(); private final IteratingCallback flusher = new Flusher();
private Deflater deflaterImpl; private Deflater deflaterImpl;
private Inflater inflaterImpl; private Inflater inflaterImpl;
@ -214,10 +214,26 @@ public abstract class CompressExtension extends AbstractExtension
FrameEntry entry = new FrameEntry(frame,callback,batchMode); FrameEntry entry = new FrameEntry(frame,callback,batchMode);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Queuing {}",entry); LOG.debug("Queuing {}",entry);
entries.offer(entry); offerEntry(entry);
flusher.iterate(); flusher.iterate();
} }
private void offerEntry(FrameEntry entry)
{
synchronized (this)
{
entries.offer(entry);
}
}
private FrameEntry pollEntry()
{
synchronized (this)
{
return entries.poll();
}
}
protected void notifyCallbackSuccess(WriteCallback callback) protected void notifyCallbackSuccess(WriteCallback callback)
{ {
try try
@ -408,7 +424,7 @@ public abstract class CompressExtension extends AbstractExtension
{ {
if (finished) if (finished)
{ {
current = entries.poll(); current = pollEntry();
LOG.debug("Processing {}",current); LOG.debug("Processing {}",current);
if (current == null) if (current == null)
return Action.IDLE; return Action.IDLE;
@ -545,7 +561,7 @@ public abstract class CompressExtension extends AbstractExtension
{ {
// Fail all the frames in the queue. // Fail all the frames in the queue.
FrameEntry entry; FrameEntry entry;
while ((entry = entries.poll()) != null) while ((entry = pollEntry()) != null)
notifyCallbackFailure(entry.callback,x); notifyCallbackFailure(entry.callback,x);
} }

View File

@ -20,9 +20,9 @@ package org.eclipse.jetty.websocket.common.extensions.fragment;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue; import java.util.Queue;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
@ -41,7 +41,7 @@ public class FragmentExtension extends AbstractExtension
{ {
private static final Logger LOG = Log.getLogger(FragmentExtension.class); private static final Logger LOG = Log.getLogger(FragmentExtension.class);
private final Queue<FrameEntry> entries = new ConcurrentArrayQueue<>(); private final Queue<FrameEntry> entries = new ArrayDeque<>();
private final IteratingCallback flusher = new Flusher(); private final IteratingCallback flusher = new Flusher();
private int maxLength; private int maxLength;
@ -71,7 +71,7 @@ public class FragmentExtension extends AbstractExtension
FrameEntry entry = new FrameEntry(frame, callback, batchMode); FrameEntry entry = new FrameEntry(frame, callback, batchMode);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Queuing {}", entry); LOG.debug("Queuing {}", entry);
entries.offer(entry); offerEntry(entry);
flusher.iterate(); flusher.iterate();
} }
@ -82,6 +82,22 @@ public class FragmentExtension extends AbstractExtension
maxLength = config.getParameter("maxLength", -1); maxLength = config.getParameter("maxLength", -1);
} }
private void offerEntry(FrameEntry entry)
{
synchronized (this)
{
entries.offer(entry);
}
}
private FrameEntry pollEntry()
{
synchronized (this)
{
return entries.poll();
}
}
private static class FrameEntry private static class FrameEntry
{ {
private final Frame frame; private final Frame frame;
@ -112,7 +128,7 @@ public class FragmentExtension extends AbstractExtension
{ {
if (finished) if (finished)
{ {
current = entries.poll(); current = pollEntry();
LOG.debug("Processing {}", current); LOG.debug("Processing {}", current);
if (current == null) if (current == null)
return Action.IDLE; return Action.IDLE;