More cleanup of websocket-server

This commit is contained in:
Joakim Erdfelt 2012-07-05 17:07:56 -07:00
parent 23c6c81615
commit cd46dc3cee
18 changed files with 150 additions and 160 deletions

View File

@ -1,5 +1,7 @@
package org.eclipse.jetty.websocket.api; package org.eclipse.jetty.websocket.api;
import org.eclipse.jetty.websocket.api.io.WebSocketBlockingConnection;
/** /**
* Default implementation of the {@link WebSocketListener}. * Default implementation of the {@link WebSocketListener}.
* <p> * <p>
@ -8,6 +10,12 @@ package org.eclipse.jetty.websocket.api;
public class WebSocketAdapter implements WebSocketListener public class WebSocketAdapter implements WebSocketListener
{ {
private WebSocketConnection connection; private WebSocketConnection connection;
private WebSocketBlockingConnection blocking;
public WebSocketBlockingConnection getBlockingConnection()
{
return blocking;
}
public WebSocketConnection getConnection() public WebSocketConnection getConnection()
{ {
@ -40,6 +48,7 @@ public class WebSocketAdapter implements WebSocketListener
public void onWebSocketConnect(WebSocketConnection connection) public void onWebSocketConnect(WebSocketConnection connection)
{ {
this.connection = connection; this.connection = connection;
this.blocking = new WebSocketBlockingConnection(this.connection);
} }
@Override @Override

View File

@ -2,7 +2,6 @@ package org.eclipse.jetty.websocket.api;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
@ -67,22 +66,17 @@ public interface WebSocketConnection
boolean isOpen(); boolean isOpen();
/** /**
* Send a series of binary messages. * Send a a binary message.
* <p> * <p>
* Note: each buffer results in its own binary message frame. * NIO style with callbacks, allows for concurrent results of the write operation.
* <p>
* Advanced usage, with callbacks, allows for concurrent NIO style results of the entire write operation. (Callback is only called once at the end of
* processing all of the buffers)
*/ */
<C> void write(C context, Callback<C> callback, ByteBuffer... buffers) throws IOException; <C> void write(C context, Callback<C> callback, byte buf[], int offset, int len) throws IOException;
/** /**
* Send a series of text messages. * Send a series of text messages.
* <p> * <p>
* Note: each messages results in its own text message frame. * NIO style with callbacks, allows for concurrent results of the entire write operation. (Callback is only called once at the end of processing all of the
* <p> * messages)
* Advanced usage, with callbacks, allows for concurrent NIO style results of the entire write operation. (Callback is only called once at the end of
* processing all of the messages)
*/ */
<C> void write(C context, Callback<C> callback, String... messages) throws IOException; <C> void write(C context, Callback<C> callback, String... messages) throws IOException;
} }

View File

@ -0,0 +1,19 @@
package org.eclipse.jetty.websocket.api.io;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
public class WebSocketPing
{
private WebSocketConnection conn;
public WebSocketPing(WebSocketConnection conn)
{
this.conn = conn;
}
public void sendPing(byte buf[], int offset, int len)
{
// TODO: implement
// TODO: should this block and wait for a pong? (how?)
}
}

View File

