From 0f97c3df5d486dd4a732996228e7a6c8d6425af0 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Wed, 28 Sep 2016 11:39:21 +0200 Subject: [PATCH] Fixes #966 - Remove usages of ConcurrentArrayQueue. --- .../eclipse/jetty/fcgi/generator/Flusher.java | 28 +++-- .../eclipse/jetty/http2/HTTP2Connection.java | 26 ++++- .../http2/server/HTTP2ServerConnection.java | 24 ++++- .../org/eclipse/jetty/io/ByteBufferPool.java | 101 ++++++++++++++---- .../eclipse/jetty/server/HttpInputTest.java | 14 +-- .../ssl/SniSslConnectionFactoryTest.java | 4 +- .../server/ssl/SslConnectionFactoryTest.java | 4 +- .../strategy/ExecuteProduceConsumeTest.java | 7 +- .../common/extensions/ExtensionStack.java | 34 +++++- .../compress/CompressExtension.java | 26 ++++- .../fragment/FragmentExtension.java | 24 ++++- 11 files changed, 225 insertions(+), 67 deletions(-) diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/generator/Flusher.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/generator/Flusher.java index 51d2b83a45c..d5f9b454a5f 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/generator/Flusher.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/generator/Flusher.java @@ -19,10 +19,10 @@ package org.eclipse.jetty.fcgi.generator; import java.nio.ByteBuffer; +import java.util.ArrayDeque; import java.util.Queue; import org.eclipse.jetty.io.EndPoint; -import org.eclipse.jetty.util.ConcurrentArrayQueue; import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -31,7 +31,7 @@ public class Flusher { private static final Logger LOG = Log.getLogger(Flusher.class); - private final Queue queue = new ConcurrentArrayQueue<>(); + private final Queue queue = new ArrayDeque<>(); private final IteratingCallback flushCallback = new FlushCallback(); private final EndPoint endPoint; @@ -43,10 +43,26 @@ public class Flusher public void flush(Generator.Result... results) { for (Generator.Result result : results) - queue.offer(result); + offer(result); flushCallback.iterate(); } + private void offer(Generator.Result result) + { + synchronized (this) + { + queue.offer(result); + } + } + + private Generator.Result poll() + { + synchronized (this) + { + return queue.poll(); + } + } + public void shutdown() { flush(new ShutdownResult()); @@ -60,7 +76,7 @@ public class Flusher protected Action process() throws Exception { // Look if other writes are needed. - Generator.Result result = queue.poll(); + Generator.Result result = poll(); if (result == null) { // No more writes to do, return. @@ -71,7 +87,7 @@ public class Flusher // Most often there is another result in the // queue so this is a real optimization because // it sends both results in just one TCP packet. - Generator.Result other = queue.poll(); + Generator.Result other = poll(); if (other != null) result = result.join(other); @@ -106,7 +122,7 @@ public class Flusher while (true) { - Generator.Result result = queue.poll(); + Generator.Result result = poll(); if (result == null) break; result.failed(x); diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java index 9f0a2b82d6c..68e9ef52b18 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java @@ -20,6 +20,7 @@ package org.eclipse.jetty.http2; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; @@ -30,7 +31,6 @@ import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.ConcurrentArrayQueue; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.ExecutionStrategy; @@ -39,7 +39,7 @@ public class HTTP2Connection extends AbstractConnection { protected static final Logger LOG = Log.getLogger(HTTP2Connection.class); - private final Queue tasks = new ConcurrentArrayQueue<>(); + private final Queue tasks = new ArrayDeque<>(); private final HTTP2Producer producer = new HTTP2Producer(); private final AtomicLong bytesIn = new AtomicLong(); private final ByteBufferPool byteBufferPool; @@ -140,7 +140,7 @@ public class HTTP2Connection extends AbstractConnection protected void offerTask(Runnable task, boolean dispatch) { - tasks.offer(task); + offerTask(task); if (dispatch) executionStrategy.dispatch(); else @@ -155,6 +155,22 @@ public class HTTP2Connection extends AbstractConnection session.close(ErrorCode.NO_ERROR.code, "close", Callback.NOOP); } + private void offerTask(Runnable task) + { + synchronized (this) + { + tasks.offer(task); + } + } + + private Runnable pollTask() + { + synchronized (this) + { + return tasks.poll(); + } + } + protected class HTTP2Producer implements ExecutionStrategy.Producer { private final Callback fillCallback = new FillCallback(); @@ -163,7 +179,7 @@ public class HTTP2Connection extends AbstractConnection @Override public Runnable produce() { - Runnable task = tasks.poll(); + Runnable task = pollTask(); if (LOG.isDebugEnabled()) LOG.debug("Dequeued task {}", task); if (task != null) @@ -182,7 +198,7 @@ public class HTTP2Connection extends AbstractConnection while (buffer.hasRemaining()) parser.parse(buffer); - task = tasks.poll(); + task = pollTask(); if (LOG.isDebugEnabled()) LOG.debug("Dequeued new task {}", task); if (task != null) diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java index 4dcb2edc252..1711782fa11 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java @@ -19,6 +19,7 @@ package org.eclipse.jetty.http2.server; import java.nio.ByteBuffer; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; import java.util.Queue; @@ -53,13 +54,12 @@ import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.util.B64Code; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.ConcurrentArrayQueue; import org.eclipse.jetty.util.TypeUtil; import org.eclipse.jetty.util.thread.ExecutionStrategy; public class HTTP2ServerConnection extends HTTP2Connection implements Connection.UpgradeTo { - private final Queue channels = new ConcurrentArrayQueue<>(); + private final Queue channels = new ArrayDeque<>(); private final List upgradeFrames = new ArrayList<>(); private final AtomicLong totalRequests = new AtomicLong(); private final AtomicLong totalResponses = new AtomicLong(); @@ -197,7 +197,7 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection private HttpChannelOverHTTP2 provideHttpChannel(Connector connector, IStream stream) { - HttpChannelOverHTTP2 channel = channels.poll(); + HttpChannelOverHTTP2 channel = pollChannel(); if (channel != null) { channel.getHttpTransport().setStream(stream); @@ -216,6 +216,22 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection return channel; } + private void offerChannel(HttpChannelOverHTTP2 channel) + { + synchronized (this) + { + channels.offer(channel); + } + } + + private HttpChannelOverHTTP2 pollChannel() + { + synchronized (this) + { + return channels.poll(); + } + } + public boolean upgrade(Request request) { if (HttpMethod.PRI.is(request.getMethod())) @@ -278,7 +294,7 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection { getStream().removeAttribute(IStream.CHANNEL_ATTRIBUTE); super.recycle(); - channels.offer(this); + offerChannel(this); } @Override diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java index e5c1603f136..257862d1ea5 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java @@ -19,13 +19,15 @@ package org.eclipse.jetty.io; import java.nio.ByteBuffer; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; import java.util.Queue; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.ConcurrentArrayQueue; /** *

