WebSocket - supporting WriteCallback in jetty RemoteEndpoint

+ new RemoteEndpoint.sendBytes(ByteBuffer, WriteCallback)
+ new RemoteEndpoint.sendString(String, WriteCallback)
This commit is contained in:
Joakim Erdfelt 2013-07-30 15:25:41 -07:00
parent 28b3ee8b13
commit 9ebf890d51
7 changed files with 95 additions and 18 deletions

View File

@ -35,18 +35,26 @@ public interface RemoteEndpoint
void sendBytes(ByteBuffer data) throws IOException;
/**
* Initiates the asynchronous transmission of a binary message. This method returns before the message is transmitted. Developers may provide a callback to
* be notified when the message has been transmitted, or may use the returned Future object to track progress of the transmission. Errors in transmission
* are given to the developer in the WriteResult object in either case.
* Initiates the asynchronous transmission of a binary message. This method returns before the message is transmitted. Developers may use the returned
* Future object to track progress of the transmission.
*
* @param data
* the data being sent
* @param completion
* handler that will be notified of progress
* @return the Future object representing the send operation.
*/
Future<Void> sendBytesByFuture(ByteBuffer data);
/**
* Initiates the asynchronous transmission of a binary message. This method returns before the message is transmitted. Developers may provide a callback to
* be notified when the message has been transmitted or resulted in an error.
*
* @param data
* the data being sent
* @param callback
* callback to notify of success or failure of the write operation
*/
void sendBytes(ByteBuffer data, WriteCallback callback);
/**
* Send a binary message in pieces, blocking until all of the message has been transmitted. The runtime reads the message in order. Non-final pieces are
* sent with isLast set to false. The final piece must be sent with isLast set to true.
@ -94,15 +102,23 @@ public interface RemoteEndpoint
void sendString(String text) throws IOException;
/**
* Initiates the asynchronous transmission of a text message. This method returns before the message is transmitted. Developers may provide a callback to be
* notified when the message has been transmitted, or may use the returned Future object to track progress of the transmission. Errors in transmission are
* given to the developer in the WriteResult object in either case.
* Initiates the asynchronous transmission of a text message. This method may return before the message is transmitted. Developers may use the returned
* Future object to track progress of the transmission.
*
* @param text
* the text being sent
* @param completion
* the handler which will be notified of progress
* @return the Future object representing the send operation.
*/
Future<Void> sendStringByFuture(String text);
/**
* Initiates the asynchronous transmission of a text message. This method may return before the message is transmitted. Developers may provide a callback to
* be notified when the message has been transmitted or resulted in an error.
*
* @param text
* the text being sent
* @param callback
* callback to notify of success or failure of the write operation
*/
void sendString(String text, WriteCallback callback);
}

View File

@ -34,6 +34,7 @@ import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer.ServerConnection;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.io.FutureWriteCallback;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -121,6 +122,44 @@ public class WebSocketClientTest
cliSock.assertMessage("Hello World!");
}
@Test
public void testBasicEcho_UsingCallback() throws Exception
{
JettyTrackingSocket cliSock = new JettyTrackingSocket();
client.getPolicy().setIdleTimeout(10000);
URI wsUri = server.getWsUri();
ClientUpgradeRequest request = new ClientUpgradeRequest();
request.setSubProtocols("echo");
Future<Session> future = client.connect(cliSock,wsUri,request);
final ServerConnection srvSock = server.accept();
srvSock.upgrade();
Session sess = future.get(500,TimeUnit.MILLISECONDS);
Assert.assertThat("Session",sess,notNullValue());
Assert.assertThat("Session.open",sess.isOpen(),is(true));
Assert.assertThat("Session.upgradeRequest",sess.getUpgradeRequest(),notNullValue());
Assert.assertThat("Session.upgradeResponse",sess.getUpgradeResponse(),notNullValue());
cliSock.assertWasOpened();
cliSock.assertNotClosed();
Assert.assertThat("client.connectionManager.sessions.size",client.getConnectionManager().getSessions().size(),is(1));
FutureWriteCallback callback = new FutureWriteCallback();
cliSock.getSession().getRemote().sendString("Hello World!",callback);
callback.get(1,TimeUnit.SECONDS);
srvSock.echoMessage(1,TimeUnit.MILLISECONDS,500);
// wait for response from server
cliSock.waitForMessage(500,TimeUnit.MILLISECONDS);
cliSock.assertMessage("Hello World!");
}
@Test
public void testBasicEcho_FromServer() throws Exception

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.websocket.common;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
@ -141,6 +142,19 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
WebSocketFrame frame = WebSocketFrame.binary().setPayload(data);
return sendAsyncFrame(frame);
}
@Override
public void sendBytes(ByteBuffer data, WriteCallback callback)
{
Objects.requireNonNull(callback,"WriteCallback cannot be null");
msgType.set(BINARY);
if (LOG.isDebugEnabled())
{
LOG.debug("sendBytes({}, {})",BufferUtil.toDetailString(data),callback);
}
WebSocketFrame frame = WebSocketFrame.binary().setPayload(data);
sendFrame(frame,callback);
}
public void sendFrame(WebSocketFrame frame, WriteCallback callback)
{
@ -317,7 +331,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
@Override
public Future<Void> sendStringByFuture(String text)
{
msgType.set(BINARY);
msgType.set(TEXT);
WebSocketFrame frame = WebSocketFrame.text(text);
if (LOG.isDebugEnabled())
{
@ -325,4 +339,17 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
}
return sendAsyncFrame(frame);
}
@Override
public void sendString(String text, WriteCallback callback)
{
Objects.requireNonNull(callback,"WriteCallback cannot be null");
msgType.set(TEXT);
WebSocketFrame frame = WebSocketFrame.text(text);
if (LOG.isDebugEnabled())
{
LOG.debug("sendString({},{})",BufferUtil.toDetailString(frame.getPayload()),callback);
}
sendFrame(frame,callback);
}
}

View File

@ -20,7 +20,6 @@ package org.eclipse.jetty.websocket.server;
import static org.hamcrest.Matchers.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.websocket.api.StatusCode;

View File

@ -26,8 +26,6 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.server.helper.CaptureSocket;

View File

@ -18,7 +18,6 @@
package org.eclipse.jetty.websocket.server.helper;
import java.sql.Connection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

View File

@ -18,6 +18,8 @@
package org.eclipse.jetty.websocket.server.helper;
import static org.hamcrest.Matchers.*;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
@ -34,9 +36,6 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.TypeUtil;
import org.junit.Assert;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.is;
public class SafariD00
{
private URI uri;