DataInfo javadocs + modified slightly the API for helper methods, passing a boolean telling to read or consume the bytes.

This commit is contained in:
Simone Bordet 2012-03-01 11:47:51 +01:00
parent f9345bdc84
commit f4e4effe50
5 changed files with 58 additions and 27 deletions

View File

@ -28,6 +28,16 @@ import java.util.concurrent.atomic.AtomicInteger;
* type, via {@link Stream#data(DataInfo)}. The last instance must have the * type, via {@link Stream#data(DataInfo)}. The last instance must have the
* {@link #isClose() close flag} set, so that the client knows that no more content is * {@link #isClose() close flag} set, so that the client knows that no more content is
* expected.</p> * expected.</p>
* <p>Receivers of {@link DataInfo} via {@link StreamFrameListener#onData(Stream, DataInfo)}
* have two different APIs to read the data content bytes: a {@link #readInto(ByteBuffer) read}
* API that does not interact with flow control, and a {@link #drainInto(ByteBuffer) drain}
* API that interacts with flow control.</p>
* <p>Flow control is defined so that when the sender wants to sends a number of bytes larger
* than the {@link Settings.ID#INITIAL_WINDOW_SIZE} value, it will stop sending as soon as it
* has sent a number of bytes equal to the window size. The receiver has to <em>consume</em>
* the data bytes that it received in order to tell the sender to send more bytes.</p>
* <p>Consuming the data bytes can be done only via {@link #drainInto(ByteBuffer)} or by a combination
* of {@link #readInto(ByteBuffer)} and {@link #consume(int)} (possibly at different times).</p>
*/ */
public abstract class DataInfo public abstract class DataInfo
{ {
@ -139,15 +149,23 @@ public abstract class DataInfo
/** /**
* <p>Copies the content bytes of this {@link DataInfo} into the given {@link ByteBuffer}.</p> * <p>Copies the content bytes of this {@link DataInfo} into the given {@link ByteBuffer}.</p>
* <p>If the given {@link ByteBuffer} cannot contain the whole content of this {@link DataInfo} * <p>If the given {@link ByteBuffer} cannot contain the whole content of this {@link DataInfo}
* then {@link #available()} will return a positive value, and further content * then after the read {@link #available()} will return a positive value, and further content
* may be retrieved by invoking again this method.</p> * may be retrieved by invoking again this method with a new output buffer.</p>
* *
* @param output the {@link ByteBuffer} to copy to bytes into * @param output the {@link ByteBuffer} to copy to bytes into
* @return the number of bytes copied * @return the number of bytes copied
* @see #available() * @see #available()
* @see #drainInto(ByteBuffer)
*/ */
public abstract int readInto(ByteBuffer output); public abstract int readInto(ByteBuffer output);
/**
* <p>Reads and consumes the content bytes of this {@link DataInfo} into the given {@link ByteBuffer}.</p>
*
* @param output the {@link ByteBuffer} to copy to bytes into
* @return the number of bytes copied
* @see #consume(int)
*/
public int drainInto(ByteBuffer output) public int drainInto(ByteBuffer output)
{ {
int read = readInto(output); int read = readInto(output);
@ -155,8 +173,15 @@ public abstract class DataInfo
return read; return read;
} }
/**
* <p>Consumes the given number of bytes from this {@link DataInfo}.</p>
*
* @param delta the number of bytes consumed
*/
public void consume(int delta) public void consume(int delta)
{ {
if (delta < 0)
throw new IllegalArgumentException();
int read = length() - available(); int read = length() - available();
int newConsumed = consumed() + delta; int newConsumed = consumed() + delta;
if (newConsumed > read) if (newConsumed > read)
@ -164,31 +189,33 @@ public abstract class DataInfo
consumed.addAndGet(delta); consumed.addAndGet(delta);
} }
/**
* @return the number of bytes consumed
*/
public int consumed() public int consumed()
{ {
return consumed.get(); return consumed.get();
} }
/** /**
*
* @param charset the charset used to convert the bytes * @param charset the charset used to convert the bytes
* @param consume whether to consume the content
* @return a String with the content of this {@link DataInfo} * @return a String with the content of this {@link DataInfo}
*/ */
public String asString(String charset) public String asString(String charset, boolean consume)
{ {
ByteBuffer buffer = ByteBuffer.allocate(available()); ByteBuffer buffer = asByteBuffer(consume);
readInto(buffer);
buffer.flip();
return Charset.forName(charset).decode(buffer).toString(); return Charset.forName(charset).decode(buffer).toString();
} }
/** /**
* @return a byte array with the content of this {@link DataInfo} * @return a byte array with the content of this {@link DataInfo}
* @param consume whether to consume the content
*/ */
public byte[] asBytes() public byte[] asBytes(boolean consume)
{ {
ByteBuffer buffer = ByteBuffer.allocate(available()); ByteBuffer buffer = asByteBuffer(consume);
readInto(buffer);
buffer.flip();
byte[] result = new byte[buffer.remaining()]; byte[] result = new byte[buffer.remaining()];
buffer.get(result); buffer.get(result);
return result; return result;
@ -196,11 +223,15 @@ public abstract class DataInfo
/** /**
* @return a {@link ByteBuffer} with the content of this {@link DataInfo} * @return a {@link ByteBuffer} with the content of this {@link DataInfo}
* @param consume whether to consume the content
*/ */
public ByteBuffer asByteBuffer() public ByteBuffer asByteBuffer(boolean consume)
{ {
ByteBuffer buffer = ByteBuffer.allocate(available()); ByteBuffer buffer = ByteBuffer.allocate(available());
readInto(buffer); if (consume)
drainInto(buffer);
else
readInto(buffer);
buffer.flip(); buffer.flip();
return buffer; return buffer;
} }

View File

@ -130,7 +130,7 @@ public class ClientUsageTest
public void onData(Stream stream, DataInfo dataInfo) public void onData(Stream stream, DataInfo dataInfo)
{ {
StringBuilder builder = (StringBuilder)stream.getAttribute("builder"); StringBuilder builder = (StringBuilder)stream.getAttribute("builder");
builder.append(dataInfo.asString("UTF-8")); builder.append(dataInfo.asString("UTF-8", true));
if (dataInfo.isClose()) if (dataInfo.isClose())
{ {
int receivedLength = builder.toString().getBytes(Charset.forName("UTF-8")).length; int receivedLength = builder.toString().getBytes(Charset.forName("UTF-8")).length;

View File

@ -159,7 +159,7 @@ public class ServerHTTPSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnect
logger.debug("Received {} on {}", dataInfo, stream); logger.debug("Received {} on {}", dataInfo, stream);
final ServerHTTPSPDYAsyncConnection connection = (ServerHTTPSPDYAsyncConnection)stream.getAttribute("connection"); final ServerHTTPSPDYAsyncConnection connection = (ServerHTTPSPDYAsyncConnection)stream.getAttribute("connection");
final ByteBuffer buffer = dataInfo.asByteBuffer(); final ByteBuffer buffer = dataInfo.asByteBuffer(true);
final boolean isClose = dataInfo.isClose(); final boolean isClose = dataInfo.isClose();
connection.post(new Runnable() connection.post(new Runnable()

View File

@ -349,7 +349,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
public void onData(Stream stream, DataInfo dataInfo) public void onData(Stream stream, DataInfo dataInfo)
{ {
Assert.assertTrue(dataInfo.isClose()); Assert.assertTrue(dataInfo.isClose());
Assert.assertEquals(data, dataInfo.asString("UTF-8")); Assert.assertEquals(data, dataInfo.asString("UTF-8", true));
dataLatch.countDown(); dataLatch.countDown();
} }
}); });
@ -399,7 +399,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
public void onData(Stream stream, DataInfo dataInfo) public void onData(Stream stream, DataInfo dataInfo)
{ {
Assert.assertTrue(dataInfo.isClose()); Assert.assertTrue(dataInfo.isClose());
byte[] bytes = dataInfo.asBytes(); byte[] bytes = dataInfo.asBytes(true);
Assert.assertEquals(1, bytes.length); Assert.assertEquals(1, bytes.length);
Assert.assertEquals(data, bytes[0]); Assert.assertEquals(data, bytes[0]);
dataLatch.countDown(); dataLatch.countDown();
@ -460,9 +460,9 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
int data = dataFrames.incrementAndGet(); int data = dataFrames.incrementAndGet();
Assert.assertTrue(data >= 1 && data <= 2); Assert.assertTrue(data >= 1 && data <= 2);
if (data == 1) if (data == 1)
Assert.assertEquals(data1, dataInfo.asString("UTF8")); Assert.assertEquals(data1, dataInfo.asString("UTF8", true));
else else
Assert.assertEquals(data2, dataInfo.asString("UTF8")); Assert.assertEquals(data2, dataInfo.asString("UTF8", true));
dataLatch.countDown(); dataLatch.countDown();
} }
}); });
@ -628,7 +628,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
@Override @Override
public void onData(Stream stream, DataInfo dataInfo) public void onData(Stream stream, DataInfo dataInfo)
{ {
ByteBuffer byteBuffer = dataInfo.asByteBuffer(); ByteBuffer byteBuffer = dataInfo.asByteBuffer(true);
while (byteBuffer.hasRemaining()) while (byteBuffer.hasRemaining())
buffer.write(byteBuffer.get()); buffer.write(byteBuffer.get());
if (dataInfo.isClose()) if (dataInfo.isClose())
@ -690,7 +690,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
@Override @Override
public void onData(Stream stream, DataInfo dataInfo) public void onData(Stream stream, DataInfo dataInfo)
{ {
ByteBuffer byteBuffer = dataInfo.asByteBuffer(); ByteBuffer byteBuffer = dataInfo.asByteBuffer(true);
while (byteBuffer.hasRemaining()) while (byteBuffer.hasRemaining())
buffer.write(byteBuffer.get()); buffer.write(byteBuffer.get());
if (dataInfo.isClose()) if (dataInfo.isClose())
@ -887,12 +887,12 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
if (count == 1) if (count == 1)
{ {
Assert.assertFalse(dataInfo.isClose()); Assert.assertFalse(dataInfo.isClose());
Assert.assertEquals(pangram1, dataInfo.asString("UTF-8")); Assert.assertEquals(pangram1, dataInfo.asString("UTF-8", true));
} }
else if (count == 2) else if (count == 2)
{ {
Assert.assertTrue(dataInfo.isClose()); Assert.assertTrue(dataInfo.isClose());
Assert.assertEquals(pangram2, dataInfo.asString("UTF-8")); Assert.assertEquals(pangram2, dataInfo.asString("UTF-8", true));
} }
dataLatch.countDown(); dataLatch.countDown();
} }
@ -949,7 +949,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
{ {
Assert.assertEquals(1, dataFrames.incrementAndGet()); Assert.assertEquals(1, dataFrames.incrementAndGet());
Assert.assertTrue(dataInfo.isClose()); Assert.assertTrue(dataInfo.isClose());
Assert.assertArrayEquals(data, dataInfo.asBytes()); Assert.assertArrayEquals(data, dataInfo.asBytes(true));
dataLatch.countDown(); dataLatch.countDown();
} }
}); });

View File

@ -232,13 +232,13 @@ public class SynReplyTest extends AbstractTest
int dataCount = this.dataCount.incrementAndGet(); int dataCount = this.dataCount.incrementAndGet();
if (dataCount == 1) if (dataCount == 1)
{ {
String chunk1 = dataInfo.asString("UTF-8"); String chunk1 = dataInfo.asString("UTF-8", true);
Assert.assertEquals(data1, chunk1); Assert.assertEquals(data1, chunk1);
dataLatch1.countDown(); dataLatch1.countDown();
} }
else if (dataCount == 2) else if (dataCount == 2)
{ {
String chunk2 = dataInfo.asString("UTF-8"); String chunk2 = dataInfo.asString("UTF-8", true);
Assert.assertEquals(data2, chunk2); Assert.assertEquals(data2, chunk2);
dataLatch2.countDown(); dataLatch2.countDown();
} }
@ -274,7 +274,7 @@ public class SynReplyTest extends AbstractTest
@Override @Override
public void onData(Stream stream, DataInfo dataInfo) public void onData(Stream stream, DataInfo dataInfo)
{ {
String data = dataInfo.asString("UTF-8"); String data = dataInfo.asString("UTF-8", true);
Assert.assertEquals(clientData, data); Assert.assertEquals(clientData, data);
clientDataLatch.countDown(); clientDataLatch.countDown();
} }
@ -393,7 +393,7 @@ public class SynReplyTest extends AbstractTest
@Override @Override
public void onData(Stream stream, DataInfo dataInfo) public void onData(Stream stream, DataInfo dataInfo)
{ {
String chunk = dataInfo.asString("UTF-8"); String chunk = dataInfo.asString("UTF-8", true);
Assert.assertEquals(data, chunk); Assert.assertEquals(data, chunk);
dataLatch.countDown(); dataLatch.countDown();
} }