Issue #3479 - allow async sending of partial data frames and ping/pong

Signed-off-by: lachan-roberts <lachlan@webtide.com>
This commit is contained in:
lachan-roberts 2019-04-11 11:40:47 +10:00 committed by Greg Wilkins
parent 04f0872913
commit 0bd1f3918d
2 changed files with 227 additions and 83 deletions

View File

@ -37,19 +37,8 @@ public interface RemoteEndpoint
void sendBytes(ByteBuffer data) throws IOException;
/**
* Initiates the asynchronous transmission of a binary message. This method returns before the message is
* transmitted. Developers may use the returned
* Future object to track progress of the transmission.
*
* @param data the data being sent
* @return the Future object representing the send operation.
*/
Future<Void> sendBytesByFuture(ByteBuffer data);
/**
* Initiates the asynchronous transmission of a binary message. This method returns before the message is
* transmitted. Developers may provide a callback to
* be notified when the message has been transmitted or resulted in an error.
* Initiates the asynchronous transmission of a binary message. This method returns before the message is transmitted.
* Developers may provide a callback to be notified when the message has been transmitted or resulted in an error.
*
* @param data the data being sent
* @param callback callback to notify of success or failure of the write operation
@ -57,8 +46,17 @@ public interface RemoteEndpoint
void sendBytes(ByteBuffer data, WriteCallback callback);
/**
* Send a binary message in pieces, blocking until all of the message has been transmitted. The runtime reads the
* message in order. Non-final pieces are
* Initiates the asynchronous transmission of a binary message. This method returns before the message is transmitted.
* Developers may use the returned Future object to track progress of the transmission.
*
* @param data the data being sent
* @return the Future object representing the send operation.
*/
Future<Void> sendBytesByFuture(ByteBuffer data);
/**
* Send a binary message in pieces, blocking until all of the message has been transmitted.
* The runtime reads the message in order. Non-final pieces are
* sent with isLast set to false. The final piece must be sent with isLast set to true.
*
* @param fragment the piece of the message being sent
@ -68,35 +66,30 @@ public interface RemoteEndpoint
void sendPartialBytes(ByteBuffer fragment, boolean isLast) throws IOException;
/**
* Send a text message in pieces, blocking until all of the message has been transmitted. The runtime reads the
* message in order. Non-final pieces are sent
* with isLast set to false. The final piece must be sent with isLast set to true.
* Initiates the asynchronous transmission of a partial binary message. This method returns before the message is
* transmitted.
* The runtime reads the message in order. Non-final pieces are sent with isLast
* set to false. The final piece must be sent with isLast set to true.
* Developers may provide a callback to be notified when the message has been transmitted or resulted in an error.
*
* @param fragment the piece of the message being sent
* @param fragment the data being sent
* @param isLast true if this is the last piece of the partial bytes
* @throws IOException if unable to send the partial bytes
* @param callback callback to notify of success or failure of the write operation
*/
void sendPartialString(String fragment, boolean isLast) throws IOException;
void sendPartialBytes(ByteBuffer fragment, boolean isLast, WriteCallback callback);
/**
* Send a Ping message containing the given application data to the remote endpoint. The corresponding Pong message
* may be picked up using the
* MessageHandler.Pong handler.
* Initiates the asynchronous transmission of a partial binary message. This method returns before the message is
* transmitted.
* The runtime reads the message in order. Non-final pieces are sent with isLast
* set to false. The final piece must be sent with isLast set to true.
* Developers may use the returned Future object to track progress of the transmission.
*
* @param applicationData the data to be carried in the ping request
* @throws IOException if unable to send the ping
* @param fragment the data being sent
* @param isLast true if this is the last piece of the partial bytes
* @return the Future object representing the send operation.
*/
void sendPing(ByteBuffer applicationData) throws IOException;
/**
* Allows the developer to send an unsolicited Pong message containing the given application data in order to serve
* as a unidirectional heartbeat for the
* session.
*
* @param applicationData the application data to be carried in the pong response.
* @throws IOException if unable to send the pong
*/
void sendPong(ByteBuffer applicationData) throws IOException;
Future<Void> sendPartialBytesByFuture(ByteBuffer fragment, boolean isLast);
/**
* Send a text message, blocking until all bytes of the message has been transmitted.
@ -108,6 +101,16 @@ public interface RemoteEndpoint
*/
void sendString(String text) throws IOException;
/**
* Initiates the asynchronous transmission of a text message. This method may return before the message is
* transmitted. Developers may provide a callback to
* be notified when the message has been transmitted or resulted in an error.
*
* @param text the text being sent
* @param callback callback to notify of success or failure of the write operation
*/
void sendString(String text, WriteCallback callback);
/**
* Initiates the asynchronous transmission of a text message. This method may return before the message is
* transmitted. Developers may use the returned
@ -119,14 +122,97 @@ public interface RemoteEndpoint
Future<Void> sendStringByFuture(String text);
/**
* Initiates the asynchronous transmission of a text message. This method may return before the message is
* transmitted. Developers may provide a callback to
* be notified when the message has been transmitted or resulted in an error.
* Send a text message in pieces, blocking until all of the message has been transmitted. The runtime reads the
* message in order. Non-final pieces are sent
* with isLast set to false. The final piece must be sent with isLast set to true.
*
* @param text the text being sent
* @param fragment the piece of the message being sent
* @param isLast true if this is the last piece of the partial bytes
* @throws IOException if unable to send the partial bytes
*/
void sendPartialString(String fragment, boolean isLast) throws IOException;
/**
* Initiates the asynchronous transmission of a partial text message.
* This method may return before the message is transmitted.
* The runtime reads the message in order. Non-final pieces are sent with isLast
* set to false. The final piece must be sent with isLast set to true.
* Developers may provide a callback to be notified when the message has been transmitted or resulted in an error.
*
* @param fragment the text being sent
* @param isLast true if this is the last piece of the partial bytes
* @param callback callback to notify of success or failure of the write operation
*/
void sendString(String text, WriteCallback callback);
void sendPartialString(String fragment, boolean isLast, WriteCallback callback) throws IOException;
/**
* Initiates the asynchronous transmission of a partial text message.
* This method may return before the message is transmitted.
* The runtime reads the message in order. Non-final pieces are sent with isLast
* set to false. The final piece must be sent with isLast set to true.
* Developers may use the returned Future object to track progress of the transmission.
*
* @param fragment the text being sent
* @param isLast true if this is the last piece of the partial bytes
* @return the Future object representing the send operation.
*/
Future<Void> sendPartialStringByFuture(String fragment, boolean isLast) throws IOException;
/**
* Send a Ping message containing the given application data to the remote endpoint, blocking until all of the
* message has been transmitted.
* The corresponding Pong message may be picked up using the MessageHandler.Pong handler.
*
* @param applicationData the data to be carried in the ping request
* @throws IOException if unable to send the ping
*/
void sendPing(ByteBuffer applicationData) throws IOException;
/**
* Asynchronously send a Ping message containing the given application data to the remote endpoint.
* The corresponding Pong message may be picked up using the MessageHandler.Pong handler.
*
* @param applicationData the data to be carried in the ping request
* @param callback callback to notify of success or failure of the write operation
*/
void sendPing(ByteBuffer applicationData, WriteCallback callback);
/**
* Asynchronously send a Ping message containing the given application data to the remote endpoint.
* The corresponding Pong message may be picked up using the MessageHandler.Pong handler.
*
* @param applicationData the data to be carried in the ping request
* @return the Future object representing the send operation.
*/
Future<Void> sendPingByFuture(ByteBuffer applicationData);
/**
* Allows the developer to send an unsolicited Pong message containing the given application data
* in order to serve as a unidirectional heartbeat for the session, this will block until
* all of the message has been transmitted.
*
* @param applicationData the application data to be carried in the pong response.
* @throws IOException if unable to send the pong
*/
void sendPong(ByteBuffer applicationData) throws IOException;
/**
* Allows the developer to asynchronously send an unsolicited Pong message containing the given application data
* in order to serve as a unidirectional heartbeat for the session.
*
* @param applicationData the application data to be carried in the pong response.
* @param callback callback to notify of success or failure of the write operation
*/
void sendPong(ByteBuffer applicationData, WriteCallback callback);
/**
* Allows the developer to asynchronously send an unsolicited Pong message containing the given application data
* in order to serve as a unidirectional heartbeat for the session.
*
* @param applicationData the application data to be carried in the pong response.
* @return the Future object representing the send operation.
*/
Future<Void> sendPongByFuture(ByteBuffer applicationData);
/**
* @return the batch mode with which messages are sent.

View File

@ -87,26 +87,30 @@ public class JettyWebSocketRemoteEndpoint implements org.eclipse.jetty.websocket
}
}
protected FrameHandler.CoreSession getCoreSession()
{
return coreSession;
}
private void sendBlocking(Frame frame) throws IOException
{
try (SharedBlockingCallback.Blocker b = blocker.acquire())
{
coreSession.sendFrame(frame, b, false);
b.block();
}
}
@Override
public void sendBytes(ByteBuffer data) throws IOException
{
sendBlocking(new Frame(OpCode.BINARY).setPayload(data));
}
@Override
public void sendBytes(ByteBuffer data, WriteCallback callback)
{
coreSession.sendFrame(new Frame(OpCode.BINARY).setPayload(data),
Callback.from(callback::writeSuccess, callback::writeFailed),
isBatch());
}
@Override
public Future<Void> sendBytesByFuture(ByteBuffer data)
{
FutureCallback callback = new FutureCallback();
coreSession.sendFrame(new Frame(OpCode.BINARY).setPayload(data),
callback,
isBatch());
return callback;
}
@Override
public void sendPartialBytes(ByteBuffer fragment, boolean isLast) throws IOException
{
@ -117,6 +121,43 @@ public class JettyWebSocketRemoteEndpoint implements org.eclipse.jetty.websocket
}
}
@Override
public void sendPartialBytes(ByteBuffer fragment, boolean isLast, WriteCallback callback)
{
sendPartialBytes(fragment, isLast, Callback.from(callback::writeSuccess, callback::writeFailed));
}
@Override
public Future<Void> sendPartialBytesByFuture(ByteBuffer fragment, boolean isLast)
{
FutureCallback callback = new FutureCallback();
sendPartialBytes(fragment, isLast, callback);
return callback;
}
@Override
public void sendString(String text) throws IOException
{
sendBlocking(new Frame(OpCode.TEXT).setPayload(text));
}
@Override
public void sendString(String text, WriteCallback callback)
{
Callback cb = callback == null?Callback.NOOP:Callback.from(callback::writeSuccess, callback::writeFailed);
coreSession.sendFrame(new Frame(OpCode.TEXT).setPayload(text), cb, isBatch());
}
@Override
public Future<Void> sendStringByFuture(String text)
{
FutureCallback callback = new FutureCallback();
coreSession.sendFrame(new Frame(OpCode.TEXT).setPayload(text),
callback,
isBatch());
return callback;
}
@Override
public void sendPartialString(String fragment, boolean isLast) throws IOException
{
@ -127,39 +168,59 @@ public class JettyWebSocketRemoteEndpoint implements org.eclipse.jetty.websocket
}
}
@Override
public void sendPartialString(String fragment, boolean isLast, WriteCallback callback) throws IOException
{
sendPartialText(fragment, isLast, Callback.from(callback::writeSuccess, callback::writeFailed));
}
@Override
public Future<Void> sendPartialStringByFuture(String fragment, boolean isLast) throws IOException
{
FutureCallback callback = new FutureCallback();
sendPartialText(fragment, isLast, callback);
return callback;
}
@Override
public void sendPing(ByteBuffer applicationData) throws IOException
{
coreSession.sendFrame(new Frame(OpCode.PING).setPayload(applicationData), Callback.NOOP, false);
sendBlocking(new Frame(OpCode.PING).setPayload(applicationData));
}
@Override
public void sendPing(ByteBuffer applicationData, WriteCallback callback)
{
coreSession.sendFrame(new Frame(OpCode.PING).setPayload(applicationData),
Callback.from(callback::writeSuccess, callback::writeFailed), false);
}
@Override
public Future<Void> sendPingByFuture(ByteBuffer applicationData)
{
FutureCallback callback = new FutureCallback();
coreSession.sendFrame(new Frame(OpCode.PING).setPayload(applicationData), callback, false);
return callback;
}
@Override
public void sendPong(ByteBuffer applicationData) throws IOException
{
coreSession.sendFrame(new Frame(OpCode.PONG).setPayload(applicationData), Callback.NOOP, false);
sendBlocking(new Frame(OpCode.PONG).setPayload(applicationData));
}
@Override
public void sendString(String text) throws IOException
public void sendPong(ByteBuffer applicationData, WriteCallback callback)
{
sendBlocking(new Frame(OpCode.TEXT).setPayload(text));
coreSession.sendFrame(new Frame(OpCode.PONG).setPayload(applicationData),
Callback.from(callback::writeSuccess, callback::writeFailed), false);
}
@Override
public void sendBytes(ByteBuffer data, WriteCallback callback)
{
coreSession.sendFrame(new Frame(OpCode.BINARY).setPayload(data),
Callback.from(callback::writeSuccess, callback::writeFailed),
isBatch());
}
@Override
public Future<Void> sendBytesByFuture(ByteBuffer data)
public Future<Void> sendPongByFuture(ByteBuffer applicationData)
{
FutureCallback callback = new FutureCallback();
coreSession.sendFrame(new Frame(OpCode.BINARY).setPayload(data),
callback,
isBatch());
coreSession.sendFrame(new Frame(OpCode.PONG).setPayload(applicationData), callback, false);
return callback;
}
@ -219,21 +280,18 @@ public class JettyWebSocketRemoteEndpoint implements org.eclipse.jetty.websocket
}
}
@Override
public void sendString(String text, WriteCallback callback)
private void sendBlocking(Frame frame) throws IOException
{
Callback cb = callback == null?Callback.NOOP:Callback.from(callback::writeSuccess, callback::writeFailed);
coreSession.sendFrame(new Frame(OpCode.TEXT).setPayload(text), cb, isBatch());
try (SharedBlockingCallback.Blocker b = blocker.acquire())
{
coreSession.sendFrame(frame, b, false);
b.block();
}
}
@Override
public Future<Void> sendStringByFuture(String text)
protected FrameHandler.CoreSession getCoreSession()
{
FutureCallback callback = new FutureCallback();
coreSession.sendFrame(new Frame(OpCode.TEXT).setPayload(text),
callback,
isBatch());
return callback;
return coreSession;
}
@Override