Implemented buffer pooling.

This commit is contained in:
Simone Bordet 2012-03-05 15:52:25 +01:00
parent 142a1058ba
commit 194dcb7bc9
44 changed files with 396 additions and 242 deletions

View File

@ -0,0 +1,49 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy;
import java.nio.ByteBuffer;
/**
* <p>A {@link ByteBuffer} pool.</p>
* <p>Acquired buffers may be {@link #release(ByteBuffer) released} but they do not need to;
* if they are released, they may be recycled and reused, otherwise they will be garbage
* collected as usual.</p>
*/
public interface ByteBufferPool
{
/**
* <p>Requests a {@link ByteBuffer} of the given size.</p>
* <p>The returned buffer may have a bigger capacity than the size being
* requested but it will have the limit set to the given size.</p>
*
* @param size the size of the buffer
* @param direct whether the buffer must be direct or not
* @return the requested buffer
* @see #release(ByteBuffer)
*/
public ByteBuffer acquire(int size, boolean direct);
/**
* <p>Returns a {@link ByteBuffer}, usually obtained with {@link #acquire(int, boolean)}
* (but not necessarily), making it available for recycling and reuse.</p>
*
* @param buffer the buffer to return
* @see #acquire(int, boolean)
*/
public void release(ByteBuffer buffer);
}

View File

@ -0,0 +1,99 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class StandardByteBufferPool implements ByteBufferPool
{
private final ConcurrentMap<Integer, Queue<ByteBuffer>> directBuffers = new ConcurrentHashMap<>();
private final ConcurrentMap<Integer, Queue<ByteBuffer>> heapBuffers = new ConcurrentHashMap<>();
private final int factor;
public StandardByteBufferPool()
{
this(1024);
}
public StandardByteBufferPool(int factor)
{
this.factor = factor;
}
public ByteBuffer acquire(int size, boolean direct)
{
int bucket = bucketFor(size);
ConcurrentMap<Integer, Queue<ByteBuffer>> buffers = buffersFor(direct);
ByteBuffer result = null;
Queue<ByteBuffer> byteBuffers = buffers.get(bucket);
if (byteBuffers != null)
result = byteBuffers.poll();
if (result == null)
{
int capacity = bucket * factor;
result = direct ? ByteBuffer.allocateDirect(capacity) : ByteBuffer.allocate(capacity);
}
result.clear();
result.limit(size);
return result;
}
public void release(ByteBuffer buffer)
{
// int bucket = bucketFor(buffer.capacity());
// ConcurrentMap<Integer, Queue<ByteBuffer>> buffers = buffersFor(buffer.isDirect());
//
// // Avoid to create a new queue every time, just to be discarded immediately
// Queue<ByteBuffer> byteBuffers = buffers.get(bucket);
// if (byteBuffers == null)
// {
// byteBuffers = new ConcurrentLinkedQueue<>();
// Queue<ByteBuffer> existing = buffers.putIfAbsent(bucket, byteBuffers);
// if (existing != null)
// byteBuffers = existing;
// }
//
// buffer.clear();
// byteBuffers.offer(buffer);
}
public void clear()
{
directBuffers.clear();
heapBuffers.clear();
}
private int bucketFor(int size)
{
int bucket = size / factor;
if (size % factor > 0)
++bucket;
return bucket;
}
private ConcurrentMap<Integer, Queue<ByteBuffer>> buffersFor(boolean direct)
{
return direct ? directBuffers : heapBuffers;
}
}

View File

