Issue #751 - Remove usages of ArrayQueue.

This commit is contained in:
Simone Bordet 2016-07-21 17:55:48 +02:00
parent 42b16e9785
commit 6a15bbfb2b
5 changed files with 20 additions and 22 deletions

View File

@ -22,7 +22,9 @@ import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
@ -35,7 +37,6 @@ import org.eclipse.jetty.client.Synchronizable;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@ -90,7 +91,7 @@ public class DeferredContentProvider implements AsyncContentProvider, Callback,
private static final Chunk CLOSE = new Chunk(BufferUtil.EMPTY_BUFFER, Callback.NOOP);
private final Object lock = this;
private final ArrayQueue<Chunk> chunks = new ArrayQueue<>(4, 64, lock);
private final Deque<Chunk> chunks = new ArrayDeque<>();
private final AtomicReference<Listener> listener = new AtomicReference<>();
private final DeferredContentProviderIterator iterator = new DeferredContentProviderIterator();
private final AtomicBoolean closed = new AtomicBoolean();
@ -259,7 +260,7 @@ public class DeferredContentProvider implements AsyncContentProvider, Callback,
{
// Slow path: reinsert the CLOSE chunk
// so that hasNext() works correctly.
chunks.add(0, CLOSE);
chunks.offerFirst(CLOSE);
throw new NoSuchElementException();
}
return chunk == null ? null : chunk.buffer;

View File

@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -52,7 +53,6 @@ import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
@ -156,7 +156,7 @@ public class PrefaceTest extends AbstractTest
List<ByteBuffer> buffers = lease.getByteBuffers();
socket.write(buffers.toArray(new ByteBuffer[buffers.size()]));
Queue<SettingsFrame> settings = new ArrayQueue<>();
Queue<SettingsFrame> settings = new ArrayDeque<>();
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override

View File

@ -31,7 +31,6 @@ import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log;
@ -42,7 +41,7 @@ public class HTTP2Flusher extends IteratingCallback
private static final Logger LOG = Log.getLogger(HTTP2Flusher.class);
private final Queue<WindowEntry> windows = new ArrayDeque<>();
private final ArrayQueue<Entry> frames = new ArrayQueue<>(ArrayQueue.DEFAULT_CAPACITY, ArrayQueue.DEFAULT_GROWTH, this);
private final List<Entry> frames = new ArrayList<>();
private final Map<IStream, Integer> streams = new HashMap<>();
private final List<Entry> resets = new ArrayList<>();
private final List<Entry> actives = new ArrayList<>();
@ -96,7 +95,7 @@ public class HTTP2Flusher extends IteratingCallback
closed = terminated;
if (!closed)
{
frames.offer(entry);
frames.add(entry);
if (LOG.isDebugEnabled())
LOG.debug("Appended {}, frames={}", entry, frames.size());
}
@ -110,10 +109,7 @@ public class HTTP2Flusher extends IteratingCallback
{
synchronized (this)
{
if (index == 0)
return frames.pollUnsafe();
else
return frames.remove(index);
return frames.remove(index);
}
}

View File

@ -25,11 +25,11 @@ import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -58,7 +58,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
private final Locker _locker = new Locker();
private final Condition _hasOutput = _locker.newCondition();
private final Queue<ByteBuffer> _inQ = new ArrayQueue<>();
private final Queue<ByteBuffer> _inQ = new ArrayDeque<>();
private ByteBuffer _out;
private boolean _ishut;
private boolean _oshut;
@ -268,7 +268,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
getWriteFlusher().completeWrite();
return b;
}
/* ------------------------------------------------------------ */
/** Wait for some output
* @param time Time to wait
@ -580,7 +580,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
String o;
try(Locker.Lock lock = _locker.lock())
{
q=_inQ.size();
q=_inQ.size();
b=_inQ.peek();
o=BufferUtil.toDetailString(_out);
}

View File

@ -20,14 +20,15 @@ package org.eclipse.jetty.websocket.common.io;
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log;
@ -163,7 +164,7 @@ public class FrameFlusher
{
while ((entries.size() <= maxGather) && !queue.isEmpty())
{
FrameEntry entry = queue.remove(0);
FrameEntry entry = queue.poll();
currentBatchMode = BatchMode.max(currentBatchMode,entry.batchMode);
// Force flush if we need to.
@ -293,7 +294,7 @@ public class FrameFlusher
private final Generator generator;
private final int maxGather;
private final Object lock = new Object();
private final ArrayQueue<FrameEntry> queue = new ArrayQueue<>(16,16,lock);
private final Deque<FrameEntry> queue = new ArrayDeque<>();
private final Flusher flusher;
private final AtomicBoolean closed = new AtomicBoolean();
private volatile Throwable failure;
@ -353,7 +354,7 @@ public class FrameFlusher
case OpCode.PING:
{
// Prepend PINGs so they are processed first.
queue.add(0,entry);
queue.offerFirst(entry);
break;
}
case OpCode.CLOSE:
@ -362,12 +363,12 @@ public class FrameFlusher
// added after this close frame, but we will
// fail them later to keep it simple here.
closed.set(true);
queue.add(entry);
queue.offer(entry);
break;
}
default:
{
queue.add(entry);
queue.offer(entry);
break;
}
}