415744 - Reduce Future usage in websocket

This commit is contained in:
Greg Wilkins 2013-08-23 19:58:46 +10:00
parent c27020740f
commit e4235ea070
20 changed files with 71 additions and 49 deletions

View File

@ -33,7 +33,7 @@ public class JettyEchoSocket extends WebSocketAdapter
@Override @Override
public void onWebSocketBinary(byte[] payload, int offset, int len) public void onWebSocketBinary(byte[] payload, int offset, int len)
{ {
getRemote().sendBytesByFuture(BufferUtil.toBuffer(payload,offset,len)); getRemote().sendBytes(BufferUtil.toBuffer(payload,offset,len),null);
} }
@Override @Override
@ -45,6 +45,6 @@ public class JettyEchoSocket extends WebSocketAdapter
@Override @Override
public void onWebSocketText(String message) public void onWebSocketText(String message)
{ {
getRemote().sendStringByFuture(message); getRemote().sendString(message,null);
} }
} }

View File

@ -70,7 +70,7 @@ public class JettyEchoSocket
public void onMessage(String msg) public void onMessage(String msg)
{ {
incomingMessages.add(msg); incomingMessages.add(msg);
remote.sendStringByFuture(msg); remote.sendString(msg,null);
} }
@OnWebSocketConnect @OnWebSocketConnect

View File

@ -116,7 +116,7 @@ public class WebSocketClientTest
Assert.assertThat("client.connectionManager.sessions.size",client.getConnectionManager().getSessions().size(),is(1)); Assert.assertThat("client.connectionManager.sessions.size",client.getConnectionManager().getSessions().size(),is(1));
cliSock.getSession().getRemote().sendStringByFuture("Hello World!"); cliSock.getSession().getRemote().sendString("Hello World!",null);
srvSock.echoMessage(1,TimeUnit.MILLISECONDS,500); srvSock.echoMessage(1,TimeUnit.MILLISECONDS,500);
// wait for response from server // wait for response from server
cliSock.waitForMessage(500,TimeUnit.MILLISECONDS); cliSock.waitForMessage(500,TimeUnit.MILLISECONDS);

View File

@ -0,0 +1,19 @@
package org.eclipse.jetty.websocket.common;
import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.websocket.api.WriteCallback;
public class BlockingWriteCallback extends BlockingCallback implements WriteCallback
{
@Override
public void writeFailed(Throwable x)
{
failed(x);
}
@Override
public void writeSuccess()
{
succeeded();
}
}

View File

@ -27,6 +27,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
@ -52,6 +53,18 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
private static final int TEXT = 1; private static final int TEXT = 1;
private static final int BINARY = 2; private static final int BINARY = 2;
private static final int CONTROL = 3; private static final int CONTROL = 3;
private static final WriteCallback NOOP_CALLBACK = new WriteCallback()
{
@Override
public void writeSuccess()
{
}
@Override
public void writeFailed(Throwable x)
{
}
};
private static final Logger LOG = Log.getLogger(WebSocketRemoteEndpoint.class); private static final Logger LOG = Log.getLogger(WebSocketRemoteEndpoint.class);
public final LogicalConnection connection; public final LogicalConnection connection;
@ -72,19 +85,11 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
private void blockingWrite(WebSocketFrame frame) throws IOException private void blockingWrite(WebSocketFrame frame) throws IOException
{ {
Future<Void> fut = sendAsyncFrame(frame); // TODO Blocking callbacks can be recycled, but they do not handle concurrent calls,
try // so if some mutual exclusion can be applied, then this callback can be reused.
{ BlockingWriteCallback callback = new BlockingWriteCallback();
fut.get(); // block till done sendFrame(frame,callback);
} callback.block();
catch (ExecutionException e)
{
throw new IOException("Failed to write bytes",e.getCause());
}
catch (InterruptedException e)
{
throw new IOException("Failed to write bytes",e);
}
} }
public InetSocketAddress getInetSocketAddress() public InetSocketAddress getInetSocketAddress()
@ -150,13 +155,12 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
@Override @Override
public void sendBytes(ByteBuffer data, WriteCallback callback) public void sendBytes(ByteBuffer data, WriteCallback callback)
{ {
Objects.requireNonNull(callback,"WriteCallback cannot be null");
msgType.set(BINARY); msgType.set(BINARY);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
{ {
LOG.debug("sendBytes({}, {})",BufferUtil.toDetailString(data),callback); LOG.debug("sendBytes({}, {})",BufferUtil.toDetailString(data),callback);
} }
sendFrame(new BinaryFrame().setPayload(data),callback); sendFrame(new BinaryFrame().setPayload(data),callback==null?NOOP_CALLBACK:callback);
} }
public void sendFrame(WebSocketFrame frame, WriteCallback callback) public void sendFrame(WebSocketFrame frame, WriteCallback callback)
@ -356,13 +360,12 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
@Override @Override
public void sendString(String text, WriteCallback callback) public void sendString(String text, WriteCallback callback)
{ {
Objects.requireNonNull(callback,"WriteCallback cannot be null");
msgType.set(TEXT); msgType.set(TEXT);
TextFrame frame = new TextFrame().setPayload(text); TextFrame frame = new TextFrame().setPayload(text);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
{ {
LOG.debug("sendString({},{})",BufferUtil.toDetailString(frame.getPayload()),callback); LOG.debug("sendString({},{})",BufferUtil.toDetailString(frame.getPayload()),callback);
} }
sendFrame(frame,callback); sendFrame(frame,callback==null?NOOP_CALLBACK:callback);
} }
} }

