JSR-356 attempting to rethink the InputStream handling

This commit is contained in:
Joakim Erdfelt 2013-06-04 15:10:44 -07:00
parent a4d4d81580
commit 3f3b9fda61
2 changed files with 340 additions and 0 deletions

View File

@ -0,0 +1,153 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.message;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.websocket.common.LogicalConnection;
/**
* An InputStream that can access a queue of ByteBuffer payloads, along with expected InputStream blocking behavior.
*/
public class PayloadInputStream extends InputStream implements MessageAppender
{
/**
* Used for controlling read suspend/resume behavior if the queue is full, but the read operations haven't caught up yet.
*/
@SuppressWarnings("unused")
private final LogicalConnection connection;
private final BlockingDeque<ByteBuffer> buffers = new LinkedBlockingDeque<>();
private AtomicBoolean closed = new AtomicBoolean(false);
// EOB / End of Buffers
private AtomicBoolean buffersExhausted = new AtomicBoolean(false);
private ByteBuffer activeBuffer = null;
public PayloadInputStream(LogicalConnection connection)
{
this.connection = connection;
}
@Override
public void appendMessage(ByteBuffer payload, boolean isLast) throws IOException
{
if (buffersExhausted.get())
{
// This indicates a programming mistake/error and must be bug fixed
throw new RuntimeException("Last frame already received");
}
// if closed, we should just toss incoming payloads into the bit bucket.
if (closed.get())
{
return;
}
// Put the payload into the queue
try
{
buffers.put(payload);
if (isLast)
{
buffersExhausted.set(true);
}
}
catch (InterruptedException e)
{
throw new IOException(e);
}
}
@Override
public void close() throws IOException
{
closed.set(true);
super.close();
}
@Override
public synchronized void mark(int readlimit)
{
/* do nothing */
}
@Override
public boolean markSupported()
{
return false;
}
@Override
public void messageComplete()
{
buffersExhausted.set(true);
// toss an empty ByteBuffer into queue to let it drain
try
{
buffers.put(ByteBuffer.wrap(new byte[0]));
}
catch (InterruptedException ignore)
{
/* ignore */
}
}
@Override
public int read() throws IOException
{
try
{
if (closed.get())
{
return -1;
}
if (activeBuffer == null)
{
activeBuffer = buffers.take();
}
while (activeBuffer.remaining() <= 0)
{
if (buffersExhausted.get())
{
closed.set(true);
return -1;
}
activeBuffer = buffers.take();
}
return activeBuffer.get();
}
catch (InterruptedException e)
{
throw new IOException(e);
}
}
@Override
public synchronized void reset() throws IOException
{
throw new IOException("reset() not supported");
}
}

View File

@ -0,0 +1,187 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.message;
import static org.hamcrest.Matchers.*;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.common.io.LocalWebSocketConnection;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
public class PayloadInputStreamTest
{
private static final Charset UTF8 = StringUtil.__UTF8_CHARSET;
@Rule
public TestName testname = new TestName();
@Test
public void testBasicAppendRead() throws IOException
{
LocalWebSocketConnection conn = new LocalWebSocketConnection(testname);
try (PayloadInputStream stream = new PayloadInputStream(conn))
{
// Append a message (simple, short)
ByteBuffer payload = BufferUtil.toBuffer("Hello World",UTF8);
System.out.printf("payload = %s%n",BufferUtil.toDetailString(payload));
boolean fin = true;
stream.appendMessage(payload,fin);
// Read it from the stream.
byte buf[] = new byte[32];
int len = stream.read(buf);
String message = new String(buf,0,len,UTF8);
// Test it
Assert.assertThat("Message",message,is("Hello World"));
}
}
@Test
public void testBlockOnRead() throws IOException
{
LocalWebSocketConnection conn = new LocalWebSocketConnection(testname);
try (PayloadInputStream stream = new PayloadInputStream(conn))
{
final AtomicBoolean hadError = new AtomicBoolean(false);
new Thread(new Runnable()
{
@Override
public void run()
{
try
{
boolean fin = false;
TimeUnit.MILLISECONDS.sleep(200);
stream.appendMessage(BufferUtil.toBuffer("Saved",UTF8),fin);
TimeUnit.MILLISECONDS.sleep(200);
stream.appendMessage(BufferUtil.toBuffer(" by ",UTF8),fin);
fin = true;
TimeUnit.MILLISECONDS.sleep(200);
stream.appendMessage(BufferUtil.toBuffer("Zero",UTF8),fin);
}
catch (IOException | InterruptedException e)
{
hadError.set(true);
e.printStackTrace(System.err);
}
}
}).start();
// Read it from the stream.
byte buf[] = new byte[32];
int len = stream.read(buf);
String message = new String(buf,0,len,UTF8);
// Test it
Assert.assertThat("Error when appending",hadError.get(),is(false));
Assert.assertThat("Message",message,is("Saved by Zero"));
}
}
@Test
public void testBlockOnReadInitial() throws IOException
{
LocalWebSocketConnection conn = new LocalWebSocketConnection(testname);
try (PayloadInputStream stream = new PayloadInputStream(conn))
{
final AtomicBoolean hadError = new AtomicBoolean(false);
new Thread(new Runnable()
{
@Override
public void run()
{
try
{
boolean fin = true;
// wait for a little bit before populating buffers
TimeUnit.MILLISECONDS.sleep(400);
stream.appendMessage(BufferUtil.toBuffer("I will conquer",UTF8),fin);
}
catch (IOException | InterruptedException e)
{
hadError.set(true);
e.printStackTrace(System.err);
}
}
}).start();
// Read byte from stream.
int b = stream.read();
// Should be a byte, blocking till byte received.
// Test it
Assert.assertThat("Error when appending",hadError.get(),is(false));
Assert.assertThat("Initial byte",b,is((int)'I'));
}
}
@Test
public void testReadByteNoBuffersClosed() throws IOException
{
LocalWebSocketConnection conn = new LocalWebSocketConnection(testname);
try (PayloadInputStream stream = new PayloadInputStream(conn))
{
final AtomicBoolean hadError = new AtomicBoolean(false);
new Thread(new Runnable()
{
@Override
public void run()
{
try
{
// wait for a little bit before sending input closed
TimeUnit.MILLISECONDS.sleep(400);
stream.messageComplete();
}
catch (InterruptedException e)
{
hadError.set(true);
e.printStackTrace(System.err);
}
}
}).start();
// Read byte from stream.
int b = stream.read();
// Should be a -1, indicating the end of the stream.
// Test it
Assert.assertThat("Error when appending",hadError.get(),is(false));
Assert.assertThat("Initial byte",b,is(-1));
}
}
}