@ -79,6 +79,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>();
private final Deque<FrameBytes> queue = new LinkedList<>();
private final ByteBufferPool bufferPool;
private final Executor threadPool;
private final ScheduledExecutorService scheduler;
private final short version;
@ -94,9 +95,10 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private boolean flushing;
private volatile int windowSize = 65536;
public StandardSession(short version, Executor threadPool, ScheduledExecutorService scheduler, Controller<FrameBytes> controller, IdleListener idleListener, int initialStreamId, SessionFrameListener listener, Generator generator)
public StandardSession(short version, ByteBufferPool bufferPool, Executor threadPool, ScheduledExecutorService scheduler, Controller<FrameBytes> controller, IdleListener idleListener, int initialStreamId, SessionFrameListener listener, Generator generator)
{
this.version = version;
this.bufferPool = bufferPool;
this.threadPool = threadPool;
this.scheduler = scheduler;
this.controller = controller;
@ -380,7 +382,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private void onSyn(final SynStreamFrame frame)
{
final IStream stream = new StandardStream(this, frame);
final IStream stream = newStream(frame);
logger.debug("Opening {}", stream);
int streamId = frame.getStreamId();
IStream existing = streams.putIfAbsent(streamId, stream);
@ -411,7 +413,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private IStream createStream(SynStreamFrame synStream, StreamFrameListener listener)
{
IStream stream = new StandardStream(this, synStream);
IStream stream = newStream(synStream);
stream.setStreamFrameListener(listener);
if (streams.putIfAbsent(synStream.getStreamId(), stream) != null)
{
@ -426,6 +428,11 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
return stream;
}
private IStream newStream(SynStreamFrame frame)
{
return new StandardStream(frame, this, windowSize);
}
private void notifyStreamCreated(IStream stream)
{
for (Listener listener : listeners)
@ -937,13 +944,16 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
@Override
public void complete()
{
bufferPool.release(buffer);
super.complete();
if (frame.getType() == ControlFrameType.GO_AWAY)
{
// After sending a GO_AWAY we need to hard close the connection.
// Recipients will know the last good stream id and act accordingly.
close();
}
super.complete();
}
@Override
@ -956,14 +966,15 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private class DataFrameBytes<C> extends AbstractFrameBytes<C>
{
private final IStream stream;
private final DataInfo data;
private int dataLength;
private final DataInfo dataInfo;
private int length;
private ByteBuffer buffer;
private DataFrameBytes(Handler<C> handler, C context, IStream stream, DataInfo data)
private DataFrameBytes(Handler<C> handler, C context, IStream stream, DataInfo dataInfo)
{
super(handler, context);
this.stream = stream;
this.data = data;
this.dataInfo = dataInfo;
}
@Override
@ -975,9 +986,11 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
if (windowSize <= 0)
return null;
ByteBuffer buffer = generator.data(stream.getId(), windowSize, data);
dataLength = buffer.remaining() - DataFrame.HEADER_LENGTH;
length = dataInfo.length();
if (length > windowSize)
length = windowSize;
buffer = generator.data(stream.getId(), length, dataInfo);
return buffer;
}
catch (Throwable x)
@ -990,9 +1003,10 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
@Override
public void complete()
{
stream.updateWindowSize(-dataLength);
stream.updateWindowSize(-length);
bufferPool.release(buffer);
if (data.available() > 0)
if (dataInfo.available() > 0)
{
// If we could not write a full data frame, then we need first
// to finish it, and then process the others (to avoid data garbling)
@ -1000,17 +1014,17 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
else
{
stream.updateCloseState(data.isClose());
super.complete();
stream.updateCloseState(dataInfo.isClose());
if (stream.isClosed())
removeStream(stream);
super.complete();
}
}
@Override
public String toString()
{
return String.format("DATA bytes @%x available=%d consumed=%d on %s", data.hashCode(), data.available(), data.consumed(), stream);
return String.format("DATA bytes @%x available=%d consumed=%d on %s", dataInfo.hashCode(), dataInfo.available(), dataInfo.consumed(), stream);
}
}
}

View File

@ -17,9 +17,7 @@
package org.eclipse.jetty.spdy;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@ -48,21 +46,19 @@ public class StandardStream implements IStream
{
private static final Logger logger = LoggerFactory.getLogger(Stream.class);
private final Map<String, Object> attributes = new ConcurrentHashMap<>();
private final Queue<Runnable> queue = new LinkedList<>();
private final ISession session;
private final SynStreamFrame frame;
private final ISession session;
private final AtomicInteger windowSize;
private volatile StreamFrameListener listener;
private volatile boolean opened;
private volatile boolean halfClosed;
private volatile boolean closed;
private boolean dispatched;
public StandardStream(ISession session, SynStreamFrame frame)
public StandardStream(SynStreamFrame frame, ISession session, int windowSize)
{
this.session = session;
this.frame = frame;
this.windowSize = new AtomicInteger(session.getWindowSize());
this.session = session;
this.windowSize = new AtomicInteger(windowSize);
this.halfClosed = frame.isClose();
}

View File

@ -30,13 +30,13 @@ import java.util.concurrent.atomic.AtomicInteger;
* expected.</p>
* <p>Receivers of {@link DataInfo} via {@link StreamFrameListener#onData(Stream, DataInfo)}
* have two different APIs to read the data content bytes: a {@link #readInto(ByteBuffer) read}
* API that does not interact with flow control, and a {@link #drainInto(ByteBuffer) drain}
* API that does not interact with flow control, and a {@link #consumeInto(ByteBuffer) drain}
* API that interacts with flow control.</p>
* <p>Flow control is defined so that when the sender wants to sends a number of bytes larger
* than the {@link Settings.ID#INITIAL_WINDOW_SIZE} value, it will stop sending as soon as it
* has sent a number of bytes equal to the window size. The receiver has to <em>consume</em>
* the data bytes that it received in order to tell the sender to send more bytes.</p>
* <p>Consuming the data bytes can be done only via {@link #drainInto(ByteBuffer)} or by a combination
* <p>Consuming the data bytes can be done only via {@link #consumeInto(ByteBuffer)} or by a combination
* of {@link #readInto(ByteBuffer)} and {@link #consume(int)} (possibly at different times).</p>
*/
public abstract class DataInfo
@ -155,7 +155,7 @@ public abstract class DataInfo
* @param output the {@link ByteBuffer} to copy to bytes into
* @return the number of bytes copied
* @see #available()
* @see #drainInto(ByteBuffer)
* @see #consumeInto(ByteBuffer)
*/
public abstract int readInto(ByteBuffer output);
@ -166,7 +166,7 @@ public abstract class DataInfo
* @return the number of bytes copied
* @see #consume(int)
*/
public int drainInto(ByteBuffer output)
public int consumeInto(ByteBuffer output)
{
int read = readInto(output);
consume(read);
@ -184,8 +184,8 @@ public abstract class DataInfo
throw new IllegalArgumentException();
int read = length() - available();
int newConsumed = consumed() + delta;
if (newConsumed > read)
throw new IllegalStateException("Consuming without reading: consumed " + newConsumed + " but only read " + read);
// if (newConsumed > read)
// throw new IllegalStateException("Consuming without reading: consumed " + newConsumed + " but only read " + read);
consumed.addAndGet(delta);
}
@ -227,15 +227,20 @@ public abstract class DataInfo
*/
public ByteBuffer asByteBuffer(boolean consume)
{
ByteBuffer buffer = ByteBuffer.allocate(available());
ByteBuffer buffer = allocate(available());
if (consume)
drainInto(buffer);
consumeInto(buffer);
else
readInto(buffer);
buffer.flip();
return buffer;
}
protected ByteBuffer allocate(int size)
{
return ByteBuffer.allocate(size);
}
@Override
public String toString()
{

View File

@ -49,7 +49,9 @@ public interface StreamFrameListener extends EventListener
public void onHeaders(Stream stream, HeadersInfo headersInfo);
/**
* <p>Callback invoked when data are received on a stream.</p>
* <p>Callback invoked when data bytes are received on a stream.</p>
* <p>Implementers should be read or consume the content of the
* {@link DataInfo} before this method returns.</p>
*
* @param stream the stream
* @param dataInfo the data metadata

View File

@ -18,10 +18,23 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.spdy.frames.ControlFrame;
public abstract class ControlFrameGenerator
{
private final ByteBufferPool bufferPool;
protected ControlFrameGenerator(ByteBufferPool bufferPool)
{
this.bufferPool = bufferPool;
}
protected ByteBufferPool getByteBufferPool()
{
return bufferPool;
}
public abstract ByteBuffer generate(ControlFrame frame);
protected void generateControlFrameHeader(ControlFrame frame, int frameLength, ByteBuffer buffer)

View File

@ -18,18 +18,22 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.frames.DataFrame;
public class DataFrameGenerator
{
public ByteBuffer generate(int streamId, int windowSize, DataInfo dataInfo)
private final ByteBufferPool bufferPool;
public DataFrameGenerator(ByteBufferPool bufferPool)
{
// TODO: use buffer pool
int size = dataInfo.length();
if (size > windowSize)
size = windowSize;
ByteBuffer buffer = ByteBuffer.allocateDirect(DataFrame.HEADER_LENGTH + size);
this.bufferPool = bufferPool;
}
public ByteBuffer generate(int streamId, int length, DataInfo dataInfo)
{
ByteBuffer buffer = bufferPool.acquire(DataFrame.HEADER_LENGTH + length, true);
buffer.position(DataFrame.HEADER_LENGTH);
// Guaranteed to always be >= 0
int read = dataInfo.readInto(buffer);

View File

@ -19,6 +19,7 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import java.util.EnumMap;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.spdy.CompressionFactory;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.frames.ControlFrame;
@ -29,20 +30,20 @@ public class Generator
private final EnumMap<ControlFrameType, ControlFrameGenerator> generators = new EnumMap<>(ControlFrameType.class);
private final DataFrameGenerator dataFrameGenerator;
public Generator(CompressionFactory.Compressor compressor)
public Generator(ByteBufferPool bufferPool, CompressionFactory.Compressor compressor)
{
HeadersBlockGenerator headersBlockGenerator = new HeadersBlockGenerator(compressor);
generators.put(ControlFrameType.SYN_STREAM, new SynStreamGenerator(headersBlockGenerator));
generators.put(ControlFrameType.SYN_REPLY, new SynReplyGenerator(headersBlockGenerator));
generators.put(ControlFrameType.RST_STREAM, new RstStreamGenerator());
generators.put(ControlFrameType.SETTINGS, new SettingsGenerator());
generators.put(ControlFrameType.NOOP, new NoOpGenerator());
generators.put(ControlFrameType.PING, new PingGenerator());
generators.put(ControlFrameType.GO_AWAY, new GoAwayGenerator());
generators.put(ControlFrameType.HEADERS, new HeadersGenerator(headersBlockGenerator));
generators.put(ControlFrameType.WINDOW_UPDATE, new WindowUpdateGenerator());
generators.put(ControlFrameType.SYN_STREAM, new SynStreamGenerator(bufferPool, headersBlockGenerator));
generators.put(ControlFrameType.SYN_REPLY, new SynReplyGenerator(bufferPool, headersBlockGenerator));
generators.put(ControlFrameType.RST_STREAM, new RstStreamGenerator(bufferPool));
generators.put(ControlFrameType.SETTINGS, new SettingsGenerator(bufferPool));
generators.put(ControlFrameType.NOOP, new NoOpGenerator(bufferPool));
generators.put(ControlFrameType.PING, new PingGenerator(bufferPool));
generators.put(ControlFrameType.GO_AWAY, new GoAwayGenerator(bufferPool));
generators.put(ControlFrameType.HEADERS, new HeadersGenerator(bufferPool, headersBlockGenerator));
generators.put(ControlFrameType.WINDOW_UPDATE, new WindowUpdateGenerator(bufferPool));
dataFrameGenerator = new DataFrameGenerator();
dataFrameGenerator = new DataFrameGenerator(bufferPool);
}
public ByteBuffer control(ControlFrame frame)
@ -51,8 +52,8 @@ public class Generator
return generator.generate(frame);
}
public ByteBuffer data(int streamId, int windowSize, DataInfo dataInfo)
public ByteBuffer data(int streamId, int length, DataInfo dataInfo)
{
return dataFrameGenerator.generate(streamId, windowSize, dataInfo);
return dataFrameGenerator.generate(streamId, length, dataInfo);
}
}

View File

@ -18,12 +18,18 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.frames.ControlFrame;
import org.eclipse.jetty.spdy.frames.GoAwayFrame;
public class GoAwayGenerator extends ControlFrameGenerator
{
public GoAwayGenerator(ByteBufferPool bufferPool)
{
super(bufferPool);
}
@Override
public ByteBuffer generate(ControlFrame frame)
{
@ -31,7 +37,7 @@ public class GoAwayGenerator extends ControlFrameGenerator
int frameBodyLength = 8;
int totalLength = ControlFrame.HEADER_LENGTH + frameBodyLength;
ByteBuffer buffer = ByteBuffer.allocate(totalLength);
ByteBuffer buffer = getByteBufferPool().acquire(totalLength, true);
generateControlFrameHeader(goAway, frameBodyLength, buffer);
buffer.putInt(goAway.getLastStreamId() & 0x7F_FF_FF_FF);

View File

@ -18,6 +18,7 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.spdy.SessionException;
import org.eclipse.jetty.spdy.api.SessionStatus;
import org.eclipse.jetty.spdy.frames.ControlFrame;
@ -27,8 +28,9 @@ public class HeadersGenerator extends ControlFrameGenerator
{
private final HeadersBlockGenerator headersBlockGenerator;
public HeadersGenerator(HeadersBlockGenerator headersBlockGenerator)
public HeadersGenerator(ByteBufferPool bufferPool, HeadersBlockGenerator headersBlockGenerator)
{
super(bufferPool);
this.headersBlockGenerator = headersBlockGenerator;
}
@ -52,7 +54,7 @@ public class HeadersGenerator extends ControlFrameGenerator
int totalLength = ControlFrame.HEADER_LENGTH + frameLength;
ByteBuffer buffer = ByteBuffer.allocate(totalLength);
ByteBuffer buffer = getByteBufferPool().acquire(totalLength, true);
generateControlFrameHeader(headers, frameLength, buffer);
buffer.putInt(headers.getStreamId() & 0x7F_FF_FF_FF);

View File

@ -18,11 +18,17 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.spdy.frames.ControlFrame;
import org.eclipse.jetty.spdy.frames.NoOpFrame;
public class NoOpGenerator extends ControlFrameGenerator
{
public NoOpGenerator(ByteBufferPool bufferPool)
{
super(bufferPool);
}
@Override
public ByteBuffer generate(ControlFrame frame)
{
@ -30,7 +36,7 @@ public class NoOpGenerator extends ControlFrameGenerator
int frameBodyLength = 0;
int totalLength = ControlFrame.HEADER_LENGTH + frameBodyLength;
ByteBuffer buffer = ByteBuffer.allocate(totalLength);
ByteBuffer buffer = getByteBufferPool().acquire(totalLength, true);
generateControlFrameHeader(noOp, frameBodyLength, buffer);
buffer.flip();

View File

@ -18,11 +18,17 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.spdy.frames.ControlFrame;
import org.eclipse.jetty.spdy.frames.PingFrame;
public class PingGenerator extends ControlFrameGenerator
{
public PingGenerator(ByteBufferPool bufferPool)
{
super(bufferPool);
}
@Override
public ByteBuffer generate(ControlFrame frame)
{
@ -30,7 +36,7 @@ public class PingGenerator extends ControlFrameGenerator
int frameBodyLength = 4;
int totalLength = ControlFrame.HEADER_LENGTH + frameBodyLength;
ByteBuffer buffer = ByteBuffer.allocate(totalLength);
ByteBuffer buffer = getByteBufferPool().acquire(totalLength, true);
generateControlFrameHeader(ping, frameBodyLength, buffer);
buffer.putInt(ping.getPingId());

View File

@ -18,11 +18,17 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.spdy.frames.ControlFrame;
import org.eclipse.jetty.spdy.frames.RstStreamFrame;
public class RstStreamGenerator extends ControlFrameGenerator
{
public RstStreamGenerator(ByteBufferPool bufferPool)
{
super(bufferPool);
}
@Override
public ByteBuffer generate(ControlFrame frame)
{
@ -30,7 +36,7 @@ public class RstStreamGenerator extends ControlFrameGenerator
int frameBodyLength = 8;
int totalLength = ControlFrame.HEADER_LENGTH + frameBodyLength;
ByteBuffer buffer = ByteBuffer.allocate(totalLength);
ByteBuffer buffer = getByteBufferPool().acquire(totalLength, true);
generateControlFrameHeader(rstStream, frameBodyLength, buffer);
buffer.putInt(rstStream.getStreamId() & 0x7F_FF_FF_FF);

View File

@ -18,6 +18,7 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Settings;
import org.eclipse.jetty.spdy.frames.ControlFrame;
@ -25,6 +26,11 @@ import org.eclipse.jetty.spdy.frames.SettingsFrame;
public class SettingsGenerator extends ControlFrameGenerator
{
public SettingsGenerator(ByteBufferPool bufferPool)
{
super(bufferPool);
}
@Override
public ByteBuffer generate(ControlFrame frame)
{
@ -34,7 +40,7 @@ public class SettingsGenerator extends ControlFrameGenerator
int size = settings.size();
int frameBodyLength = 4 + 8 * size;
int totalLength = ControlFrame.HEADER_LENGTH + frameBodyLength;
ByteBuffer buffer = ByteBuffer.allocate(totalLength);
ByteBuffer buffer = getByteBufferPool().acquire(totalLength, true);
generateControlFrameHeader(settingsFrame, frameBodyLength, buffer);
buffer.putInt(size);

View File

@ -18,6 +18,7 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.spdy.SessionException;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.SessionStatus;
@ -28,8 +29,9 @@ public class SynReplyGenerator extends ControlFrameGenerator
{
private final HeadersBlockGenerator headersBlockGenerator;
public SynReplyGenerator(HeadersBlockGenerator headersBlockGenerator)
public SynReplyGenerator(ByteBufferPool bufferPool, HeadersBlockGenerator headersBlockGenerator)
{
super(bufferPool);
this.headersBlockGenerator = headersBlockGenerator;
}
@ -53,7 +55,7 @@ public class SynReplyGenerator extends ControlFrameGenerator
int totalLength = ControlFrame.HEADER_LENGTH + frameLength;
ByteBuffer buffer = ByteBuffer.allocate(totalLength);
ByteBuffer buffer = getByteBufferPool().acquire(totalLength, true);
generateControlFrameHeader(synReply, frameLength, buffer);
buffer.putInt(synReply.getStreamId() & 0x7F_FF_FF_FF);

View File

@ -18,6 +18,7 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.spdy.SessionException;
import org.eclipse.jetty.spdy.StreamException;
import org.eclipse.jetty.spdy.api.SPDY;
@ -30,8 +31,9 @@ public class SynStreamGenerator extends ControlFrameGenerator
{
private final HeadersBlockGenerator headersBlockGenerator;
public SynStreamGenerator(HeadersBlockGenerator headersBlockGenerator)
public SynStreamGenerator(ByteBufferPool bufferPool, HeadersBlockGenerator headersBlockGenerator)
{
super(bufferPool);
this.headersBlockGenerator = headersBlockGenerator;
}
@ -55,7 +57,7 @@ public class SynStreamGenerator extends ControlFrameGenerator
int totalLength = ControlFrame.HEADER_LENGTH + frameLength;
ByteBuffer buffer = ByteBuffer.allocate(totalLength);
ByteBuffer buffer = getByteBufferPool().acquire(totalLength, true);
generateControlFrameHeader(synStream, frameLength, buffer);
int streamId = synStream.getStreamId();

View File

@ -18,11 +18,17 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.spdy.frames.ControlFrame;
import org.eclipse.jetty.spdy.frames.WindowUpdateFrame;
public class WindowUpdateGenerator extends ControlFrameGenerator
{
public WindowUpdateGenerator(ByteBufferPool bufferPool)
{
super(bufferPool);
}
@Override
public ByteBuffer generate(ControlFrame frame)
{
@ -30,7 +36,7 @@ public class WindowUpdateGenerator extends ControlFrameGenerator
int frameBodyLength = 8;
int totalLength = ControlFrame.HEADER_LENGTH + frameBodyLength;
ByteBuffer buffer = ByteBuffer.allocate(totalLength);
ByteBuffer buffer = getByteBufferPool().acquire(totalLength, true);
generateControlFrameHeader(windowUpdate, frameBodyLength, buffer);
buffer.putInt(windowUpdate.getStreamId() & 0x7F_FF_FF_FF);

View File

@ -101,14 +101,14 @@ public abstract class DataFrameParser
length -= size;
if (length == 0)
{
onData(bytes);
onDataFrame(bytes);
return true;
}
else
{
// We got only part of the frame data bytes,
// so we generate a synthetic data frame
onSyntheticData(bytes);
onDataFragment(bytes);
}
break;
}
@ -121,14 +121,14 @@ public abstract class DataFrameParser
return false;
}
private void onData(ByteBuffer bytes)
private void onDataFrame(ByteBuffer bytes)
{
DataFrame frame = new DataFrame(streamId, flags, bytes.remaining());
onDataFrame(frame, bytes);
reset();
}
private void onSyntheticData(ByteBuffer bytes)
private void onDataFragment(ByteBuffer bytes)
{
DataFrame frame = new DataFrame(streamId, (byte)(flags & ~DataInfo.FLAG_CLOSE), bytes.remaining());
onDataFrame(frame, bytes);

View File

@ -43,10 +43,11 @@ public class AsyncTimeoutTest
final long timeout = 1000;
final TimeUnit unit = TimeUnit.MILLISECONDS;
ByteBufferPool bufferPool = new StandardByteBufferPool();
Executor threadPool = Executors.newCachedThreadPool();
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
Generator generator = new Generator(new StandardCompressionFactory.StandardCompressor());
Session session = new StandardSession(SPDY.V2, threadPool, scheduler, new TestController(), null, 1, null, generator)
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory.StandardCompressor());
Session session = new StandardSession(SPDY.V2, bufferPool, threadPool, scheduler, new TestController(), null, 1, null, generator)
{
@Override
public void flush()
@ -87,10 +88,11 @@ public class AsyncTimeoutTest
final long timeout = 1000;
final TimeUnit unit = TimeUnit.MILLISECONDS;
ByteBufferPool bufferPool = new StandardByteBufferPool();
Executor threadPool = Executors.newCachedThreadPool();
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
Generator generator = new Generator(new StandardCompressionFactory.StandardCompressor());
Session session = new StandardSession(SPDY.V2, threadPool, scheduler, new TestController(), null, 1, null, generator)
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory.StandardCompressor());
Session session = new StandardSession(SPDY.V2, bufferPool, threadPool, scheduler, new TestController(), null, 1, null, generator)
{
private final AtomicInteger flushes = new AtomicInteger();

View File

@ -29,7 +29,7 @@ public class ClientUsageTest
@Test
public void testClientRequestResponseNoBody() throws Exception
{
Session session = new StandardSession(SPDY.V2, null, null, null, null, 1, null, null);
Session session = new StandardSession(SPDY.V2, null, null, null, null, null, 1, null, null);
session.syn(new SynInfo(true), new StreamFrameListener.Adapter()
{
@ -48,7 +48,7 @@ public class ClientUsageTest
@Test
public void testClientRequestWithBodyResponseNoBody() throws Exception
{
Session session = new StandardSession(SPDY.V2, null, null, null, null, 1, null, null);
Session session = new StandardSession(SPDY.V2, null, null, null, null, null, 1, null, null);
Stream stream = session.syn(new SynInfo(false), new StreamFrameListener.Adapter()
{
@ -69,7 +69,7 @@ public class ClientUsageTest
@Test
public void testAsyncClientRequestWithBodyResponseNoBody() throws Exception
{
Session session = new StandardSession(SPDY.V2, null, null, null, null, 1, null, null);
Session session = new StandardSession(SPDY.V2, null, null, null, null, null, 1, null, null);
final String context = "context";
session.syn(new SynInfo(false), new StreamFrameListener.Adapter()
@ -104,7 +104,7 @@ public class ClientUsageTest
@Test
public void testAsyncClientRequestWithBodyAndResponseWithBody() throws Exception
{
Session session = new StandardSession(SPDY.V2, null, null, null, null, 1, null, null);
Session session = new StandardSession(SPDY.V2, null, null, null, null, null, 1, null, null);
session.syn(new SynInfo(false), new StreamFrameListener.Adapter()
{

View File

@ -18,6 +18,7 @@ package org.eclipse.jetty.spdy.frames;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.StringDataInfo;
@ -45,7 +46,7 @@ public class DataGenerateParseTest
int length = content.length();
DataInfo data = new StringDataInfo(content, true);
int streamId = 13;
Generator generator = new Generator(new StandardCompressionFactory().newCompressor());
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor());
ByteBuffer buffer = generator.data(streamId, 2 * length, data);
Assert.assertNotNull(buffer);
@ -70,7 +71,7 @@ public class DataGenerateParseTest
int length = content.length();
DataInfo data = new StringDataInfo(content, true);
int streamId = 13;
Generator generator = new Generator(new StandardCompressionFactory().newCompressor());
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor());
ByteBuffer buffer = generator.data(streamId, 2 * length, data);
Assert.assertNotNull(buffer);
@ -100,7 +101,7 @@ public class DataGenerateParseTest
int length = content.length();
DataInfo data = new StringDataInfo(content, true);
int streamId = 13;
Generator generator = new Generator(new StandardCompressionFactory().newCompressor());
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor());
ByteBuffer buffer = generator.data(streamId, 2 * length, data);
Assert.assertNotNull(buffer);

View File

@ -18,6 +18,7 @@ package org.eclipse.jetty.spdy.frames;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.generator.Generator;
@ -33,7 +34,7 @@ public class GoAwayGenerateParseTest
int lastStreamId = 13;
int statusCode = 1;
GoAwayFrame frame1 = new GoAwayFrame(SPDY.V3, lastStreamId, statusCode);
Generator generator = new Generator(new StandardCompressionFactory().newCompressor());
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor());
ByteBuffer buffer = generator.control(frame1);
Assert.assertNotNull(buffer);
@ -59,7 +60,7 @@ public class GoAwayGenerateParseTest
int lastStreamId = 13;
int statusCode = 1;
GoAwayFrame frame1 = new GoAwayFrame(SPDY.V3, lastStreamId, statusCode);
Generator generator = new Generator(new StandardCompressionFactory().newCompressor());
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor());
ByteBuffer buffer = generator.control(frame1);
Assert.assertNotNull(buffer);

View File

@ -18,6 +18,7 @@ package org.eclipse.jetty.spdy.frames;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.HeadersInfo;
@ -37,7 +38,7 @@ public class HeadersGenerateParseTest
Headers headers = new Headers();
headers.put("a", "b");
HeadersFrame frame1 = new HeadersFrame(SPDY.V2, flags, streamId, headers);
Generator generator = new Generator(new StandardCompressionFactory().newCompressor());
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor());
ByteBuffer buffer = generator.control(frame1);
Assert.assertNotNull(buffer);
@ -65,7 +66,7 @@ public class HeadersGenerateParseTest
Headers headers = new Headers();
headers.put("a", "b");
HeadersFrame frame1 = new HeadersFrame(SPDY.V2, flags, streamId, headers);
Generator generator = new Generator(new StandardCompressionFactory().newCompressor());
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor());
ByteBuffer buffer = generator.control(frame1);
Assert.assertNotNull(buffer);

View File

@ -18,6 +18,7 @@ package org.eclipse.jetty.spdy.frames;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.generator.Generator;
import org.eclipse.jetty.spdy.parser.Parser;
@ -30,7 +31,7 @@ public class NoOpGenerateParseTest
public void testGenerateParse() throws Exception
{
NoOpFrame frame1 = new NoOpFrame();
Generator generator = new Generator(new StandardCompressionFactory().newCompressor());
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor());
ByteBuffer buffer = generator.control(frame1);
Assert.assertNotNull(buffer);
@ -51,7 +52,7 @@ public class NoOpGenerateParseTest
public void testGenerateParseOneByteAtATime() throws Exception
{
NoOpFrame frame1 = new NoOpFrame();
Generator generator = new Generator(new StandardCompressionFactory().newCompressor());
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor());
ByteBuffer buffer = generator.control(frame1);
Assert.assertNotNull(buffer);

View File

@ -18,6 +18,7 @@ package org.eclipse.jetty.spdy.frames;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.generator.Generator;
@ -32,7 +33,7 @@ public class PingGenerateParseTest
{
int pingId = 13;
PingFrame frame1 = new PingFrame(SPDY.V2, pingId);
Generator generator = new Generator(new StandardCompressionFactory().newCompressor());
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor());
ByteBuffer buffer = generator.control(frame1);
Assert.assertNotNull(buffer);
@ -56,7 +57,7 @@ public class PingGenerateParseTest
{
int pingId = 13;
PingFrame frame1 = new PingFrame(SPDY.V2, pingId);
Generator generator = new Generator(new StandardCompressionFactory().newCompressor());
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor());
ByteBuffer buffer = generator.control(frame1);
Assert.assertNotNull(buffer);

View File

@ -18,6 +18,7 @@ package org.eclipse.jetty.spdy.frames;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.StreamStatus;
@ -34,7 +35,7 @@ public class RstStreamGenerateParseTest
int streamId = 13;
int streamStatus = StreamStatus.UNSUPPORTED_VERSION.getCode(SPDY.V2);
RstStreamFrame frame1 = new RstStreamFrame(SPDY.V2, streamId, streamStatus);
Generator generator = new Generator(new StandardCompressionFactory().newCompressor());
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor());
ByteBuffer buffer = generator.control(frame1);
Assert.assertNotNull(buffer);
@ -60,7 +61,7 @@ public class RstStreamGenerateParseTest
int streamId = 13;
int streamStatus = StreamStatus.UNSUPPORTED_VERSION.getCode(SPDY.V2);
RstStreamFrame frame1 = new RstStreamFrame(SPDY.V2, streamId, streamStatus);
Generator generator = new Generator(new StandardCompressionFactory().newCompressor());
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor());
ByteBuffer buffer = generator.control(frame1);
Assert.assertNotNull(buffer);

View File

@ -18,6 +18,7 @@ package org.eclipse.jetty.spdy.frames;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Settings;
@ -37,7 +38,7 @@ public class SettingsGenerateParseTest
settings.put(new Settings.Setting(Settings.ID.MAX_CONCURRENT_STREAMS, Settings.Flag.PERSIST, 100));
settings.put(new Settings.Setting(Settings.ID.ROUND_TRIP_TIME, Settings.Flag.PERSISTED, 500));
SettingsFrame frame1 = new SettingsFrame(SPDY.V2, flags, settings);
Generator generator = new Generator(new StandardCompressionFactory().newCompressor());
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor());
ByteBuffer buffer = generator.control(frame1);
Assert.assertNotNull(buffer);
@ -64,7 +65,7 @@ public class SettingsGenerateParseTest
settings.put(new Settings.Setting(Settings.ID.DOWNLOAD_RETRANSMISSION_RATE, 100));
settings.put(new Settings.Setting(Settings.ID.ROUND_TRIP_TIME, 500));
SettingsFrame frame1 = new SettingsFrame(SPDY.V2, flags, settings);
Generator generator = new Generator(new StandardCompressionFactory().newCompressor());
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor());
ByteBuffer buffer = generator.control(frame1);
Assert.assertNotNull(buffer);

View File

@ -18,6 +18,7 @@ package org.eclipse.jetty.spdy.frames;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.ReplyInfo;
@ -37,7 +38,7 @@ public class SynReplyGenerateParseTest
Headers headers = new Headers();
headers.put("a", "b");
SynReplyFrame frame1 = new SynReplyFrame(SPDY.V2, flags, streamId, headers);
Generator generator = new Generator(new StandardCompressionFactory().newCompressor());
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor());
ByteBuffer buffer = generator.control(frame1);
Assert.assertNotNull(buffer);
@ -65,7 +66,7 @@ public class SynReplyGenerateParseTest
Headers headers = new Headers();
headers.put("a", "b");
SynReplyFrame frame1 = new SynReplyFrame(SPDY.V2, flags, streamId, headers);
Generator generator = new Generator(new StandardCompressionFactory().newCompressor());
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor());
ByteBuffer buffer = generator.control(frame1);
Assert.assertNotNull(buffer);

View File

@ -18,6 +18,7 @@ package org.eclipse.jetty.spdy.frames;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.SPDY;
@ -40,7 +41,7 @@ public class SynStreamGenerateParseTest
headers.put("a", "b");
headers.put("c", "d");
SynStreamFrame frame1 = new SynStreamFrame(SPDY.V2, flags, streamId, associatedStreamId, priority, headers);
Generator generator = new Generator(new StandardCompressionFactory().newCompressor());
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor());
ByteBuffer buffer = generator.control(frame1);
Assert.assertNotNull(buffer);
@ -73,7 +74,7 @@ public class SynStreamGenerateParseTest
headers.put("a", "b");
headers.put("c", "d");
SynStreamFrame frame1 = new SynStreamFrame(SPDY.V2, flags, streamId, associatedStreamId, priority, headers);
Generator generator = new Generator(new StandardCompressionFactory().newCompressor());
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor());
ByteBuffer buffer = generator.control(frame1);
Assert.assertNotNull(buffer);

View File

@ -18,6 +18,7 @@ package org.eclipse.jetty.spdy.frames;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.generator.Generator;
@ -33,7 +34,7 @@ public class WindowUpdateGenerateParseTest
int streamId = 13;
int windowDelta = 17;
WindowUpdateFrame frame1 = new WindowUpdateFrame(SPDY.V2, streamId, windowDelta);
Generator generator = new Generator(new StandardCompressionFactory().newCompressor());
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor());
ByteBuffer buffer = generator.control(frame1);
Assert.assertNotNull(buffer);
@ -59,7 +60,7 @@ public class WindowUpdateGenerateParseTest
int streamId = 13;
int windowDelta = 17;
WindowUpdateFrame frame1 = new WindowUpdateFrame(SPDY.V2, streamId, windowDelta);
Generator generator = new Generator(new StandardCompressionFactory().newCompressor());
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor());
ByteBuffer buffer = generator.control(frame1);
Assert.assertNotNull(buffer);

View File

@ -42,7 +42,7 @@ public class HTTPSPDYServerConnector extends SPDYServerConnector
{
super.doStart();
// Override the "spdy/2" protocol by handling HTTP over SPDY
putAsyncConnectionFactory("spdy/2", new ServerHTTPSPDYAsyncConnectionFactory(SPDY.V2, getExecutor(), getScheduler(), this));
putAsyncConnectionFactory("spdy/2", new ServerHTTPSPDYAsyncConnectionFactory(SPDY.V2, getByteBufferPool(), getExecutor(), getScheduler(), this));
// Add the "http/1.1" protocol for browsers that do not support NPN
putAsyncConnectionFactory("http/1.1", new ServerHTTPAsyncConnectionFactory(this));
}

View File

@ -56,6 +56,7 @@ import org.slf4j.LoggerFactory;
public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implements AsyncConnection
{
private static final Logger logger = LoggerFactory.getLogger(ServerHTTPSPDYAsyncConnection.class);
private static final ByteBuffer ZERO_BYTES = ByteBuffer.allocate(0);
private final Queue<Runnable> tasks = new LinkedList<>();
private final BlockingQueue<DataInfo> dataInfos = new LinkedBlockingQueue<>();
@ -291,7 +292,15 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
public void content(final DataInfo dataInfo, boolean endRequest)
{
dataInfos.offer(dataInfo);
dataInfos.offer(new ByteBufferDataInfo(dataInfo.asByteBuffer(false), dataInfo.isClose(), dataInfo.isCompress())
{
@Override
public void consume(int delta)
{
super.consume(delta);
dataInfo.consume(delta);
}
});
complete = endRequest;
post(new Runnable()
{
@ -644,11 +653,10 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
}
else if (!closed)
{
ByteBuffer buffer = ByteBuffer.allocate(0);
closed = true;
_state = STATE_END;
// Send the data frame
stream.data(new ByteBufferDataInfo(buffer, true));
stream.data(new ByteBufferDataInfo(ZERO_BYTES, true));
}
}
}

View File

@ -21,6 +21,7 @@ import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.spdy.EmptyAsyncEndPoint;
import org.eclipse.jetty.spdy.SPDYAsyncConnection;
import org.eclipse.jetty.spdy.ServerSPDYAsyncConnectionFactory;
@ -42,9 +43,9 @@ public class ServerHTTPSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnect
private final Connector connector;
public ServerHTTPSPDYAsyncConnectionFactory(short version, Executor threadPool, ScheduledExecutorService scheduler, Connector connector)
public ServerHTTPSPDYAsyncConnectionFactory(short version, ByteBufferPool bufferPool, Executor threadPool, ScheduledExecutorService scheduler, Connector connector)
{
super(version, threadPool, scheduler);
super(version, bufferPool, threadPool, scheduler);
this.connector = connector;
}

View File

@ -72,7 +72,7 @@ public abstract class AbstractHTTPSPDYTest
@Override
protected AsyncConnectionFactory getDefaultAsyncConnectionFactory()
{
return new ServerHTTPSPDYAsyncConnectionFactory(SPDY.V2, getExecutor(), getScheduler(), this);
return new ServerHTTPSPDYAsyncConnectionFactory(SPDY.V2, getByteBufferPool(), getExecutor(), getScheduler(), this);
}
};
}

View File

@ -518,7 +518,8 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
contentBytes.addAndGet(dataInfo.drainInto(ByteBuffer.allocate(dataInfo.available())));
contentBytes.addAndGet(dataInfo.asByteBuffer(true).remaining());
if (dataInfo.isClose())
{
Assert.assertEquals(data.length, contentBytes.get());
@ -575,7 +576,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
contentBytes.addAndGet(dataInfo.drainInto(ByteBuffer.allocate(dataInfo.available())));
contentBytes.addAndGet(dataInfo.asByteBuffer(true).remaining());
if (dataInfo.isClose())
{
Assert.assertEquals(2 * data.length, contentBytes.get());

View File

@ -56,7 +56,6 @@ public class EmptyAsyncConnection extends AbstractConnection implements AsyncCon
@Override
public void onClose()
{
// TODO
}
@Override

View File

@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
public class SPDYAsyncConnection extends AbstractConnection implements AsyncConnection, Controller<StandardSession.FrameBytes>, IdleListener
{
private static final Logger logger = LoggerFactory.getLogger(SPDYAsyncConnection.class);
private final ByteBufferPool bufferPool;
private final Parser parser;
private volatile Session session;
private ByteBuffer writeBuffer;
@ -43,9 +44,10 @@ public class SPDYAsyncConnection extends AbstractConnection implements AsyncConn
private StandardSession.FrameBytes writeContext;
private volatile boolean writePending;
public SPDYAsyncConnection(AsyncEndPoint endPoint, Parser parser)
public SPDYAsyncConnection(AsyncEndPoint endPoint, ByteBufferPool bufferPool, Parser parser)
{
super(endPoint);
this.bufferPool = bufferPool;
this.parser = parser;
onIdle(true);
}
@ -78,18 +80,21 @@ public class SPDYAsyncConnection extends AbstractConnection implements AsyncConn
public int fill() throws IOException
{
// TODO: use buffer pool
NIOBuffer jettyBuffer = new DirectNIOBuffer(4096);
ByteBuffer buffer = bufferPool.acquire(8192, true);
NIOBuffer jettyBuffer = new DirectNIOBuffer(buffer, false);
jettyBuffer.setPutIndex(jettyBuffer.getIndex());
AsyncEndPoint endPoint = getEndPoint();
int filled = endPoint.fill(jettyBuffer);
logger.debug("Filled {} from {}", filled, endPoint);
if (filled <= 0)
return filled;
ByteBuffer buffer = jettyBuffer.getByteBuffer();
buffer.limit(jettyBuffer.putIndex());
buffer.position(jettyBuffer.getIndex());
parser.parse(buffer);
bufferPool.release(buffer);
return filled;
}

View File

@ -176,6 +176,7 @@ public class SPDYClient
{
private final Map<String, AsyncConnectionFactory> factories = new ConcurrentHashMap<>();
private final Queue<Session> sessions = new ConcurrentLinkedQueue<>();
private final ByteBufferPool bufferPool = new StandardByteBufferPool();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Executor threadPool;
private final SslContextFactory sslContextFactory;
@ -412,17 +413,16 @@ public class SPDYClient
public AsyncConnection newAsyncConnection(SocketChannel channel, AsyncEndPoint endPoint, Object attachment)
{
SessionPromise sessionPromise = (SessionPromise)attachment;
Factory factory = sessionPromise.client.factory;
CompressionFactory compressionFactory = new StandardCompressionFactory();
Parser parser = new Parser(compressionFactory.newDecompressor());
Generator generator = new Generator(compressionFactory.newCompressor());
Generator generator = new Generator(factory.bufferPool, compressionFactory.newCompressor());
Factory factory = sessionPromise.client.factory;
SPDYAsyncConnection connection = new ClientSPDYAsyncConnection(endPoint, parser, factory);
SPDYAsyncConnection connection = new ClientSPDYAsyncConnection(endPoint, factory.bufferPool, parser, factory);
endPoint.setConnection(connection);
StandardSession session = new StandardSession(sessionPromise.client.version, factory.threadPool, factory.scheduler, connection, connection, 1, sessionPromise.listener, generator);
StandardSession session = new StandardSession(sessionPromise.client.version, factory.bufferPool, factory.threadPool, factory.scheduler, connection, connection, 1, sessionPromise.listener, generator);
parser.addListener(session);
sessionPromise.completed(session);
connection.setSession(session);
@ -436,9 +436,9 @@ public class SPDYClient
{
private final Factory factory;
public ClientSPDYAsyncConnection(AsyncEndPoint endPoint, Parser parser, Factory factory)
public ClientSPDYAsyncConnection(AsyncEndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Factory factory)
{
super(endPoint, parser);
super(endPoint, bufferPool, parser);
this.factory = factory;
}

View File

@ -48,6 +48,7 @@ public class SPDYServerConnector extends SelectChannelConnector
// Order is important on server side, so we use a LinkedHashMap
private final Map<String, AsyncConnectionFactory> factories = new LinkedHashMap<>();
private final Queue<Session> sessions = new ConcurrentLinkedQueue<>();
private final ByteBufferPool bufferPool = new StandardByteBufferPool();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final ServerSessionFrameListener listener;
private final SslContextFactory sslContextFactory;
@ -66,6 +67,11 @@ public class SPDYServerConnector extends SelectChannelConnector
addBean(sslContextFactory);
}
public ByteBufferPool getByteBufferPool()
{
return bufferPool;
}
protected Executor getExecutor()
{
final ThreadPool threadPool = getThreadPool();
@ -90,7 +96,7 @@ public class SPDYServerConnector extends SelectChannelConnector
protected void doStart() throws Exception
{
super.doStart();
defaultConnectionFactory = new ServerSPDYAsyncConnectionFactory(SPDY.V2, getExecutor(), scheduler, listener);
defaultConnectionFactory = new ServerSPDYAsyncConnectionFactory(SPDY.V2, getByteBufferPool(), getExecutor(), scheduler, listener);
putAsyncConnectionFactory("spdy/2", defaultConnectionFactory);
}

View File

@ -30,19 +30,21 @@ import org.eclipse.jetty.spdy.parser.Parser;
public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory
{
private final ByteBufferPool bufferPool;
private final Executor threadPool;
private final ScheduledExecutorService scheduler;
private final short version;
private final ServerSessionFrameListener listener;
public ServerSPDYAsyncConnectionFactory(short version, Executor threadPool, ScheduledExecutorService scheduler)
public ServerSPDYAsyncConnectionFactory(short version, ByteBufferPool bufferPool, Executor threadPool, ScheduledExecutorService scheduler)
{
this(version, threadPool, scheduler, null);
this(version, bufferPool, threadPool, scheduler, null);
}
public ServerSPDYAsyncConnectionFactory(short version, Executor threadPool, ScheduledExecutorService scheduler, ServerSessionFrameListener listener)
public ServerSPDYAsyncConnectionFactory(short version, ByteBufferPool bufferPool, Executor threadPool, ScheduledExecutorService scheduler, ServerSessionFrameListener listener)
{
this.version = version;
this.bufferPool = bufferPool;
this.threadPool = threadPool;
this.scheduler = scheduler;
this.listener = listener;
@ -53,7 +55,7 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory
{
CompressionFactory compressionFactory = new StandardCompressionFactory();
Parser parser = new Parser(compressionFactory.newDecompressor());
Generator generator = new Generator(compressionFactory.newCompressor());
Generator generator = new Generator(bufferPool, compressionFactory.newCompressor());
ServerSessionFrameListener listener = this.listener;
if (listener == null)
@ -61,10 +63,10 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory
SPDYServerConnector connector = (SPDYServerConnector)attachment;
SPDYAsyncConnection connection = new ServerSPDYAsyncConnection(endPoint, parser, listener, connector);
SPDYAsyncConnection connection = new ServerSPDYAsyncConnection(endPoint, bufferPool, parser, listener, connector);
endPoint.setConnection(connection);
final StandardSession session = new StandardSession(version, threadPool, scheduler, connection, connection, 2, listener, generator);
final StandardSession session = new StandardSession(version, bufferPool, threadPool, scheduler, connection, connection, 2, listener, generator);
parser.addListener(session);
connection.setSession(session);
@ -84,9 +86,9 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory
private final SPDYServerConnector connector;
private volatile boolean connected;
private ServerSPDYAsyncConnection(AsyncEndPoint endPoint, Parser parser, ServerSessionFrameListener listener, SPDYServerConnector connector)
private ServerSPDYAsyncConnection(AsyncEndPoint endPoint, ByteBufferPool bufferPool, Parser parser, ServerSessionFrameListener listener, SPDYServerConnector connector)
{
super(endPoint, parser);
super(endPoint, bufferPool, parser);
this.listener = listener;
this.connector = connector;
}

View File

@ -1,103 +0,0 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.SPDYException;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
public class ConcurrentTest extends AbstractTest
{
// TODO: fix the test
@Ignore
@Test
public void testConcurrentSyn() throws Exception
{
final CountDownLatch slowServerLatch = new CountDownLatch(1);
final CountDownLatch fastServerLatch = new CountDownLatch(1);
Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
try
{
Headers headers = synInfo.getHeaders();
String url = headers.get("url").value();
switch (url)
{
case "/slow":
Assert.assertTrue(fastServerLatch.await(10, TimeUnit.SECONDS));
slowServerLatch.countDown();
break;
case "/fast":
fastServerLatch.countDown();
break;
default:
Assert.fail();
}
stream.reply(new ReplyInfo(true));
return null;
}
catch (InterruptedException x)
{
throw new SPDYException(x);
}
}
}), null);
final CountDownLatch slowClientLatch = new CountDownLatch(1);
Headers headers1 = new Headers();
headers1.put("url", "/slow");
session.syn(new SynInfo(headers1, true), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
slowClientLatch.countDown();
}
});
final CountDownLatch fastClientLatch = new CountDownLatch(1);
Headers headers2 = new Headers();
headers2.put("url", "/fast");
session.syn(new SynInfo(headers2, true), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
fastClientLatch.countDown();
}
});
Assert.assertTrue(fastServerLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(fastClientLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(slowServerLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(slowClientLatch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -90,19 +90,19 @@ public class FlowControlTest extends AbstractTest
else if (dataFrames == 2)
{
// Read but not consume, we should be flow-control stalled
dataInfo.readInto(ByteBuffer.allocate(dataInfo.length()));
dataInfo.asByteBuffer(false);
exchanger.exchange(dataInfo);
}
else if (dataFrames == 3)
{
// Consume partially, we should be flow-control stalled
dataInfo.drainInto(ByteBuffer.allocate(dataInfo.length() / 2));
dataInfo.consumeInto(ByteBuffer.allocate(dataInfo.length() / 2));
exchanger.exchange(dataInfo);
}
else if (dataFrames == 4 || dataFrames == 5)
{
// Consume totally
dataInfo.drainInto(ByteBuffer.allocate(dataInfo.length()));
dataInfo.asByteBuffer(true);
exchanger.exchange(dataInfo);
}
else
@ -129,7 +129,7 @@ public class FlowControlTest extends AbstractTest
});
Assert.assertEquals(windowSize, dataInfo.available());
Assert.assertEquals(0, dataInfo.consumed());
dataInfo.drainInto(ByteBuffer.allocate(dataInfo.available()));
dataInfo.asByteBuffer(true);
dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
// Check that we are flow control stalled
@ -156,7 +156,7 @@ public class FlowControlTest extends AbstractTest
}
});
Assert.assertEquals(dataInfo.length() / 2, dataInfo.consumed());
dataInfo.drainInto(ByteBuffer.allocate(dataInfo.available()));
dataInfo.asByteBuffer(true);
dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
Assert.assertEquals(dataInfo.length(), dataInfo.consumed());
@ -204,19 +204,19 @@ public class FlowControlTest extends AbstractTest
else if (dataFrames == 2)
{
// Read but not consume, we should be flow-control stalled
dataInfo.readInto(ByteBuffer.allocate(dataInfo.length()));
dataInfo.asByteBuffer(false);
exchanger.exchange(dataInfo);
}
else if (dataFrames == 3)
{
// Consume partially, we should be flow-control stalled
dataInfo.drainInto(ByteBuffer.allocate(dataInfo.length() / 2));
dataInfo.consumeInto(ByteBuffer.allocate(dataInfo.length() / 2));
exchanger.exchange(dataInfo);
}
else if (dataFrames == 4 || dataFrames == 5)
{
// Consume totally
dataInfo.drainInto(ByteBuffer.allocate(dataInfo.length()));
dataInfo.asByteBuffer(true);
exchanger.exchange(dataInfo);
}
else
@ -258,7 +258,7 @@ public class FlowControlTest extends AbstractTest
});
Assert.assertEquals(windowSize, dataInfo.available());
Assert.assertEquals(0, dataInfo.consumed());
dataInfo.drainInto(ByteBuffer.allocate(dataInfo.available()));
dataInfo.asByteBuffer(true);
dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
// Check that we are flow control stalled
@ -285,7 +285,7 @@ public class FlowControlTest extends AbstractTest
}
});
Assert.assertEquals(dataInfo.length() / 2, dataInfo.consumed());
dataInfo.drainInto(ByteBuffer.allocate(dataInfo.available()));
dataInfo.asByteBuffer(true);
dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
Assert.assertEquals(dataInfo.length(), dataInfo.consumed());

View File

@ -307,9 +307,7 @@ public class SynReplyTest extends AbstractTest
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
ByteBuffer buffer = ByteBuffer.allocate(dataInfo.available());
dataInfo.readInto(buffer);
buffer.flip();
ByteBuffer buffer = dataInfo.asByteBuffer(false);
String data = Charset.forName("UTF-8").decode(buffer).toString();
Assert.assertEquals(serverData, data);
serverDataLatch.countDown();