A {@link ByteBuffer} pool.

@@ -126,10 +128,11 @@ public interface ByteBufferPool class Bucket { + private final Lock _lock = new ReentrantLock(); + private final Queue _queue = new ArrayDeque<>(); private final ByteBufferPool _pool; private final int _capacity; private final AtomicInteger _space; - private final Queue _queue = new ConcurrentArrayQueue<>(); public Bucket(ByteBufferPool pool, int bufferSize, int maxSize) { @@ -138,20 +141,9 @@ public interface ByteBufferPool _space = maxSize > 0 ? new AtomicInteger(maxSize) : null; } - public void release(ByteBuffer buffer) - { - BufferUtil.clear(buffer); - if (_space == null) - _queue.offer(buffer); - else if (_space.decrementAndGet() >= 0) - _queue.offer(buffer); - else - _space.incrementAndGet(); - } - public ByteBuffer acquire(boolean direct) { - ByteBuffer buffer = _queue.poll(); + ByteBuffer buffer = queuePoll(); if (buffer == null) return _pool.newByteBuffer(_capacity, direct); if (_space != null) @@ -159,37 +151,108 @@ public interface ByteBufferPool return buffer; } + public void release(ByteBuffer buffer) + { + BufferUtil.clear(buffer); + if (_space == null) + queueOffer(buffer); + else if (_space.decrementAndGet() >= 0) + queueOffer(buffer); + else + _space.incrementAndGet(); + } + public void clear() { if (_space == null) { - _queue.clear(); + queueClear(); } else { int s = _space.getAndSet(0); while (s-- > 0) { - if (_queue.poll() == null) + if (queuePoll() == null) _space.incrementAndGet(); } } } + private void queueOffer(ByteBuffer buffer) + { + Lock lock = _lock; + lock.lock(); + try + { + _queue.offer(buffer); + } + finally + { + lock.unlock(); + } + } + + private ByteBuffer queuePoll() + { + Lock lock = _lock; + lock.lock(); + try + { + return _queue.poll(); + } + finally + { + lock.unlock(); + } + } + + private void queueClear() + { + Lock lock = _lock; + lock.lock(); + try + { + _queue.clear(); + } + finally + { + lock.unlock(); + } + } + boolean isEmpty() { - return _queue.isEmpty(); + Lock lock = _lock; + lock.lock(); + try + { + return _queue.isEmpty(); + } + finally + { + lock.unlock(); + } } int size() { - return _queue.size(); + Lock lock = _lock; + lock.lock(); + try + { + return _queue.size(); + } + finally + { + lock.unlock(); + } } @Override public String toString() { - return String.format("Bucket@%x{%d,%d}", hashCode(), _capacity, _queue.size()); + return String.format("Bucket@%x{%d/%d}", hashCode(), size(), _capacity); } } } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputTest.java index c930095df9b..dde2f0b643e 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputTest.java @@ -21,12 +21,12 @@ package org.eclipse.jetty.server; import java.io.EOFException; import java.io.IOException; import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeoutException; import javax.servlet.ReadListener; import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.ConcurrentArrayQueue; import org.hamcrest.Matchers; import org.junit.After; import org.junit.Assert; @@ -35,16 +35,8 @@ import org.junit.Test; public class HttpInputTest { - private final Queue _history = new ConcurrentArrayQueue() - { - @Override - public boolean add(String s) - { - //System.err.println("history: "+s); - return super.add(s); - } - }; - private final Queue _fillAndParseSimulate = new ConcurrentArrayQueue<>(); + private final Queue _history = new LinkedBlockingQueue<>(); + private final Queue _fillAndParseSimulate = new LinkedBlockingQueue<>(); private final ReadListener _listener = new ReadListener() { @Override diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ssl/SniSslConnectionFactoryTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ssl/SniSslConnectionFactoryTest.java index 5543ab73024..90af4d268ca 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ssl/SniSslConnectionFactoryTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ssl/SniSslConnectionFactoryTest.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; import javax.net.ssl.SNIHostName; import javax.net.ssl.SNIServerName; @@ -51,7 +52,6 @@ import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.SocketCustomizationListener; import org.eclipse.jetty.server.SslConnectionFactory; import org.eclipse.jetty.server.handler.AbstractHandler; -import org.eclipse.jetty.util.ConcurrentArrayQueue; import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.Utf8StringBuilder; import org.eclipse.jetty.util.ssl.SslContextFactory; @@ -363,7 +363,7 @@ public class SniSslConnectionFactoryTest @Test public void testSocketCustomization() throws Exception { - final Queue history = new ConcurrentArrayQueue<>(); + final Queue history = new LinkedBlockingQueue<>(); _connector.addBean(new SocketCustomizationListener() { diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ssl/SslConnectionFactoryTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ssl/SslConnectionFactoryTest.java index e0a87fa1695..797ae892c91 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ssl/SslConnectionFactoryTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ssl/SslConnectionFactoryTest.java @@ -28,6 +28,7 @@ import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.List; import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; import javax.net.ssl.SNIHostName; import javax.net.ssl.SNIServerName; @@ -49,7 +50,6 @@ import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.SocketCustomizationListener; import org.eclipse.jetty.server.SslConnectionFactory; import org.eclipse.jetty.server.handler.AbstractHandler; -import org.eclipse.jetty.util.ConcurrentArrayQueue; import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.hamcrest.Matchers; @@ -196,7 +196,7 @@ public class SslConnectionFactoryTest @Test public void testSocketCustomization() throws Exception { - final Queue history = new ConcurrentArrayQueue<>(); + final Queue history = new LinkedBlockingQueue<>(); _connector.addBean(new SocketCustomizationListener() { diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsumeTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsumeTest.java index f50121cb84f..2f5acb6fef1 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsumeTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsumeTest.java @@ -23,9 +23,8 @@ import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; -import org.eclipse.jetty.util.BlockingArrayQueue; -import org.eclipse.jetty.util.ConcurrentArrayQueue; import org.eclipse.jetty.util.thread.ExecutionStrategy.Producer; import org.hamcrest.Matchers; import org.junit.After; @@ -37,8 +36,8 @@ public class ExecuteProduceConsumeTest { private static final Runnable NULLTASK = () -> {}; - private final BlockingQueue _produce = new BlockingArrayQueue<>(); - private final Queue _executions = new ConcurrentArrayQueue<>(); + private final BlockingQueue _produce = new LinkedBlockingQueue<>(); + private final Queue _executions = new LinkedBlockingQueue<>(); private ExecuteProduceConsume _ewyk; private volatile Thread _producer; diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/ExtensionStack.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/ExtensionStack.java index 2cde3a5ab35..e25da23f0d0 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/ExtensionStack.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/ExtensionStack.java @@ -19,12 +19,12 @@ package org.eclipse.jetty.websocket.common.extensions; import java.io.IOException; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; import java.util.ListIterator; import java.util.Queue; -import org.eclipse.jetty.util.ConcurrentArrayQueue; import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; @@ -52,7 +52,7 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames { private static final Logger LOG = Log.getLogger(ExtensionStack.class); - private final Queue entries = new ConcurrentArrayQueue<>(); + private final Queue entries = new ArrayDeque<>(); private final IteratingCallback flusher = new Flusher(); private final ExtensionFactory factory; private List extensions; @@ -292,7 +292,7 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames FrameEntry entry = new FrameEntry(frame,callback,batchMode); if (LOG.isDebugEnabled()) LOG.debug("Queuing {}",entry); - entries.offer(entry); + offerEntry(entry); flusher.iterate(); } @@ -317,12 +317,36 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames } } + private void offerEntry(FrameEntry entry) + { + synchronized (this) + { + entries.offer(entry); + } + } + + private FrameEntry pollEntry() + { + synchronized (this) + { + return entries.poll(); + } + } + + private int getQueueSize() + { + synchronized (this) + { + return entries.size(); + } + } + @Override public String toString() { StringBuilder s = new StringBuilder(); s.append("ExtensionStack["); - s.append("queueSize=").append(entries.size()); + s.append("queueSize=").append(getQueueSize()); s.append(",extensions="); if (extensions == null) { @@ -383,7 +407,7 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames @Override protected Action process() throws Exception { - current = entries.poll(); + current = pollEntry(); if (current == null) { if (LOG.isDebugEnabled()) diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java index a2b5b26e4c0..12c80dd2c72 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java @@ -20,6 +20,7 @@ package org.eclipse.jetty.websocket.common.extensions.compress; import java.io.ByteArrayOutputStream; import java.nio.ByteBuffer; +import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.DataFormatException; @@ -28,7 +29,6 @@ import java.util.zip.Inflater; import java.util.zip.ZipException; import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.ConcurrentArrayQueue; import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -72,7 +72,7 @@ public abstract class CompressExtension extends AbstractExtension private final static boolean NOWRAP = true; - private final Queue entries = new ConcurrentArrayQueue<>(); + private final Queue entries = new ArrayDeque<>(); private final IteratingCallback flusher = new Flusher(); private Deflater deflaterImpl; private Inflater inflaterImpl; @@ -214,10 +214,26 @@ public abstract class CompressExtension extends AbstractExtension FrameEntry entry = new FrameEntry(frame,callback,batchMode); if (LOG.isDebugEnabled()) LOG.debug("Queuing {}",entry); - entries.offer(entry); + offerEntry(entry); flusher.iterate(); } + private void offerEntry(FrameEntry entry) + { + synchronized (this) + { + entries.offer(entry); + } + } + + private FrameEntry pollEntry() + { + synchronized (this) + { + return entries.poll(); + } + } + protected void notifyCallbackSuccess(WriteCallback callback) { try @@ -408,7 +424,7 @@ public abstract class CompressExtension extends AbstractExtension { if (finished) { - current = entries.poll(); + current = pollEntry(); LOG.debug("Processing {}",current); if (current == null) return Action.IDLE; @@ -545,7 +561,7 @@ public abstract class CompressExtension extends AbstractExtension { // Fail all the frames in the queue. FrameEntry entry; - while ((entry = entries.poll()) != null) + while ((entry = pollEntry()) != null) notifyCallbackFailure(entry.callback,x); } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/fragment/FragmentExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/fragment/FragmentExtension.java index 1d29c13d9de..292493c715c 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/fragment/FragmentExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/fragment/FragmentExtension.java @@ -20,9 +20,9 @@ package org.eclipse.jetty.websocket.common.extensions.fragment; import java.nio.ByteBuffer; +import java.util.ArrayDeque; import java.util.Queue; -import org.eclipse.jetty.util.ConcurrentArrayQueue; import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -41,7 +41,7 @@ public class FragmentExtension extends AbstractExtension { private static final Logger LOG = Log.getLogger(FragmentExtension.class); - private final Queue entries = new ConcurrentArrayQueue<>(); + private final Queue entries = new ArrayDeque<>(); private final IteratingCallback flusher = new Flusher(); private int maxLength; @@ -71,7 +71,7 @@ public class FragmentExtension extends AbstractExtension FrameEntry entry = new FrameEntry(frame, callback, batchMode); if (LOG.isDebugEnabled()) LOG.debug("Queuing {}", entry); - entries.offer(entry); + offerEntry(entry); flusher.iterate(); } @@ -82,6 +82,22 @@ public class FragmentExtension extends AbstractExtension maxLength = config.getParameter("maxLength", -1); } + private void offerEntry(FrameEntry entry) + { + synchronized (this) + { + entries.offer(entry); + } + } + + private FrameEntry pollEntry() + { + synchronized (this) + { + return entries.poll(); + } + } + private static class FrameEntry { private final Frame frame; @@ -112,7 +128,7 @@ public class FragmentExtension extends AbstractExtension { if (finished) { - current = entries.poll(); + current = pollEntry(); LOG.debug("Processing {}", current); if (current == null) return Action.IDLE;