View File

@ -35,7 +35,7 @@ public class AnnotatedEchoSocket
{ {
System.out.printf("Echoing back message [%s]%n",message); System.out.printf("Echoing back message [%s]%n",message);
// echo the message back // echo the message back
session.getRemote().sendStringByFuture(message); session.getRemote().sendString(message,null);
} }
} }
} }

View File

@ -59,7 +59,7 @@ public class ListenerEchoSocket implements WebSocketListener
{ {
System.out.printf("Echoing back message [%s]%n",message); System.out.printf("Echoing back message [%s]%n",message);
// echo the message back // echo the message back
outbound.getRemote().sendStringByFuture(message); outbound.getRemote().sendString(message,null);
} }
} }
} }

View File

@ -36,6 +36,6 @@ public class MyStatelessEchoSocket
@OnWebSocketMessage @OnWebSocketMessage
public void onText(Session session, String text) public void onText(Session session, String text)
{ {
session.getRemote().sendStringByFuture(text); session.getRemote().sendString(text,null);
} }
} }

View File

@ -69,7 +69,7 @@ public class LoadTest
@OnWebSocketMessage @OnWebSocketMessage
public void onWebSocketText(String message) public void onWebSocketText(String message)
{ {
session.getRemote().sendStringByFuture(message); session.getRemote().sendString(message,null);
long iter = count.incrementAndGet(); long iter = count.incrementAndGet();
if ((iter % 100) == 0) if ((iter % 100) == 0)
{ {

View File

@ -46,7 +46,7 @@ public class ABSocket
// echo the message back. // echo the message back.
ByteBuffer data = ByteBuffer.wrap(buf,offset,len); ByteBuffer data = ByteBuffer.wrap(buf,offset,len);
this.session.getRemote().sendBytesByFuture(data); this.session.getRemote().sendBytes(data,null);
} }
@OnWebSocketConnect @OnWebSocketConnect
@ -73,7 +73,7 @@ public class ABSocket
try try
{ {
// echo the message back. // echo the message back.
this.session.getRemote().sendStringByFuture(message); this.session.getRemote().sendString(message,null);
} }
catch (WebSocketException e) catch (WebSocketException e)
{ {

View File

@ -67,7 +67,7 @@ public class BrowserSocket
randomText[i] = letters[rand.nextInt(lettersLen)]; randomText[i] = letters[rand.nextInt(lettersLen)];
} }
msg = String.format("ManyThreads [%s]",String.valueOf(randomText)); msg = String.format("ManyThreads [%s]",String.valueOf(randomText));
remote.sendStringByFuture(msg); remote.sendString(msg,null);
} }
} }
} }
@ -219,7 +219,7 @@ public class BrowserSocket
} }
// Async write // Async write
remote.sendStringByFuture(message); remote.sendString(message,null);
} }
private void writeMessage(String format, Object... args) private void writeMessage(String format, Object... args)

View File

@ -42,7 +42,7 @@ public class BigEchoSocket
LOG.warn("Session is closed"); LOG.warn("Session is closed");
return; return;
} }
session.getRemote().sendBytesByFuture(ByteBuffer.wrap(buf,offset,length)); session.getRemote().sendBytes(ByteBuffer.wrap(buf,offset,length),null);
} }
@OnWebSocketMessage @OnWebSocketMessage
@ -53,6 +53,6 @@ public class BigEchoSocket
LOG.warn("Session is closed"); LOG.warn("Session is closed");
return; return;
} }
session.getRemote().sendStringByFuture(message); session.getRemote().sendString(message,null);
} }
} }

View File

@ -40,7 +40,7 @@ public class EchoBroadcastSocket
ByteBuffer data = ByteBuffer.wrap(buf,offset,len); ByteBuffer data = ByteBuffer.wrap(buf,offset,len);
for (EchoBroadcastSocket sock : BROADCAST) for (EchoBroadcastSocket sock : BROADCAST)
{ {
sock.session.getRemote().sendBytesByFuture(data.slice()); sock.session.getRemote().sendBytes(data.slice(),null);
} }
} }
@ -62,7 +62,7 @@ public class EchoBroadcastSocket
{ {
for (EchoBroadcastSocket sock : BROADCAST) for (EchoBroadcastSocket sock : BROADCAST)
{ {
sock.session.getRemote().sendStringByFuture(text); sock.session.getRemote().sendString(text,null);
} }
} }
} }

View File