@ -241,9 +241,8 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
* {@inheritDoc} * {@inheritDoc}
*/ */
@Override @Override
public <C> void write(C context, Callback<C> callback, ByteBuffer... buffers) throws IOException public <C> void write(C context, Callback<C> callback, byte buf[], int offset, int len) throws IOException
{ {
int len = buffers.length;
if (len == 0) if (len == 0)
{ {
// nothing to write // nothing to write
@ -251,18 +250,15 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
} }
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
{ {
LOG.debug("write(context,{},ByteBuffers->{})",callback,buffers.length); LOG.debug("write(context,{},byte[],{},{})",callback,offset,len);
} }
ByteBuffer raw[] = new ByteBuffer[len]; ByteBuffer raw = bufferPool.acquire(len + FrameGenerator.OVERHEAD,false);
for (int i = 0; i < len; i++) BufferUtil.clearToFill(raw);
{ BinaryFrame frame = new BinaryFrame(buf,offset,len);
raw[i] = bufferPool.acquire(buffers[i].remaining() + FrameGenerator.OVERHEAD,false); frame.setFin(true);
BufferUtil.clearToFill(raw[i]); generator.generate(raw,frame);
BinaryFrame frame = new BinaryFrame(buffers[i]); BufferUtil.flipToFlush(raw,0);
generator.generate(raw[i],frame); writeRaw(context,callback,raw);
BufferUtil.flipToFlush(raw[i],0);
}
getEndPoint().write(context,callback,raw);
} }
/** /**
@ -281,12 +277,17 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
{ {
LOG.debug("write(context,{},Strings->{})",callback,messages.length); LOG.debug("write(context,{},Strings->{})",callback,messages.length);
} }
TextFrame frames[] = new TextFrame[len]; ByteBuffer raw[] = new ByteBuffer[messages.length];
for (int i = 0; i < len; i++) for (int i = 0; i < len; i++)
{ {
frames[i] = new TextFrame(messages[i]); TextFrame frame = new TextFrame(messages[i]);
frame.setFin(true);
raw[i] = bufferPool.acquire(policy.getBufferSize(),false);
BufferUtil.clear(raw[i]);
generator.generate(raw[i],frame);
BufferUtil.flipToFlush(raw[i],0);
} }
// TODO write(context,callback,frames); writeRaw(context,callback,raw);
} }
@Override @Override

View File

@ -2,7 +2,6 @@ package org.eclipse.jetty.websocket.io;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.api.WebSocketConnection; import org.eclipse.jetty.websocket.api.WebSocketConnection;
@ -69,7 +68,7 @@ public class LocalWebSocketConnection implements WebSocketConnection
} }
@Override @Override
public <C> void write(C context, Callback<C> callback, ByteBuffer... buffers) throws IOException public <C> void write(C context, Callback<C> callback, byte[] buf, int offset, int len) throws IOException
{ {
} }

View File

@ -29,6 +29,7 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.server.HttpConnection; import org.eclipse.jetty.server.HttpConnection;
import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
@ -70,15 +71,22 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock
} }
private final String supportedVersions; private final String supportedVersions;
private WebSocketPolicy basePolicy; private final WebSocketPolicy basePolicy;
private final EventMethodsCache methodsCache;
private final ByteBufferPool bufferPool;
private WebSocketCreator creator; private WebSocketCreator creator;
private EventMethodsCache methodsCache;
private Class<?> firstRegisteredClass; private Class<?> firstRegisteredClass;
public WebSocketServerFactory(WebSocketPolicy policy) public WebSocketServerFactory(WebSocketPolicy policy)
{
this(policy,new StandardByteBufferPool());
}
public WebSocketServerFactory(WebSocketPolicy policy, ByteBufferPool bufferPool)
{ {
this.basePolicy = policy; this.basePolicy = policy;
this.methodsCache = new EventMethodsCache(); this.methodsCache = new EventMethodsCache();
this.bufferPool = bufferPool;
this.creator = this; this.creator = this;
// Create supportedVersions // Create supportedVersions
@ -121,7 +129,7 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock
// Send the upgrade // Send the upgrade
WebSocketPolicy objPolicy = this.basePolicy.clonePolicy(); WebSocketPolicy objPolicy = this.basePolicy.clonePolicy();
WebSocketEventDriver websocket = new WebSocketEventDriver(methodsCache,objPolicy,websocketPojo); WebSocketEventDriver websocket = new WebSocketEventDriver(websocketPojo,methodsCache,objPolicy,bufferPool);
return upgrade(sockreq,sockresp,websocket); return upgrade(sockreq,sockresp,websocket);
} }

View File

@ -34,21 +34,17 @@ import java.util.zip.Inflater;
import org.eclipse.jetty.server.SelectChannelConnector; import org.eclipse.jetty.server.SelectChannelConnector;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler; import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.TypeUtil; import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.Utf8StringBuilder; import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.websocket.annotations.OnWebSocketBinary;
import org.eclipse.jetty.websocket.annotations.OnWebSocketClose; import org.eclipse.jetty.websocket.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.annotations.OnWebSocketConnect; import org.eclipse.jetty.websocket.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.annotations.OnWebSocketFrame; import org.eclipse.jetty.websocket.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.annotations.OnWebSocketText;
import org.eclipse.jetty.websocket.annotations.WebSocket; import org.eclipse.jetty.websocket.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.StatusCode; import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketConnection; import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.frames.BaseFrame;
import org.eclipse.jetty.websocket.frames.CloseFrame;
import org.eclipse.jetty.websocket.frames.PingFrame;
import org.eclipse.jetty.websocket.frames.PongFrame;
import org.eclipse.jetty.websocket.protocol.AcceptHash; import org.eclipse.jetty.websocket.protocol.AcceptHash;
import org.eclipse.jetty.websocket.protocol.OpCode; import org.eclipse.jetty.websocket.protocol.OpCode;
import org.junit.AfterClass; import org.junit.AfterClass;
@ -85,33 +81,14 @@ public class WebSocketMessageRFC6455Test
disconnected.countDown(); disconnected.countDown();
} }
@OnWebSocketFrame @OnWebSocketMessage
public void onFrame(BaseFrame frame)
{
if (_echo)
{
if (!(frame instanceof PingFrame) && !(frame instanceof PongFrame) && !(frame instanceof CloseFrame))
{
try
{
connection.write(frame);
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
}
@OnWebSocketBinary
public void onMessage(byte[] data, int offset, int length) public void onMessage(byte[] data, int offset, int length)
{ {
if (_aggregate) if (_aggregate)
{ {
try try
{ {
connection.write(data,offset,length); connection.write(null,fnf(),data,offset,length);
} }
catch (IOException e) catch (IOException e)
{ {
@ -120,7 +97,7 @@ public class WebSocketMessageRFC6455Test
} }
} }
@OnWebSocketText @OnWebSocketMessage
public void onMessage(String data) public void onMessage(String data)
{ {
__textCount.incrementAndGet(); __textCount.incrementAndGet();
@ -140,7 +117,7 @@ public class WebSocketMessageRFC6455Test
{ {
try try
{ {
connection.write(data); connection.write(null,fnf(),data);
} }
catch (IOException e) catch (IOException e)
{ {
@ -157,7 +134,7 @@ public class WebSocketMessageRFC6455Test
{ {
try try
{ {
connection.write("sent on connect"); connection.write(null,fnf(),"sent on connect");
} }
catch (IOException e) catch (IOException e)
{ {
@ -170,13 +147,19 @@ public class WebSocketMessageRFC6455Test
} }
private static final int WSVERSION = 13; // RFC-6455 version private static final int WSVERSION = 13; // RFC-6455 version
private static Server __server; private static Server __server;
private static SelectChannelConnector __connector; private static SelectChannelConnector __connector;
private static TestWebSocket __serverWebSocket; private static TestWebSocket __serverWebSocket;
private static CountDownLatch __latch; private static CountDownLatch __latch;
private static AtomicInteger __textCount = new AtomicInteger(0); private static AtomicInteger __textCount = new AtomicInteger(0);
// Fire and Forget callback
public static Callback<Void> fnf()
{
return new FutureCallback<Void>();
}
@BeforeClass @BeforeClass
public static void startServer() throws Exception public static void startServer() throws Exception
{ {
@ -489,7 +472,7 @@ public class WebSocketMessageRFC6455Test
String mesg = "How Now Brown Cow"; String mesg = "How Now Brown Cow";
for (int i = 0; i < count; i++) for (int i = 0; i < count; i++)
{ {
__serverWebSocket.connection.write(mesg); __serverWebSocket.connection.write(null,fnf(),mesg);
if ((i % 100) == 0) if ((i % 100) == 0)
{ {
output.flush(); output.flush();
@ -911,7 +894,7 @@ public class WebSocketMessageRFC6455Test
assertTrue(__serverWebSocket.awaitDisconnected(5000)); assertTrue(__serverWebSocket.awaitDisconnected(5000));
try try
{ {
__serverWebSocket.connection.write("Don't send"); __serverWebSocket.connection.write(null,fnf(),"Don't send");
Assert.fail("Should have thrown IOException"); Assert.fail("Should have thrown IOException");
} }
catch (IOException e) catch (IOException e)
@ -1292,7 +1275,7 @@ public class WebSocketMessageRFC6455Test
message.append(text); message.append(text);
} }
String data = message.toString(); String data = message.toString();
__serverWebSocket.connection.write(data); __serverWebSocket.connection.write(null,fnf(),data);
assertEquals(OpCode.TEXT.getCode(),input.read()); assertEquals(OpCode.TEXT.getCode(),input.read());
assertEquals(0x7e,input.read()); assertEquals(0x7e,input.read());
@ -1364,7 +1347,7 @@ public class WebSocketMessageRFC6455Test
try try
{ {
__serverWebSocket.connection.write("Don't send"); __serverWebSocket.connection.write(null,fnf(),"Don't send");
Assert.fail("Should have thrown IOException"); Assert.fail("Should have thrown IOException");
} }
catch (IOException e) catch (IOException e)

View File

@ -23,6 +23,8 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ssl.SslSelectChannelConnector; import org.eclipse.jetty.server.ssl.SslSelectChannelConnector;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils; import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.websocket.api.WebSocketAdapter; import org.eclipse.jetty.websocket.api.WebSocketAdapter;
@ -35,9 +37,16 @@ import org.junit.Test;
public class WebSocketOverSSLTest public class WebSocketOverSSLTest
{ {
// Fire and Forget callback
public static Callback<Void> fnf()
{
return new FutureCallback<Void>();
}
private Server _server; private Server _server;
private int _port; private int _port;
private QueuedThreadPool _threadPool; private QueuedThreadPool _threadPool;
// private WebSocketClientFactory _wsFactory; // private WebSocketClientFactory _wsFactory;
private WebSocketConnection _connection; private WebSocketConnection _connection;
@ -118,7 +127,7 @@ public class WebSocketOverSSLTest
String message = new String(chars); String message = new String(chars);
for (int i = 0; i < count; ++i) for (int i = 0; i < count; ++i)
{ {
_connection.write(message); _connection.write(null,fnf(),message);
} }
Assert.assertTrue(clientLatch.await(20,TimeUnit.SECONDS)); Assert.assertTrue(clientLatch.await(20,TimeUnit.SECONDS));
@ -150,7 +159,7 @@ public class WebSocketOverSSLTest
try try
{ {
Assert.assertEquals(message,message); Assert.assertEquals(message,message);
connection.write(message); connection.write(null,fnf(),message);
serverLatch.countDown(); serverLatch.countDown();
} }
catch (IOException x) catch (IOException x)
@ -169,7 +178,7 @@ public class WebSocketOverSSLTest
clientLatch.countDown(); clientLatch.countDown();
} }
}); });
_connection.write(message); _connection.write(null,fnf(),message);
Assert.assertTrue(serverLatch.await(5,TimeUnit.SECONDS)); Assert.assertTrue(serverLatch.await(5,TimeUnit.SECONDS));
Assert.assertTrue(clientLatch.await(5,TimeUnit.SECONDS)); Assert.assertTrue(clientLatch.await(5,TimeUnit.SECONDS));

View File

@ -6,6 +6,7 @@ import java.io.IOException;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.StatusCode; import org.eclipse.jetty.websocket.api.StatusCode;
@ -54,7 +55,7 @@ public class WebSocketServletRFCTest
// echo the message back. // echo the message back.
try try
{ {
getConnection().write(message); getConnection().write(null,new FutureCallback<Void>(),message);
} }
catch (IOException e) catch (IOException e)
{ {

View File

@ -11,6 +11,7 @@ import java.util.Queue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketAdapter; import org.eclipse.jetty.websocket.api.WebSocketAdapter;
@ -64,7 +65,7 @@ public class TestABCase7_9
// echo the message back. // echo the message back.
try try
{ {
getConnection().write(message); getConnection().write(null,new FutureCallback<Void>(),message);
} }
catch (IOException e) catch (IOException e)
{ {
@ -184,7 +185,7 @@ public class TestABCase7_9
ByteBuffer frame = FrameBuilder.closeFrame().withMask(new byte[] ByteBuffer frame = FrameBuilder.closeFrame().withMask(new byte[]
{ 0x44, 0x44, 0x44, 0x44 }).asByteBuffer(); { 0x44, 0x44, 0x44, 0x44 }).asByteBuffer();
ByteBuffer buf = ByteBuffer.allocate(FrameGenerator.OVERHEAD + 2); ByteBuffer buf = ByteBuffer.allocate(FrameGenerator.OVERHEAD + 2);
BufferUtil.clearToFill(buf); BufferUtil.clearToFill(buf);
@ -210,5 +211,5 @@ public class TestABCase7_9
client.close(); client.close();
} }
} }
} }

View File

@ -15,7 +15,7 @@ public class BasicEchoSocket extends WebSocketAdapter
} }
try try
{ {
getConnection().write(payload,offset,len); getBlockingConnection().write(payload,offset,len);
} }
catch (IOException e) catch (IOException e)
{ {
@ -32,7 +32,7 @@ public class BasicEchoSocket extends WebSocketAdapter
} }
try try
{ {
getConnection().write(message); getBlockingConnection().write(message);
} }
catch (IOException e) catch (IOException e)
{ {

View File

@ -17,7 +17,7 @@ public class MyEchoSocket extends WebSocketAdapter
try try
{ {
// echo the data back // echo the data back
getConnection().write(message); getBlockingConnection().write(message);
} }
catch (IOException e) catch (IOException e)
{ {

View File

@ -1,10 +1,9 @@
package org.eclipse.jetty.websocket.server.examples.echo; package org.eclipse.jetty.websocket.server.examples.echo;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.websocket.annotations.OnWebSocketBinary; import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.websocket.annotations.OnWebSocketText; import org.eclipse.jetty.websocket.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.annotations.WebSocket; import org.eclipse.jetty.websocket.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.WebSocketConnection; import org.eclipse.jetty.websocket.api.WebSocketConnection;
@ -14,8 +13,8 @@ import org.eclipse.jetty.websocket.api.WebSocketConnection;
@WebSocket(maxTextSize = 64 * 1024, maxBinarySize = 64 * 1024) @WebSocket(maxTextSize = 64 * 1024, maxBinarySize = 64 * 1024)
public class BigEchoSocket public class BigEchoSocket
{ {
@OnWebSocketBinary @OnWebSocketMessage
public void onBinary(WebSocketConnection conn, ByteBuffer buffer) public void onBinary(WebSocketConnection conn, byte buf[], int offset, int length)
{ {
if (conn.isOpen()) if (conn.isOpen())
{ {
@ -23,8 +22,7 @@ public class BigEchoSocket
} }
try try
{ {
buffer.flip(); // flip the incoming buffer to write mode conn.write(null,new FutureCallback<Void>(),buf,offset,length);
conn.write(buffer);
} }
catch (IOException e) catch (IOException e)
{ {
@ -32,7 +30,7 @@ public class BigEchoSocket
} }
} }
@OnWebSocketText @OnWebSocketMessage
public void onText(WebSocketConnection conn, String message) public void onText(WebSocketConnection conn, String message)
{ {
if (conn.isOpen()) if (conn.isOpen())
@ -41,7 +39,7 @@ public class BigEchoSocket
} }
try try
{ {
conn.write(message); conn.write(null,new FutureCallback<Void>(),message);
} }
catch (IOException e) catch (IOException e)
{ {

View File

@ -1,23 +1,23 @@
package org.eclipse.jetty.websocket.server.examples.echo; package org.eclipse.jetty.websocket.server.examples.echo;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.websocket.annotations.WebSocket; import org.eclipse.jetty.websocket.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.WebSocketConnection; import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.frames.PingFrame; import org.eclipse.jetty.websocket.api.io.WebSocketPing;
@WebSocket @WebSocket
public class EchoBroadcastPingSocket extends EchoBroadcastSocket public class EchoBroadcastPingSocket extends EchoBroadcastSocket
{ {
private class KeepAlive extends Thread private static class KeepAlive extends Thread
{ {
private CountDownLatch latch; private CountDownLatch latch;
private WebSocketPing pinger;
private WebSocketConnection getConnection() public KeepAlive(WebSocketConnection conn)
{ {
return EchoBroadcastPingSocket.this.conn; this.pinger = new WebSocketPing(conn);
} }
@Override @Override
@ -27,14 +27,10 @@ public class EchoBroadcastPingSocket extends EchoBroadcastSocket
{ {
while (!latch.await(10,TimeUnit.SECONDS)) while (!latch.await(10,TimeUnit.SECONDS))
{ {
System.err.println("Ping " + getConnection()); System.err.println("Ping " + pinger);
PingFrame ping = new PingFrame(); byte data[] = new byte[]
ByteBuffer payload = ByteBuffer.allocate(3); { (byte)1, (byte)2, (byte)3 };
payload.put((byte)1); pinger.sendPing(data,0,3);
payload.put((byte)2);
payload.put((byte)3);
ping.setPayload(payload);
getConnection().write(ping);
} }
} }
catch (Exception e) catch (Exception e)
@ -59,11 +55,10 @@ public class EchoBroadcastPingSocket extends EchoBroadcastSocket
} }
} }
private final KeepAlive keepAlive; // A dedicated thread is not a good way to do this private KeepAlive keepAlive; // A dedicated thread is not a good way to do this
public EchoBroadcastPingSocket() public EchoBroadcastPingSocket()
{ {
keepAlive = new KeepAlive();
} }
@Override @Override
@ -76,6 +71,10 @@ public class EchoBroadcastPingSocket extends EchoBroadcastSocket
@Override @Override
public void onOpen(WebSocketConnection conn) public void onOpen(WebSocketConnection conn)
{ {
if (keepAlive == null)
{
keepAlive = new KeepAlive(conn);
}
keepAlive.start(); keepAlive.start();
super.onOpen(conn); super.onOpen(conn);
} }

View File

@ -3,10 +3,10 @@ package org.eclipse.jetty.websocket.server.examples.echo;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import org.eclipse.jetty.websocket.annotations.OnWebSocketBinary; import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.websocket.annotations.OnWebSocketClose; import org.eclipse.jetty.websocket.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.annotations.OnWebSocketConnect; import org.eclipse.jetty.websocket.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.annotations.OnWebSocketText; import org.eclipse.jetty.websocket.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.annotations.WebSocket; import org.eclipse.jetty.websocket.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.WebSocketConnection; import org.eclipse.jetty.websocket.api.WebSocketConnection;
@ -17,14 +17,14 @@ public class EchoBroadcastSocket
protected WebSocketConnection conn; protected WebSocketConnection conn;
@OnWebSocketBinary @OnWebSocketMessage
public void onBinary(byte buf[], int offset, int len) public void onBinary(byte buf[], int offset, int len)
{ {
for (EchoBroadcastSocket sock : BROADCAST) for (EchoBroadcastSocket sock : BROADCAST)
{ {
try try
{ {
sock.conn.write(buf,offset,len); sock.conn.write(null,new FutureCallback<Void>(),buf,offset,len);
} }
catch (IOException e) catch (IOException e)
{ {
@ -47,14 +47,14 @@ public class EchoBroadcastSocket
BROADCAST.add(this); BROADCAST.add(this);
} }
@OnWebSocketText @OnWebSocketMessage
public void onText(String text) public void onText(String text)
{ {
for (EchoBroadcastSocket sock : BROADCAST) for (EchoBroadcastSocket sock : BROADCAST)
{ {
try try
{ {
sock.conn.write(text); sock.conn.write(null,new FutureCallback<Void>(),text);
} }
catch (IOException e) catch (IOException e)
{ {

View File

@ -1,15 +1,14 @@
package org.eclipse.jetty.websocket.server.examples.echo; package org.eclipse.jetty.websocket.server.examples.echo;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.annotations.OnWebSocketFrame; import org.eclipse.jetty.websocket.annotations.OnWebSocketFrame;
import org.eclipse.jetty.websocket.annotations.WebSocket; import org.eclipse.jetty.websocket.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.WebSocketConnection; import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.frames.DataFrame; import org.eclipse.jetty.websocket.protocol.Frame;
/** /**
* Echo back the incoming text or binary as 2 frames of (roughly) equal size. * Echo back the incoming text or binary as 2 frames of (roughly) equal size.
@ -18,30 +17,30 @@ import org.eclipse.jetty.websocket.frames.DataFrame;
public class EchoFragmentSocket public class EchoFragmentSocket
{ {
@OnWebSocketFrame @OnWebSocketFrame
public void onFrame(WebSocketConnection conn, DataFrame data) public void onFrame(WebSocketConnection conn, Frame frame)
{ {
ByteBuffer payload = data.getPayload(); if (!frame.getOpCode().isDataFrame())
BufferUtil.flipToFlush(payload,0); {
int half = payload.remaining() / 2; return;
}
ByteBuffer buf1 = payload.slice(); byte data[] = frame.getPayloadData();
ByteBuffer buf2 = payload.slice(); int half = data.length / 2;
buf1.limit(half);
buf2.position(half);
DataFrame d1 = new DataFrame(data.getOpCode());
d1.setFin(false);
d1.setPayload(buf1);
DataFrame d2 = new DataFrame(data.getOpCode());
d2.setFin(true);
d2.setPayload(buf2);
Callback<Void> nop = new FutureCallback<>(); Callback<Void> nop = new FutureCallback<>();
try try
{ {
conn.write(null,nop,d1,d2); switch (frame.getOpCode())
{
case BINARY:
conn.write(null,nop,data,0,half);
conn.write(null,nop,data,half,data.length - half);
break;
case TEXT:
conn.write(null,nop,new String(data,0,half,StringUtil.__UTF8_CHARSET));
conn.write(null,nop,new String(data,half,data.length - half,StringUtil.__UTF8_CHARSET));
break;
}
} }
catch (IOException e) catch (IOException e)
{ {

View File

@ -1,30 +0,0 @@
package org.eclipse.jetty.websocket.server.examples.echo;
import java.io.IOException;
import org.eclipse.jetty.websocket.annotations.OnWebSocketFrame;
import org.eclipse.jetty.websocket.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.frames.BaseFrame;
@WebSocket
public class EchoFrameSocket
{
@OnWebSocketFrame
public void onFrame(WebSocketConnection conn, BaseFrame frame)
{
if (!conn.isOpen())
{
return;
}
try
{
conn.write(frame);
}
catch (IOException e)
{
e.printStackTrace(System.err);
}
}
}

View File

@ -72,6 +72,6 @@ public class MessageSender extends WebSocketAdapter
public void sendMessage(String format, Object... args) throws IOException public void sendMessage(String format, Object... args) throws IOException
{ {
getConnection().write(String.format(format,args)); getBlockingConnection().write(String.format(format,args));
} }
} }