@ -60,13 +60,13 @@ public class EchoFragmentSocket
switch (frame.getType()) switch (frame.getType())
{ {
case BINARY: case BINARY:
remote.sendBytesByFuture(buf1); remote.sendBytes(buf1,null);
remote.sendBytesByFuture(buf2); remote.sendBytes(buf2,null);
break; break;
case TEXT: case TEXT:
// NOTE: This impl is not smart enough to split on a UTF8 boundary // NOTE: This impl is not smart enough to split on a UTF8 boundary
remote.sendStringByFuture(BufferUtil.toUTF8String(buf1)); remote.sendString(BufferUtil.toUTF8String(buf1),null);
remote.sendStringByFuture(BufferUtil.toUTF8String(buf2)); remote.sendString(BufferUtil.toUTF8String(buf2),null);
break; break;
default: default:
throw new IOException("Unexpected frame type: " + frame.getType()); throw new IOException("Unexpected frame type: " + frame.getType());

View File

@ -44,7 +44,7 @@ public class EchoSocket
// echo the message back. // echo the message back.
ByteBuffer data = ByteBuffer.wrap(buf,offset,len); ByteBuffer data = ByteBuffer.wrap(buf,offset,len);
this.session.getRemote().sendBytesByFuture(data); this.session.getRemote().sendBytes(data,null);
} }
@OnWebSocketConnect @OnWebSocketConnect
@ -59,6 +59,6 @@ public class EchoSocket
LOG.debug("onText({})",message); LOG.debug("onText({})",message);
// echo the message back. // echo the message back.
this.session.getRemote().sendStringByFuture(message); this.session.getRemote().sendString(message,null);
} }
} }

View File

@ -41,7 +41,7 @@ public class RFCSocket
// echo the message back. // echo the message back.
ByteBuffer data = ByteBuffer.wrap(buf,offset,len); ByteBuffer data = ByteBuffer.wrap(buf,offset,len);
this.session.getRemote().sendBytesByFuture(data); this.session.getRemote().sendBytes(data,null);
} }
@OnWebSocketConnect @OnWebSocketConnect
@ -62,6 +62,6 @@ public class RFCSocket
} }
// echo the message back. // echo the message back.
this.session.getRemote().sendStringByFuture(message); this.session.getRemote().sendString(message,null);
} }
} }

View File

@ -61,7 +61,7 @@ public class SessionSocket
if (values == null) if (values == null)
{ {
session.getRemote().sendStringByFuture("<null>"); session.getRemote().sendString("<null>",null);
return; return;
} }
@ -78,21 +78,21 @@ public class SessionSocket
delim = true; delim = true;
} }
valueStr.append(']'); valueStr.append(']');
session.getRemote().sendStringByFuture(valueStr.toString()); session.getRemote().sendString(valueStr.toString(),null);
return; return;
} }
if ("session.isSecure".equals(message)) if ("session.isSecure".equals(message))
{ {
String issecure = String.format("session.isSecure=%b",session.isSecure()); String issecure = String.format("session.isSecure=%b",session.isSecure());
session.getRemote().sendStringByFuture(issecure); session.getRemote().sendString(issecure,null);
return; return;
} }
if ("session.upgradeRequest.requestURI".equals(message)) if ("session.upgradeRequest.requestURI".equals(message))
{ {
String response = String.format("session.upgradeRequest.requestURI=%s",session.getUpgradeRequest().getRequestURI().toASCIIString()); String response = String.format("session.upgradeRequest.requestURI=%s",session.getUpgradeRequest().getRequestURI().toASCIIString());
session.getRemote().sendStringByFuture(response); session.getRemote().sendString(response,null);
return; return;
} }
@ -103,7 +103,7 @@ public class SessionSocket
} }
// echo the message back. // echo the message back.
this.session.getRemote().sendStringByFuture(message); this.session.getRemote().sendString(message,null);
} }
catch (Throwable t) catch (Throwable t)
{ {

View File

@ -34,6 +34,6 @@ public class MyBinaryEchoSocket
public void onWebSocketText(Session session, byte buf[], int offset, int len) public void onWebSocketText(Session session, byte buf[], int offset, int len)
{ {
// Echo message back, asynchronously // Echo message back, asynchronously
session.getRemote().sendBytesByFuture(ByteBuffer.wrap(buf,offset,len)); session.getRemote().sendBytes(ByteBuffer.wrap(buf,offset,len),null);
} }
} }

View File

@ -32,6 +32,6 @@ public class MyEchoSocket
public void onWebSocketText(Session session, String message) public void onWebSocketText(Session session, String message)
{ {
// Echo message back, asynchronously // Echo message back, asynchronously
session.getRemote().sendStringByFuture(message); session.getRemote().sendString(message,null);
} }
} }

View File

@ -123,7 +123,7 @@ public class WebSocketChatServlet extends WebSocketServlet implements WebSocketC
} }
// Async write the message back. // Async write the message back.
member.remote.sendStringByFuture(data); member.remote.sendString(data,null);
} }
} }