Introducing WriteBytesProvider
This commit is contained in:
parent
ecf7563a79
commit
ecb472f30b
|
@ -25,7 +25,9 @@ import java.util.concurrent.Future;
|
|||
public interface RemoteEndpoint
|
||||
{
|
||||
/**
|
||||
* Send a binary message, returning when all of the message has been transmitted.
|
||||
* Send a binary message, returning when all bytes of the message has been transmitted.
|
||||
* <p>
|
||||
* Note: this is a blocking call
|
||||
*
|
||||
* @param data
|
||||
* the message to be sent
|
||||
|
@ -43,7 +45,7 @@ public interface RemoteEndpoint
|
|||
* handler that will be notified of progress
|
||||
* @return the Future object representing the send operation.
|
||||
*/
|
||||
Future<WriteResult> sendBytesByFuture(ByteBuffer data);
|
||||
Future<Void> sendBytesByFuture(ByteBuffer data);
|
||||
|
||||
/**
|
||||
* Send a binary message in pieces, blocking until all of the message has been transmitted. The runtime reads the message in order. Non-final pieces are
|
||||
|
@ -70,7 +72,7 @@ public interface RemoteEndpoint
|
|||
* @param applicationData
|
||||
* the data to be carried in the ping request
|
||||
*/
|
||||
void sendPing(ByteBuffer applicationData);
|
||||
void sendPing(ByteBuffer applicationData) throws IOException;
|
||||
|
||||
/**
|
||||
* Allows the developer to send an unsolicited Pong message containing the given application data in order to serve as a unidirectional heartbeat for the
|
||||
|
@ -79,10 +81,12 @@ public interface RemoteEndpoint
|
|||
* @param applicationData
|
||||
* the application data to be carried in the pong response.
|
||||
*/
|
||||
void sendPong(ByteBuffer applicationData);
|
||||
void sendPong(ByteBuffer applicationData) throws IOException;
|
||||
|
||||
/**
|
||||
* Send a text message, blocking until all of the message has been transmitted.
|
||||
* Send a text message, blocking until all bytes of the message has been transmitted.
|
||||
* <p>
|
||||
* Note: this is a blocking call
|
||||
*
|
||||
* @param text
|
||||
* the message to be sent
|
||||
|
@ -100,5 +104,5 @@ public interface RemoteEndpoint
|
|||
* the handler which will be notified of progress
|
||||
* @return the Future object representing the send operation.
|
||||
*/
|
||||
Future<WriteResult> sendStringByFuture(String text);
|
||||
Future<Void> sendStringByFuture(String text);
|
||||
}
|
||||
|
|
|
@ -116,15 +116,15 @@ public interface WebSocketConnection
|
|||
/**
|
||||
* Send an async binary message.
|
||||
*/
|
||||
Future<WriteResult> write(byte buf[], int offset, int len) throws IOException;
|
||||
Future<Void> write(byte buf[], int offset, int len);
|
||||
|
||||
/**
|
||||
* Send an async binary message.
|
||||
*/
|
||||
Future<WriteResult> write(ByteBuffer buffer) throws IOException;
|
||||
Future<Void> write(ByteBuffer buffer);
|
||||
|
||||
/**
|
||||
* Send an async text messages.
|
||||
*/
|
||||
Future<WriteResult> write(String message) throws IOException;
|
||||
Future<Void> write(String message);
|
||||
}
|
|
@ -0,0 +1,48 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.api;
|
||||
|
||||
/**
|
||||
* Callback for Write events.
|
||||
*/
|
||||
public interface WriteCallback
|
||||
{
|
||||
/*
|
||||
* NOTE: We don't expose org.eclipse.jetty.util.Callback here as that would complicate matters with the WebAppContext's classloader isolation.
|
||||
*/
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Callback invoked when the write fails.
|
||||
* </p>
|
||||
*
|
||||
* @param x
|
||||
* the reason for the write failure
|
||||
*/
|
||||
public void writeFailed(Throwable x);
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Callback invoked when the write completes.
|
||||
* </p>
|
||||
*
|
||||
* @see #writeFailed(Throwable)
|
||||
*/
|
||||
public abstract void writeSuccess();
|
||||
}
|
|
@ -18,15 +18,24 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.api.extensions;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.eclipse.jetty.websocket.api.WriteResult;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
|
||||
/**
|
||||
* Interface for dealing with frames outgoing to the network (eventually)
|
||||
*/
|
||||
public interface OutgoingFrames
|
||||
{
|
||||
Future<WriteResult> outgoingFrame(Frame frame) throws IOException;
|
||||
/**
|
||||
* A frame, and optional callback, intended for the network.
|
||||
* <p>
|
||||
* Note: the frame can undergo many transformations in the various layers and extensions present in the implementation.
|
||||
* <p>
|
||||
* If you are implementing a mutation, you are obliged to handle the incoming WriteCallback appropriately.
|
||||
*
|
||||
* @param frame
|
||||
* the frame to eventually write to the network.
|
||||
* @param callback
|
||||
* the optional callback to use for success/failure of the network write operation. Can be null.
|
||||
*/
|
||||
void outgoingFrame(Frame frame, WriteCallback callback);
|
||||
}
|
||||
|
|
|
@ -24,7 +24,6 @@ import java.util.concurrent.Future;
|
|||
|
||||
import org.eclipse.jetty.websocket.api.WebSocketConnection;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketException;
|
||||
import org.eclipse.jetty.websocket.api.WriteResult;
|
||||
|
||||
/**
|
||||
* For working with the {@link WebSocketConnection} in a blocking technique.
|
||||
|
@ -49,20 +48,16 @@ public class WebSocketBlockingConnection
|
|||
{
|
||||
try
|
||||
{
|
||||
Future<WriteResult> blocker = conn.write(data,offset,length);
|
||||
WriteResult result = blocker.get(); // block till finished
|
||||
if (result.getException() != null)
|
||||
{
|
||||
throw new WebSocketException(result.getException());
|
||||
}
|
||||
Future<Void> blocker = conn.write(data,offset,length);
|
||||
blocker.get(); // block till finished
|
||||
}
|
||||
catch (InterruptedException e)
|
||||
{
|
||||
throw new IOException("Blocking write failed",e);
|
||||
throw new WebSocketException("Blocking write failed",e);
|
||||
}
|
||||
catch (ExecutionException e)
|
||||
{
|
||||
throw new WebSocketException(e.getCause());
|
||||
throw new IOException(e.getCause());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -75,20 +70,16 @@ public class WebSocketBlockingConnection
|
|||
{
|
||||
try
|
||||
{
|
||||
Future<WriteResult> blocker = conn.write(message);
|
||||
WriteResult result = blocker.get(); // block till finished
|
||||
if (result.getException() != null)
|
||||
{
|
||||
throw new WebSocketException(result.getException());
|
||||
}
|
||||
Future<Void> blocker = conn.write(message);
|
||||
blocker.get(); // block till finished
|
||||
}
|
||||
catch (InterruptedException e)
|
||||
{
|
||||
throw new IOException("Blocking write failed",e);
|
||||
throw new WebSocketException("Blocking write failed",e);
|
||||
}
|
||||
catch (ExecutionException e)
|
||||
{
|
||||
throw new WebSocketException(e.getCause());
|
||||
throw new IOException(e.getCause());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,13 +18,14 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.client.internal.io;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.websocket.api.WriteResult;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.ProtocolException;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
|
||||
import org.eclipse.jetty.websocket.client.WebSocketClientFactory;
|
||||
|
@ -38,6 +39,7 @@ import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection;
|
|||
*/
|
||||
public class WebSocketClientConnection extends AbstractWebSocketConnection
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(WebSocketClientConnection.class);
|
||||
private final WebSocketClientFactory factory;
|
||||
private final DefaultWebSocketClient client;
|
||||
private final Masker masker;
|
||||
|
@ -86,17 +88,29 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection
|
|||
connected = true;
|
||||
}
|
||||
super.onOpen();
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Overrride to set masker
|
||||
*/
|
||||
@Override
|
||||
public Future<WriteResult> outgoingFrame(Frame frame) throws IOException
|
||||
public void outgoingFrame(Frame frame, WriteCallback callback)
|
||||
{
|
||||
if (frame instanceof WebSocketFrame)
|
||||
{
|
||||
if (masker != null)
|
||||
if (masker == null)
|
||||
{
|
||||
ProtocolException ex = new ProtocolException("Must set a Masker");
|
||||
LOG.warn(ex);
|
||||
if (callback != null)
|
||||
{
|
||||
callback.writeFailed(ex);
|
||||
}
|
||||
return;
|
||||
}
|
||||
masker.setMask((WebSocketFrame)frame);
|
||||
}
|
||||
return super.outgoingFrame(frame);
|
||||
super.outgoingFrame(frame,callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -28,7 +27,6 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketConnection;
|
||||
import org.eclipse.jetty.websocket.api.WriteResult;
|
||||
|
||||
public class ClientWriteThread extends Thread
|
||||
{
|
||||
|
@ -67,7 +65,7 @@ public class ClientWriteThread extends Thread
|
|||
{
|
||||
LOG.debug("Writing {} messages to connection {}",messageCount);
|
||||
LOG.debug("Artificial Slowness {} ms",slowness);
|
||||
Future<WriteResult> lastMessage = null;
|
||||
Future<Void> lastMessage = null;
|
||||
while (m.get() < messageCount)
|
||||
{
|
||||
lastMessage = conn.write(message + "/" + m.get() + "/");
|
||||
|
@ -82,7 +80,7 @@ public class ClientWriteThread extends Thread
|
|||
// block on write of last message
|
||||
lastMessage.get(2,TimeUnit.MINUTES); // block on write
|
||||
}
|
||||
catch (InterruptedException | IOException | ExecutionException | TimeoutException e)
|
||||
catch (InterruptedException | ExecutionException | TimeoutException e)
|
||||
{
|
||||
LOG.warn(e);
|
||||
}
|
||||
|
|
|
@ -34,7 +34,6 @@ import java.net.URI;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
@ -49,7 +48,7 @@ import org.eclipse.jetty.util.log.Log;
|
|||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketException;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||
import org.eclipse.jetty.websocket.api.WriteResult;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
|
||||
|
@ -62,8 +61,6 @@ import org.eclipse.jetty.websocket.common.Parser;
|
|||
import org.eclipse.jetty.websocket.common.WebSocketFrame;
|
||||
import org.eclipse.jetty.websocket.common.extensions.ExtensionStack;
|
||||
import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
|
||||
import org.eclipse.jetty.websocket.common.io.WriteResultFailedFuture;
|
||||
import org.eclipse.jetty.websocket.common.io.WriteResultFinishedFuture;
|
||||
import org.junit.Assert;
|
||||
|
||||
/**
|
||||
|
@ -211,7 +208,7 @@ public class BlockheadServer
|
|||
}
|
||||
|
||||
@Override
|
||||
public Future<WriteResult> outgoingFrame(Frame frame) throws IOException
|
||||
public void outgoingFrame(Frame frame, WriteCallback callback)
|
||||
{
|
||||
ByteBuffer buf = generator.generate(frame);
|
||||
if (LOG.isDebugEnabled())
|
||||
|
@ -222,19 +219,23 @@ public class BlockheadServer
|
|||
try
|
||||
{
|
||||
BufferUtil.writeTo(buf,out);
|
||||
LOG.debug("flushing output");
|
||||
out.flush();
|
||||
LOG.debug("output flush complete");
|
||||
if (callback != null)
|
||||
{
|
||||
callback.writeSuccess();
|
||||
}
|
||||
|
||||
if (frame.getType().getOpCode() == OpCode.CLOSE)
|
||||
{
|
||||
disconnect();
|
||||
}
|
||||
return WriteResultFinishedFuture.INSTANCE;
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
return new WriteResultFailedFuture(t);
|
||||
if (callback != null)
|
||||
{
|
||||
callback.writeFailed(t);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -430,7 +431,7 @@ public class BlockheadServer
|
|||
public void write(Frame frame) throws IOException
|
||||
{
|
||||
LOG.debug("write(Frame->{}) to {}",frame,outgoing);
|
||||
outgoing.outgoingFrame(frame);
|
||||
outgoing.outgoingFrame(frame,null);
|
||||
}
|
||||
|
||||
public void write(int b) throws IOException
|
||||
|
|
|
@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
|
|||
|
||||
import org.eclipse.jetty.websocket.api.StatusCode;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketConnection;
|
||||
import org.eclipse.jetty.websocket.api.WriteResult;
|
||||
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
|
||||
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
|
||||
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
|
||||
|
@ -70,7 +69,7 @@ public class SimpleEchoClient
|
|||
this.conn = conn;
|
||||
try
|
||||
{
|
||||
Future<WriteResult> fut;
|
||||
Future<Void> fut;
|
||||
fut = conn.write("Hello");
|
||||
fut.get(2,TimeUnit.SECONDS); // wait for send to complete.
|
||||
|
||||
|
|
|
@ -374,6 +374,11 @@ public class Generator
|
|||
return buffer;
|
||||
}
|
||||
|
||||
public ByteBufferPool getBufferPool()
|
||||
{
|
||||
return bufferPool;
|
||||
}
|
||||
|
||||
public boolean isRsv1InUse()
|
||||
{
|
||||
return rsv1InUse;
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.eclipse.jetty.websocket.common;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.StringUtil;
|
||||
|
@ -243,6 +244,72 @@ public class WebSocketFrame implements Frame
|
|||
setPayload(payload);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj)
|
||||
{
|
||||
if (this == obj)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
if (obj == null)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
if (getClass() != obj.getClass())
|
||||
{
|
||||
return false;
|
||||
}
|
||||
WebSocketFrame other = (WebSocketFrame)obj;
|
||||
if (continuation != other.continuation)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
if (continuationIndex != other.continuationIndex)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
if (data == null)
|
||||
{
|
||||
if (other.data != null)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
else if (!data.equals(other.data))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
if (fin != other.fin)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
if (!Arrays.equals(mask,other.mask))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
if (masked != other.masked)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
if (opcode != other.opcode)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
if (rsv1 != other.rsv1)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
if (rsv2 != other.rsv2)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
if (rsv3 != other.rsv3)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* The number of fragments this frame consists of.
|
||||
* <p>
|
||||
|
@ -329,6 +396,24 @@ public class WebSocketFrame implements Frame
|
|||
return type;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
final int prime = 31;
|
||||
int result = 1;
|
||||
result = (prime * result) + (continuation?1231:1237);
|
||||
result = (prime * result) + continuationIndex;
|
||||
result = (prime * result) + ((data == null)?0:data.hashCode());
|
||||
result = (prime * result) + (fin?1231:1237);
|
||||
result = (prime * result) + Arrays.hashCode(mask);
|
||||
result = (prime * result) + (masked?1231:1237);
|
||||
result = (prime * result) + opcode;
|
||||
result = (prime * result) + (rsv1?1231:1237);
|
||||
result = (prime * result) + (rsv2?1231:1237);
|
||||
result = (prime * result) + (rsv3?1231:1237);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasPayload()
|
||||
{
|
||||
|
|
|
@ -21,16 +21,15 @@ package org.eclipse.jetty.websocket.common;
|
|||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
|
||||
import org.eclipse.jetty.websocket.api.WriteResult;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
|
||||
import org.eclipse.jetty.websocket.common.io.WriteResultFailedFuture;
|
||||
import org.eclipse.jetty.websocket.common.io.FutureWriteCallback;
|
||||
|
||||
/**
|
||||
* Endpoint for Writing messages to the Remote websocket.
|
||||
|
@ -51,6 +50,23 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
|
|||
this.outgoing = outgoing;
|
||||
}
|
||||
|
||||
private void blockingWrite(WebSocketFrame frame) throws IOException
|
||||
{
|
||||
Future<Void> fut = sendAsyncFrame(frame);
|
||||
try
|
||||
{
|
||||
fut.get(); // block till done
|
||||
}
|
||||
catch (ExecutionException e)
|
||||
{
|
||||
throw new IOException("Failed to write bytes",e.getCause());
|
||||
}
|
||||
catch (InterruptedException e)
|
||||
{
|
||||
throw new IOException("Failed to write bytes",e);
|
||||
}
|
||||
}
|
||||
|
||||
public InetSocketAddress getInetSocketAddress()
|
||||
{
|
||||
return connection.getRemoteAddress();
|
||||
|
@ -60,56 +76,50 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
|
|||
* Internal
|
||||
*
|
||||
* @param frame
|
||||
* @return
|
||||
* the frame to write
|
||||
* @return the future for the network write of the frame
|
||||
*/
|
||||
private Future<WriteResult> sendAsyncFrame(WebSocketFrame frame)
|
||||
private Future<Void> sendAsyncFrame(WebSocketFrame frame)
|
||||
{
|
||||
FutureWriteCallback future = new FutureWriteCallback();
|
||||
try
|
||||
{
|
||||
connection.getIOState().assertOutputOpen();
|
||||
return outgoing.outgoingFrame(frame);
|
||||
outgoing.outgoingFrame(frame,future);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
return new WriteResultFailedFuture(e);
|
||||
future.writeFailed(e);
|
||||
}
|
||||
return future;
|
||||
}
|
||||
|
||||
/**
|
||||
* Blocking write of bytes.
|
||||
*/
|
||||
@Override
|
||||
public void sendBytes(ByteBuffer data) throws IOException
|
||||
{
|
||||
connection.getIOState().assertOutputOpen();
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("sendBytes({})",BufferUtil.toDetailString(data));
|
||||
LOG.debug("sendBytes with {}",BufferUtil.toDetailString(data));
|
||||
}
|
||||
WebSocketFrame frame = WebSocketFrame.binary().setPayload(data);
|
||||
outgoing.outgoingFrame(frame);
|
||||
blockingWrite(frame);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<WriteResult> sendBytesByFuture(ByteBuffer data)
|
||||
public Future<Void> sendBytesByFuture(ByteBuffer data)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("sendBytesByFuture({})",BufferUtil.toDetailString(data));
|
||||
LOG.debug("sendBytesByFuture with {}",BufferUtil.toDetailString(data));
|
||||
}
|
||||
WebSocketFrame frame = WebSocketFrame.binary().setPayload(data);
|
||||
return sendAsyncFrame(frame);
|
||||
}
|
||||
|
||||
private void sendFrame(Frame frame)
|
||||
{
|
||||
try
|
||||
{
|
||||
outgoing.outgoingFrame(frame);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
LOG.warn(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendPartialBytes(ByteBuffer fragment, boolean isLast) throws IOException
|
||||
{
|
||||
|
@ -117,8 +127,8 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
|
|||
{
|
||||
LOG.debug("sendPartialBytes({}, {})",BufferUtil.toDetailString(fragment),isLast);
|
||||
}
|
||||
Frame frame = WebSocketFrame.binary().setPayload(fragment).setFin(isLast);
|
||||
outgoing.outgoingFrame(frame);
|
||||
WebSocketFrame frame = WebSocketFrame.binary().setPayload(fragment).setFin(isLast);
|
||||
blockingWrite(frame);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -128,43 +138,51 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
|
|||
{
|
||||
LOG.debug("sendPartialString({}, {})",fragment,isLast);
|
||||
}
|
||||
Frame frame = WebSocketFrame.text(fragment).setFin(isLast);
|
||||
outgoing.outgoingFrame(frame);
|
||||
WebSocketFrame frame = WebSocketFrame.text(fragment).setFin(isLast);
|
||||
blockingWrite(frame);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendPing(ByteBuffer applicationData)
|
||||
public void sendPing(ByteBuffer applicationData) throws IOException
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("Ping with {}",BufferUtil.toDetailString(applicationData));
|
||||
LOG.debug("sendPing with {}",BufferUtil.toDetailString(applicationData));
|
||||
}
|
||||
Frame frame = WebSocketFrame.ping().setPayload(applicationData);
|
||||
sendFrame(frame);
|
||||
WebSocketFrame frame = WebSocketFrame.ping().setPayload(applicationData);
|
||||
blockingWrite(frame);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendPong(ByteBuffer applicationData)
|
||||
public void sendPong(ByteBuffer applicationData) throws IOException
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("Pong with {}",BufferUtil.toDetailString(applicationData));
|
||||
LOG.debug("sendPong with {}",BufferUtil.toDetailString(applicationData));
|
||||
}
|
||||
Frame frame = WebSocketFrame.pong().setPayload(applicationData);
|
||||
sendFrame(frame);
|
||||
WebSocketFrame frame = WebSocketFrame.pong().setPayload(applicationData);
|
||||
blockingWrite(frame);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendString(String text) throws IOException
|
||||
{
|
||||
Frame frame = WebSocketFrame.text(text);
|
||||
outgoing.outgoingFrame(frame);
|
||||
WebSocketFrame frame = WebSocketFrame.text(text);
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("sendString with {}",BufferUtil.toDetailString(frame.getPayload()));
|
||||
}
|
||||
blockingWrite(WebSocketFrame.text(text));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<WriteResult> sendStringByFuture(String text)
|
||||
public Future<Void> sendStringByFuture(String text)
|
||||
{
|
||||
WebSocketFrame frame = WebSocketFrame.text(text);
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("sendStringByFuture with {}",BufferUtil.toDetailString(frame.getPayload()));
|
||||
}
|
||||
return sendAsyncFrame(frame);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -44,7 +44,6 @@ import org.eclipse.jetty.websocket.api.SuspendToken;
|
|||
import org.eclipse.jetty.websocket.api.WebSocketConnection;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketException;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||
import org.eclipse.jetty.websocket.api.WriteResult;
|
||||
import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
|
||||
|
@ -394,19 +393,19 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Web
|
|||
}
|
||||
|
||||
@Override
|
||||
public Future<WriteResult> write(byte[] buf, int offset, int len) throws IOException
|
||||
public Future<Void> write(byte[] buf, int offset, int len)
|
||||
{
|
||||
return remote.sendBytesByFuture(ByteBuffer.wrap(buf,offset,len));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<WriteResult> write(ByteBuffer buffer) throws IOException
|
||||
public Future<Void> write(ByteBuffer buffer)
|
||||
{
|
||||
return remote.sendBytesByFuture(buffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<WriteResult> write(String message) throws IOException
|
||||
public Future<Void> write(String message)
|
||||
{
|
||||
return remote.sendStringByFuture(message);
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
package org.eclipse.jetty.websocket.common.extensions;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||
|
@ -29,7 +28,7 @@ import org.eclipse.jetty.util.log.Log;
|
|||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketException;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||
import org.eclipse.jetty.websocket.api.WriteResult;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Extension;
|
||||
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
|
@ -181,10 +180,10 @@ public abstract class AbstractExtension extends ContainerLifeCycle implements Ex
|
|||
this.nextIncoming.incomingFrame(frame);
|
||||
}
|
||||
|
||||
protected Future<WriteResult> nextOutgoingFrame(Frame frame) throws IOException
|
||||
protected void nextOutgoingFrame(Frame frame, WriteCallback callback)
|
||||
{
|
||||
log.debug("nextOutgoingFrame({})",frame);
|
||||
return this.nextOutgoing.outgoingFrame(frame);
|
||||
this.nextOutgoing.outgoingFrame(frame,callback);
|
||||
}
|
||||
|
||||
public void setBufferPool(ByteBufferPool bufferPool)
|
||||
|
|
|
@ -22,7 +22,6 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.ListIterator;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
|
@ -30,7 +29,7 @@ import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
|||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketException;
|
||||
import org.eclipse.jetty.websocket.api.WriteResult;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Extension;
|
||||
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
|
||||
import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
|
||||
|
@ -227,9 +226,9 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames
|
|||
}
|
||||
|
||||
@Override
|
||||
public Future<WriteResult> outgoingFrame(Frame frame) throws IOException
|
||||
public void outgoingFrame(Frame frame, WriteCallback callback)
|
||||
{
|
||||
return nextOutgoing.outgoingFrame(frame);
|
||||
nextOutgoing.outgoingFrame(frame,callback);
|
||||
}
|
||||
|
||||
public void setNextIncoming(IncomingFrames nextIncoming)
|
||||
|
|
|
@ -19,11 +19,9 @@
|
|||
package org.eclipse.jetty.websocket.common.extensions.compress;
|
||||
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.eclipse.jetty.websocket.api.WriteResult;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketFrame;
|
||||
|
@ -86,16 +84,15 @@ public class FrameCompressionExtension extends AbstractExtension
|
|||
}
|
||||
|
||||
@Override
|
||||
public Future<WriteResult> outgoingFrame(Frame frame) throws IOException
|
||||
public void outgoingFrame(Frame frame, WriteCallback callback)
|
||||
{
|
||||
if (frame.getType().isControl())
|
||||
{
|
||||
// skip, cannot compress control frames.
|
||||
return nextOutgoingFrame(frame);
|
||||
nextOutgoingFrame(frame,callback);
|
||||
return;
|
||||
}
|
||||
|
||||
Future<WriteResult> future = null;
|
||||
|
||||
ByteBuffer data = frame.getPayload();
|
||||
|
||||
// deflate data
|
||||
|
@ -108,15 +105,16 @@ public class FrameCompressionExtension extends AbstractExtension
|
|||
if (!method.compress().isDone())
|
||||
{
|
||||
out.setFin(false);
|
||||
nextOutgoingFrame(frame,null); // no callback for start/end frames
|
||||
}
|
||||
else
|
||||
{
|
||||
nextOutgoingFrame(out,callback); // pass thru callback
|
||||
}
|
||||
|
||||
future = nextOutgoingFrame(out);
|
||||
}
|
||||
|
||||
// reset on every frame.
|
||||
method.compress().end();
|
||||
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,13 +19,11 @@
|
|||
package org.eclipse.jetty.websocket.common.extensions.compress;
|
||||
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.WriteResult;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketFrame;
|
||||
|
@ -94,16 +92,15 @@ public class MessageCompressionExtension extends AbstractExtension
|
|||
}
|
||||
|
||||
@Override
|
||||
public Future<WriteResult> outgoingFrame(Frame frame) throws IOException
|
||||
public void outgoingFrame(Frame frame, WriteCallback callback)
|
||||
{
|
||||
if (frame.getType().isControl())
|
||||
{
|
||||
// skip, cannot compress control frames.
|
||||
return nextOutgoingFrame(frame);
|
||||
nextOutgoingFrame(frame,callback);
|
||||
return;
|
||||
}
|
||||
|
||||
Future<WriteResult> future = null;
|
||||
|
||||
ByteBuffer data = frame.getPayload();
|
||||
// deflate data
|
||||
method.compress().input(data);
|
||||
|
@ -115,11 +112,13 @@ public class MessageCompressionExtension extends AbstractExtension
|
|||
if (!method.compress().isDone())
|
||||
{
|
||||
out.setFin(false);
|
||||
future = nextOutgoingFrame(out);
|
||||
// no callback for start/middle frames
|
||||
nextOutgoingFrame(out,null);
|
||||
}
|
||||
else
|
||||
{
|
||||
future = nextOutgoingFrame(out);
|
||||
// pass through callback to last frame
|
||||
nextOutgoingFrame(out,callback);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -128,8 +127,6 @@ public class MessageCompressionExtension extends AbstractExtension
|
|||
{
|
||||
method.compress().end();
|
||||
}
|
||||
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,12 +19,10 @@
|
|||
package org.eclipse.jetty.websocket.common.extensions.fragment;
|
||||
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.eclipse.jetty.websocket.api.WebSocketException;
|
||||
import org.eclipse.jetty.websocket.api.WriteResult;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
import org.eclipse.jetty.websocket.common.OpCode;
|
||||
|
@ -53,12 +51,13 @@ public class FragmentExtension extends AbstractExtension
|
|||
}
|
||||
|
||||
@Override
|
||||
public Future<WriteResult> outgoingFrame(Frame frame) throws IOException
|
||||
public void outgoingFrame(Frame frame, WriteCallback callback)
|
||||
{
|
||||
if (frame.getType().isControl())
|
||||
{
|
||||
// Cannot fragment Control Frames
|
||||
return nextOutgoingFrame(frame);
|
||||
nextOutgoingFrame(frame,callback);
|
||||
return;
|
||||
}
|
||||
|
||||
int length = frame.getPayloadLength();
|
||||
|
@ -71,7 +70,8 @@ public class FragmentExtension extends AbstractExtension
|
|||
if (maxLength <= 0)
|
||||
{
|
||||
// output original frame
|
||||
return nextOutgoingFrame(frame);
|
||||
nextOutgoingFrame(frame,callback);
|
||||
return;
|
||||
}
|
||||
|
||||
boolean continuation = false;
|
||||
|
@ -87,7 +87,8 @@ public class FragmentExtension extends AbstractExtension
|
|||
payload.limit(Math.min(payload.position() + maxLength,originalLimit));
|
||||
frag.setPayload(payload);
|
||||
|
||||
nextOutgoingFrame(frag);
|
||||
// no callback for beginning and middle parts
|
||||
nextOutgoingFrame(frag,null);
|
||||
|
||||
length -= maxLength;
|
||||
opcode = OpCode.CONTINUATION;
|
||||
|
@ -104,7 +105,7 @@ public class FragmentExtension extends AbstractExtension
|
|||
payload.limit(originalLimit);
|
||||
frag.setPayload(payload);
|
||||
|
||||
return nextOutgoingFrame(frag);
|
||||
nextOutgoingFrame(frag,callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -18,13 +18,10 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.common.extensions.identity;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.eclipse.jetty.util.QuotedStringTokenizer;
|
||||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketException;
|
||||
import org.eclipse.jetty.websocket.api.WriteResult;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
import org.eclipse.jetty.websocket.common.extensions.AbstractExtension;
|
||||
|
@ -54,10 +51,10 @@ public class IdentityExtension extends AbstractExtension
|
|||
}
|
||||
|
||||
@Override
|
||||
public Future<WriteResult> outgoingFrame(Frame frame) throws IOException
|
||||
public void outgoingFrame(Frame frame, WriteCallback callback)
|
||||
{
|
||||
// pass through
|
||||
return nextOutgoingFrame(frame);
|
||||
nextOutgoingFrame(frame,callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -18,10 +18,7 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.common.extensions.mux;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.eclipse.jetty.websocket.api.WriteResult;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
import org.eclipse.jetty.websocket.common.LogicalConnection;
|
||||
import org.eclipse.jetty.websocket.common.extensions.AbstractExtension;
|
||||
|
@ -49,10 +46,9 @@ public abstract class AbstractMuxExtension extends AbstractExtension
|
|||
}
|
||||
|
||||
@Override
|
||||
public Future<WriteResult> outgoingFrame(Frame frame) throws IOException
|
||||
public void outgoingFrame(Frame frame, WriteCallback callback)
|
||||
{
|
||||
/* do nothing */
|
||||
return null;
|
||||
/* do nothing here, allow Muxer to handle this aspect */
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
|
|||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.StatusCode;
|
||||
|
@ -32,7 +33,7 @@ import org.eclipse.jetty.websocket.api.SuspendToken;
|
|||
import org.eclipse.jetty.websocket.api.WebSocketConnection;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketException;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||
import org.eclipse.jetty.websocket.api.WriteResult;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
|
||||
import org.eclipse.jetty.websocket.common.CloseInfo;
|
||||
|
@ -40,6 +41,7 @@ import org.eclipse.jetty.websocket.common.ConnectionState;
|
|||
import org.eclipse.jetty.websocket.common.LogicalConnection;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketFrame;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketSession;
|
||||
import org.eclipse.jetty.websocket.common.io.FutureWriteCallback;
|
||||
import org.eclipse.jetty.websocket.common.io.IOState;
|
||||
|
||||
/**
|
||||
|
@ -84,15 +86,8 @@ public class MuxChannel implements WebSocketConnection, LogicalConnection, Incom
|
|||
public void close(int statusCode, String reason)
|
||||
{
|
||||
CloseInfo close = new CloseInfo(statusCode,reason);
|
||||
try
|
||||
{
|
||||
outgoingFrame(close.asFrame());
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
LOG.warn("Unable to issue Close",e);
|
||||
disconnect();
|
||||
}
|
||||
// TODO: disconnect callback?
|
||||
outgoingFrame(close.asFrame(),null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -197,13 +192,26 @@ public class MuxChannel implements WebSocketConnection, LogicalConnection, Incom
|
|||
this.ioState.setState(ConnectionState.OPEN);
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal
|
||||
*
|
||||
* @param frame the frame to write
|
||||
* @return the future for the network write of the frame
|
||||
*/
|
||||
private Future<Void> outgoingAsyncFrame(WebSocketFrame frame)
|
||||
{
|
||||
FutureWriteCallback future = new FutureWriteCallback();
|
||||
outgoingFrame(frame,future);
|
||||
return future;
|
||||
}
|
||||
|
||||
/**
|
||||
* Frames destined for the Muxer
|
||||
*/
|
||||
@Override
|
||||
public Future<WriteResult> outgoingFrame(Frame frame) throws IOException
|
||||
public void outgoingFrame(Frame frame, WriteCallback callback)
|
||||
{
|
||||
return muxer.output(channelId,frame);
|
||||
muxer.output(channelId,frame,callback);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -212,7 +220,7 @@ public class MuxChannel implements WebSocketConnection, LogicalConnection, Incom
|
|||
@Override
|
||||
public void ping(ByteBuffer buf) throws IOException
|
||||
{
|
||||
outgoingFrame(WebSocketFrame.ping().setPayload(buf));
|
||||
outgoingFrame(WebSocketFrame.ping().setPayload(buf),null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -254,26 +262,32 @@ public class MuxChannel implements WebSocketConnection, LogicalConnection, Incom
|
|||
* Generate a binary message, destined for Muxer
|
||||
*/
|
||||
@Override
|
||||
public Future<WriteResult> write(byte[] buf, int offset, int len) throws IOException
|
||||
public Future<Void> write(byte[] buf, int offset, int len)
|
||||
{
|
||||
return outgoingFrame(WebSocketFrame.binary().setPayload(buf,offset,len));
|
||||
ByteBuffer bb = ByteBuffer.wrap(buf,offset,len);
|
||||
return write(bb);
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate a binary message, destined for Muxer
|
||||
*/
|
||||
@Override
|
||||
public Future<WriteResult> write(ByteBuffer buffer) throws IOException
|
||||
public Future<Void> write(ByteBuffer buffer)
|
||||
{
|
||||
return outgoingFrame(WebSocketFrame.binary().setPayload(buffer));
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("write with {}",BufferUtil.toDetailString(buffer));
|
||||
}
|
||||
WebSocketFrame frame = WebSocketFrame.binary().setPayload(buffer);
|
||||
return outgoingAsyncFrame(frame);
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate a text message, destined for Muxer
|
||||
*/
|
||||
@Override
|
||||
public Future<WriteResult> write(String message) throws IOException
|
||||
public Future<Void> write(String message)
|
||||
{
|
||||
return outgoingFrame(WebSocketFrame.text(message));
|
||||
return outgoingAsyncFrame(WebSocketFrame.text(message));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,12 +20,11 @@ package org.eclipse.jetty.websocket.common.extensions.mux;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.eclipse.jetty.io.ArrayByteBufferPool;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.websocket.api.WriteResult;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketFrame;
|
||||
|
@ -56,7 +55,7 @@ public class MuxGenerator
|
|||
this.bufferPool = bufferPool;
|
||||
}
|
||||
|
||||
public Future<WriteResult> generate(long channelId, Frame frame) throws IOException
|
||||
public void generate(long channelId, Frame frame, WriteCallback callback)
|
||||
{
|
||||
ByteBuffer muxPayload = bufferPool.acquire(frame.getPayloadLength() + DATA_FRAME_OVERHEAD,false);
|
||||
BufferUtil.flipToFill(muxPayload);
|
||||
|
@ -81,10 +80,10 @@ public class MuxGenerator
|
|||
bufferPool.release(frame.getPayload());
|
||||
|
||||
// send muxed frame down to the physical connection.
|
||||
return outgoing.outgoingFrame(muxFrame);
|
||||
outgoing.outgoingFrame(muxFrame,callback);
|
||||
}
|
||||
|
||||
public void generate(MuxControlBlock... blocks) throws IOException
|
||||
public void generate(WriteCallback callback,MuxControlBlock... blocks) throws IOException
|
||||
{
|
||||
if ((blocks == null) || (blocks.length <= 0))
|
||||
{
|
||||
|
@ -167,7 +166,7 @@ public class MuxGenerator
|
|||
BufferUtil.flipToFlush(payload,0);
|
||||
WebSocketFrame frame = WebSocketFrame.binary();
|
||||
frame.setPayload(payload);
|
||||
outgoing.outgoingFrame(frame);
|
||||
outgoing.outgoingFrame(frame,callback);
|
||||
}
|
||||
|
||||
public OutgoingFrames getOutgoing()
|
||||
|
|
|
@ -23,7 +23,6 @@ import java.net.InetSocketAddress;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.eclipse.jetty.util.StringUtil;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
|
@ -34,7 +33,7 @@ import org.eclipse.jetty.websocket.api.UpgradeResponse;
|
|||
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketException;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||
import org.eclipse.jetty.websocket.api.WriteResult;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
|
||||
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
|
||||
|
@ -190,7 +189,7 @@ public class Muxer implements IncomingFrames, MuxParser.Listener
|
|||
LOG.warn(muxe);
|
||||
try
|
||||
{
|
||||
generator.generate(drop);
|
||||
generator.generate(null,drop);
|
||||
}
|
||||
catch (IOException ioe)
|
||||
{
|
||||
|
@ -384,13 +383,13 @@ public class Muxer implements IncomingFrames, MuxParser.Listener
|
|||
/**
|
||||
* Outgoing frame, without mux encapsulated payload.
|
||||
*/
|
||||
public Future<WriteResult> output(long channelId, Frame frame) throws IOException
|
||||
public void output(long channelId, Frame frame, WriteCallback callback)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("output({}, {})",channelId,frame);
|
||||
LOG.debug("output({}, {})",channelId,frame,callback);
|
||||
}
|
||||
return generator.generate(channelId,frame);
|
||||
generator.generate(channelId,frame,callback);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -402,7 +401,7 @@ public class Muxer implements IncomingFrames, MuxParser.Listener
|
|||
*/
|
||||
public void output(MuxControlBlock op) throws IOException
|
||||
{
|
||||
generator.generate(op);
|
||||
generator.generate(null,op);
|
||||
}
|
||||
|
||||
public void setAddClient(MuxAddClient addClient)
|
||||
|
|
|
@ -24,9 +24,9 @@ import java.nio.ByteBuffer;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.eclipse.jetty.io.AbstractConnection;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
|
@ -36,7 +36,6 @@ import org.eclipse.jetty.io.EofException;
|
|||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.ForkInvoker;
|
||||
import org.eclipse.jetty.util.FutureCallback;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.thread.Scheduler;
|
||||
|
@ -46,7 +45,7 @@ import org.eclipse.jetty.websocket.api.SuspendToken;
|
|||
import org.eclipse.jetty.websocket.api.WebSocketConnection;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketException;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||
import org.eclipse.jetty.websocket.api.WriteResult;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
import org.eclipse.jetty.websocket.common.CloseInfo;
|
||||
|
@ -62,95 +61,22 @@ import org.eclipse.jetty.websocket.common.WebSocketSession;
|
|||
*/
|
||||
public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection
|
||||
{
|
||||
private abstract class AbstractFrameBytes extends FutureCallback implements FrameBytes
|
||||
{
|
||||
protected final Logger LOG;
|
||||
protected final Frame frame;
|
||||
|
||||
public AbstractFrameBytes(Frame frame)
|
||||
{
|
||||
this.frame = frame;
|
||||
this.LOG = Log.getLogger(this.getClass());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void complete()
|
||||
{
|
||||
if (!isDone())
|
||||
{
|
||||
AbstractWebSocketConnection.this.complete(this);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fail(Throwable t)
|
||||
{
|
||||
failed(t);
|
||||
flush();
|
||||
}
|
||||
|
||||
/**
|
||||
* Entry point for EndPoint.write failure
|
||||
*/
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
// Log failure
|
||||
if (x instanceof EofException)
|
||||
{
|
||||
// Abbreviate the EofException
|
||||
LOG.warn("failed() - " + EofException.class);
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG.warn("failed()",x);
|
||||
}
|
||||
flushing = false;
|
||||
queue.fail(x);
|
||||
super.failed(x);
|
||||
}
|
||||
|
||||
/**
|
||||
* Entry point for EndPoint.write success
|
||||
*/
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
super.succeeded();
|
||||
synchronized (queue)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("Completed Write of {} ({} frame(s) in queue)",this,queue.size());
|
||||
}
|
||||
flushing = false;
|
||||
}
|
||||
complete();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return frame.toString();
|
||||
}
|
||||
}
|
||||
|
||||
private class ControlFrameBytes extends AbstractFrameBytes
|
||||
private class ControlFrameBytes extends FrameBytes
|
||||
{
|
||||
private ByteBuffer buffer;
|
||||
private ByteBuffer origPayload;
|
||||
|
||||
public ControlFrameBytes(Frame frame)
|
||||
public ControlFrameBytes(Frame frame, Callback childCallback)
|
||||
{
|
||||
super(frame);
|
||||
super(frame,childCallback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void complete()
|
||||
public void completeWrite()
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("complete() - frame: {}",frame);
|
||||
LOG.debug("completeWrite() - frame: {}",frame);
|
||||
}
|
||||
|
||||
if (buffer != null)
|
||||
|
@ -164,7 +90,8 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
buffer = null;
|
||||
}
|
||||
|
||||
super.complete();
|
||||
queue.remove(this);
|
||||
super.completeFrame();
|
||||
|
||||
if (frame.getType().getOpCode() == OpCode.CLOSE)
|
||||
{
|
||||
|
@ -179,6 +106,9 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
@Override
|
||||
public ByteBuffer getByteBuffer()
|
||||
{
|
||||
synchronized (queue)
|
||||
{
|
||||
state.set(STARTED);
|
||||
if (buffer == null)
|
||||
{
|
||||
if (frame.hasPayload())
|
||||
|
@ -188,27 +118,27 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
BufferUtil.put(frame.getPayload(),origPayload);
|
||||
}
|
||||
buffer = getGenerator().generate(frame);
|
||||
|
||||
}
|
||||
}
|
||||
return buffer;
|
||||
}
|
||||
}
|
||||
|
||||
private class DataFrameBytes extends AbstractFrameBytes
|
||||
private class DataFrameBytes extends FrameBytes
|
||||
{
|
||||
private ByteBuffer buffer;
|
||||
|
||||
public DataFrameBytes(Frame frame)
|
||||
public DataFrameBytes(Frame frame, Callback childCallback)
|
||||
{
|
||||
super(frame);
|
||||
super(frame,childCallback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void complete()
|
||||
public void completeWrite()
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("complete() - frame.remaining() = {}",frame.remaining());
|
||||
LOG.debug("completeWrite() - frame.remaining() = {}",frame.remaining());
|
||||
}
|
||||
|
||||
if (buffer != null)
|
||||
|
@ -228,14 +158,18 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
// We have written a partial frame per windowing size.
|
||||
// We need to keep the correct ordering of frames, to avoid that another
|
||||
// Data frame for the same stream is written before this one is finished.
|
||||
queue.prepend(this);
|
||||
super.completeWrite();
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG.debug("Send complete");
|
||||
super.complete();
|
||||
synchronized (queue)
|
||||
{
|
||||
queue.remove(this);
|
||||
}
|
||||
// TODO: Notify the rest of the callback chain (extension, close/disconnect, and user callbacks)
|
||||
completeFrame();
|
||||
}
|
||||
flush();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -243,13 +177,21 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
{
|
||||
try
|
||||
{
|
||||
synchronized (queue)
|
||||
{
|
||||
state.set(STARTED);
|
||||
int windowSize = getInputBufferSize();
|
||||
buffer = getGenerator().generate(windowSize,frame);
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("getByteBuffer() - {}",BufferUtil.toDetailString(buffer));
|
||||
}
|
||||
return buffer;
|
||||
}
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
fail(x);
|
||||
failFrame(x);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
@ -290,13 +232,150 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
}
|
||||
}
|
||||
|
||||
public interface FrameBytes extends Callback, Future<Void>
|
||||
public abstract class FrameBytes implements Callback
|
||||
{
|
||||
public abstract void complete();
|
||||
// no bytes have yet been flushed
|
||||
public int UNSTARTED = 0;
|
||||
// some bytes have been provided for being flushed
|
||||
public int STARTED = 1;
|
||||
// all bytes have been flushed
|
||||
public int FINISHED = 2;
|
||||
// is in failure state
|
||||
public int FAILED = 3;
|
||||
|
||||
public abstract void fail(Throwable t);
|
||||
protected final Logger LOG;
|
||||
protected final Frame frame;
|
||||
protected final Callback childCallback;
|
||||
protected final AtomicInteger state = new AtomicInteger(UNSTARTED);
|
||||
|
||||
public FrameBytes(Frame frame, Callback childCallback)
|
||||
{
|
||||
this.LOG = Log.getLogger(this.getClass());
|
||||
this.frame = frame;
|
||||
this.childCallback = childCallback;
|
||||
}
|
||||
|
||||
public void completeFrame()
|
||||
{
|
||||
LOG.debug("completeFrame() {}",this);
|
||||
synchronized (queue)
|
||||
{
|
||||
state.set(FINISHED);
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("Completed Write of {} ({} frame(s) in queue)",this,queue.size());
|
||||
}
|
||||
flushing = false;
|
||||
}
|
||||
AbstractWebSocketConnection.this.complete(childCallback);
|
||||
}
|
||||
|
||||
public void completeWrite()
|
||||
{
|
||||
// handle reflush.
|
||||
if (isUnfinished())
|
||||
{
|
||||
AbstractWebSocketConnection.this.complete(this);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj)
|
||||
{
|
||||
if (this == obj)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
if (obj == null)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
if (getClass() != obj.getClass())
|
||||
{
|
||||
return false;
|
||||
}
|
||||
FrameBytes other = (FrameBytes)obj;
|
||||
if (frame == null)
|
||||
{
|
||||
if (other.frame != null)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
else if (!frame.equals(other.frame))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Entry point for EndPoint.write failure
|
||||
*/
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
// Log failure
|
||||
if (x instanceof EofException)
|
||||
{
|
||||
// Abbreviate the EofException
|
||||
LOG.warn("failed() - " + EofException.class);
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG.warn("failed()",x);
|
||||
}
|
||||
synchronized (queue)
|
||||
{
|
||||
state.set(FAILED);
|
||||
flushing = false;
|
||||
queue.fail(x);
|
||||
}
|
||||
failFrame(x);
|
||||
}
|
||||
|
||||
public void failFrame(Throwable t)
|
||||
{
|
||||
failed(t);
|
||||
flush();
|
||||
}
|
||||
|
||||
public abstract ByteBuffer getByteBuffer();
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
final int prime = 31;
|
||||
int result = 1;
|
||||
result = (prime * result) + ((frame == null)?0:frame.hashCode());
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* If the FrameBytes have been started, but not yet finished
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public boolean isUnfinished()
|
||||
{
|
||||
return (state.get() == STARTED);
|
||||
}
|
||||
|
||||
/**
|
||||
* Entry point for EndPoint.write success
|
||||
*/
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
LOG.debug("succeeded() {}",this);
|
||||
completeWrite();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return frame.toString();
|
||||
}
|
||||
}
|
||||
|
||||
private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class);
|
||||
|
@ -344,6 +423,10 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
|
||||
public void complete(final Callback callback)
|
||||
{
|
||||
if (callback == null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
if (ioState.isOpen())
|
||||
{
|
||||
invoker.invoke(callback);
|
||||
|
@ -383,16 +466,8 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
private void enqueClose(int statusCode, String reason)
|
||||
{
|
||||
CloseInfo close = new CloseInfo(statusCode,reason);
|
||||
try
|
||||
{
|
||||
outgoingFrame(close.asFrame());
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
LOG.info("Unable to enque close frame",e);
|
||||
// TODO: now what?
|
||||
disconnect();
|
||||
}
|
||||
// TODO: create DisconnectCallback?
|
||||
outgoingFrame(close.asFrame(),null);
|
||||
}
|
||||
|
||||
private void execute(Runnable task)
|
||||
|
@ -430,7 +505,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
LOG.debug(".flush() - flushing={} - queue.size = {}",flushing,queue.size());
|
||||
}
|
||||
|
||||
frameBytes = queue.pop();
|
||||
frameBytes = queue.peek();
|
||||
|
||||
if (!isOpen())
|
||||
{
|
||||
|
@ -537,7 +612,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
@Override
|
||||
public void onFillable()
|
||||
{
|
||||
// LOG.debug("{} onFillable()",policy.getBehavior());
|
||||
LOG.debug("{} onFillable()",policy.getBehavior());
|
||||
ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),false);
|
||||
BufferUtil.clear(buffer);
|
||||
boolean readMore = false;
|
||||
|
@ -586,31 +661,31 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Frame from API, User, or Internal implementation destined for network.
|
||||
*/
|
||||
@Override
|
||||
public Future<WriteResult> outgoingFrame(Frame frame) throws IOException
|
||||
public void outgoingFrame(Frame frame, WriteCallback callback)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("outgoingFrame({})",frame);
|
||||
LOG.debug("outgoingFrame({}, callback)",frame);
|
||||
}
|
||||
|
||||
Future<WriteResult> future = null;
|
||||
|
||||
synchronized (this)
|
||||
synchronized (queue)
|
||||
{
|
||||
FrameBytes bytes = null;
|
||||
Callback jettyCallback = WriteCallbackWrapper.wrap(callback);
|
||||
|
||||
if (frame.getType().isControl())
|
||||
{
|
||||
bytes = new ControlFrameBytes(frame);
|
||||
bytes = new ControlFrameBytes(frame,jettyCallback);
|
||||
}
|
||||
else
|
||||
{
|
||||
bytes = new DataFrameBytes(frame);
|
||||
bytes = new DataFrameBytes(frame,jettyCallback);
|
||||
}
|
||||
|
||||
future = new WriteResultFuture(bytes);
|
||||
|
||||
if (isOpen())
|
||||
{
|
||||
if (frame.getType().getOpCode() == OpCode.PING)
|
||||
|
@ -625,8 +700,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
}
|
||||
|
||||
flush();
|
||||
|
||||
return future;
|
||||
}
|
||||
|
||||
private int read(ByteBuffer buffer)
|
||||
|
|
|
@ -18,13 +18,8 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.common.io;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketException;
|
||||
import org.eclipse.jetty.websocket.api.WriteResult;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
|
||||
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
|
||||
|
@ -33,7 +28,6 @@ public class FramePipes
|
|||
{
|
||||
private static class In2Out implements IncomingFrames
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(FramePipes.In2Out.class);
|
||||
private OutgoingFrames outgoing;
|
||||
|
||||
public In2Out(OutgoingFrames outgoing)
|
||||
|
@ -50,14 +44,7 @@ public class FramePipes
|
|||
@Override
|
||||
public void incomingFrame(Frame frame)
|
||||
{
|
||||
try
|
||||
{
|
||||
this.outgoing.outgoingFrame(frame);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
LOG.debug(e);
|
||||
}
|
||||
this.outgoing.outgoingFrame(frame,null);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -71,11 +58,9 @@ public class FramePipes
|
|||
}
|
||||
|
||||
@Override
|
||||
public Future<WriteResult> outgoingFrame(Frame frame) throws IOException
|
||||
public void outgoingFrame(Frame frame, WriteCallback callback)
|
||||
{
|
||||
this.incoming.incomingFrame(frame);
|
||||
|
||||
return null; // FIXME: should return completed future.
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -88,5 +73,4 @@ public class FramePipes
|
|||
{
|
||||
return new In2Out(outgoing);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -85,6 +85,8 @@ public class FrameQueue extends LinkedList<FrameBytes>
|
|||
bytes.failed(failure);
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO: make sure that we don't go in front of started but not yet finished frames.
|
||||
addFirst(bytes);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.common.io;
|
||||
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.eclipse.jetty.util.FutureCallback;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
|
||||
/**
|
||||
* Allows events to a {@link WriteCallback} to drive a {@link Future} for the user.
|
||||
*/
|
||||
public class FutureWriteCallback extends FutureCallback implements WriteCallback
|
||||
{
|
||||
@Override
|
||||
public void writeFailed(Throwable cause)
|
||||
{
|
||||
failed(cause);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeSuccess()
|
||||
{
|
||||
succeeded();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,277 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.common.io;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Objects;
|
||||
|
||||
import org.eclipse.jetty.io.AbstractConnection;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
import org.eclipse.jetty.websocket.common.Generator;
|
||||
|
||||
/**
|
||||
* Interface for working with bytes destined for {@link EndPoint#write(Callback, ByteBuffer...)}
|
||||
*/
|
||||
public class WriteBytesProvider implements Callback
|
||||
{
|
||||
private class FrameEntry
|
||||
{
|
||||
private final Logger LOG = Log.getLogger(FrameEntry.class);
|
||||
protected final Frame frame;
|
||||
protected final Callback callback;
|
||||
|
||||
public FrameEntry(Frame frame, Callback callback)
|
||||
{
|
||||
this.frame = frame;
|
||||
this.callback = callback;
|
||||
}
|
||||
|
||||
public ByteBuffer getByteBuffer()
|
||||
{
|
||||
ByteBuffer buffer = generator.generate(bufferSize,frame);
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("getByteBuffer() - {}",BufferUtil.toDetailString(buffer));
|
||||
}
|
||||
return buffer;
|
||||
}
|
||||
}
|
||||
|
||||
/** The websocket generator */
|
||||
private final Generator generator;
|
||||
/** Flush callback, for notifying when a flush should be performed */
|
||||
private final Callback flushCallback;
|
||||
/** Backlog of frames */
|
||||
private LinkedList<FrameEntry> queue;
|
||||
/** the buffer input size */
|
||||
private int bufferSize = 2048;
|
||||
/** Currently active frame */
|
||||
private FrameEntry active;
|
||||
/** Failure state for the entire WriteBytesProvider */
|
||||
private Throwable failure;
|
||||
/** The last requested buffer */
|
||||
private ByteBuffer buffer;
|
||||
|
||||
/**
|
||||
* Create a WriteBytesProvider with specified Generator and "flush" Callback.
|
||||
*
|
||||
* @param generator
|
||||
* the generator to use for converting {@link Frame} objects to network {@link ByteBuffer}s
|
||||
* @param flushCallback
|
||||
* the flush callback to call, on a write event, after the write event has been processed by this {@link WriteBytesProvider}.
|
||||
* <p>
|
||||
* Used to trigger another flush of the next set of bytes.
|
||||
*/
|
||||
public WriteBytesProvider(Generator generator, Callback flushCallback)
|
||||
{
|
||||
this.generator = Objects.requireNonNull(generator);
|
||||
this.flushCallback = Objects.requireNonNull(flushCallback);
|
||||
this.queue = new LinkedList<>();
|
||||
}
|
||||
|
||||
/**
|
||||
* Append a Frame & Callback to the pending queue.
|
||||
*
|
||||
* @param frame
|
||||
* the frame to add
|
||||
* @param callback
|
||||
* the optional callback for the frame write (can be null)
|
||||
*/
|
||||
public void append(Frame frame, Callback callback)
|
||||
{
|
||||
Objects.requireNonNull(frame);
|
||||
synchronized (this)
|
||||
{
|
||||
if (isFailed())
|
||||
{
|
||||
// no changes when failed
|
||||
notifyFailure(callback);
|
||||
return;
|
||||
}
|
||||
queue.addLast(new FrameEntry(frame,callback));
|
||||
}
|
||||
}
|
||||
|
||||
public void failAll(Throwable t)
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
if (isFailed())
|
||||
{
|
||||
// already failed.
|
||||
return;
|
||||
}
|
||||
|
||||
failure = t;
|
||||
|
||||
for (FrameEntry fe : queue)
|
||||
{
|
||||
notifyFailure(fe.callback);
|
||||
}
|
||||
|
||||
queue.clear();
|
||||
|
||||
// notify flush callback
|
||||
flushCallback.failed(failure);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write of ByteBuffer failed.
|
||||
*
|
||||
* @param cause
|
||||
* the cause of the failure
|
||||
*/
|
||||
@Override
|
||||
public void failed(Throwable cause)
|
||||
{
|
||||
failAll(cause);
|
||||
}
|
||||
|
||||
public int getBufferSize()
|
||||
{
|
||||
return bufferSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the next ByteBuffer to write.
|
||||
*
|
||||
* @return the next ByteBuffer (or null if nothing to write)
|
||||
*/
|
||||
public ByteBuffer getByteBuffer()
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
if (active == null)
|
||||
{
|
||||
if (queue.isEmpty())
|
||||
{
|
||||
// nothing in queue
|
||||
return null;
|
||||
}
|
||||
// get current topmost entry
|
||||
active = queue.pop();
|
||||
}
|
||||
|
||||
if (active == null)
|
||||
{
|
||||
// no active frame available, even in queue.
|
||||
return null;
|
||||
}
|
||||
|
||||
buffer = active.getByteBuffer();
|
||||
}
|
||||
return buffer;
|
||||
}
|
||||
|
||||
public Throwable getFailure()
|
||||
{
|
||||
return failure;
|
||||
}
|
||||
|
||||
public boolean isFailed()
|
||||
{
|
||||
return (failure != null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Notify specific callback of failure.
|
||||
*
|
||||
* @param callback
|
||||
* the callback to notify
|
||||
*/
|
||||
private void notifyFailure(Callback callback)
|
||||
{
|
||||
if (callback == null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
callback.failed(failure);
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepend a Frame & Callback to the pending queue.
|
||||
*
|
||||
* @param frame
|
||||
* the frame to add
|
||||
* @param callback
|
||||
* the optional callback for the frame write (can be null)
|
||||
*/
|
||||
public void prepend(Frame frame, Callback callback)
|
||||
{
|
||||
Objects.requireNonNull(frame);
|
||||
synchronized (this)
|
||||
{
|
||||
if (isFailed())
|
||||
{
|
||||
// no changes when failed
|
||||
notifyFailure(callback);
|
||||
return;
|
||||
}
|
||||
|
||||
queue.addFirst(new FrameEntry(frame,callback));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the buffer size used for generating ByteBuffers from the frames.
|
||||
* <p>
|
||||
* Value usually obtained from {@link AbstractConnection#getInputBufferSize()}
|
||||
*
|
||||
* @param bufferSize
|
||||
* the buffer size to use
|
||||
*/
|
||||
public void setBufferSize(int bufferSize)
|
||||
{
|
||||
this.bufferSize = bufferSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Write of ByteBuffer succeeded.
|
||||
*/
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
// Release the active byte buffer first
|
||||
generator.getBufferPool().release(buffer);
|
||||
|
||||
if (active == null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
if (active.frame.remaining() <= 0)
|
||||
{
|
||||
// All done with active FrameEntry
|
||||
active = null;
|
||||
}
|
||||
|
||||
// notify flush callback
|
||||
flushCallback.succeeded();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,58 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.common.io;
|
||||
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
|
||||
/**
|
||||
* Wraps the exposed {@link WriteCallback} API with a Jetty {@link Callback}.
|
||||
* <p>
|
||||
* We don't expose the jetty {@link Callback} object to the webapp, as that makes things complicated for the WebAppContext's Classloader.
|
||||
*/
|
||||
public class WriteCallbackWrapper implements Callback
|
||||
{
|
||||
public static Callback wrap(WriteCallback callback)
|
||||
{
|
||||
if (callback == null)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
return new WriteCallbackWrapper(callback);
|
||||
}
|
||||
|
||||
private final WriteCallback callback;
|
||||
|
||||
public WriteCallbackWrapper(WriteCallback callback)
|
||||
{
|
||||
this.callback = callback;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
callback.writeFailed(x);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
callback.writeSuccess();
|
||||
}
|
||||
}
|
|
@ -18,66 +18,15 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.common.io;
|
||||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.FuturePromise;
|
||||
import org.eclipse.jetty.websocket.api.WriteResult;
|
||||
import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.FrameBytes;
|
||||
|
||||
public class WriteResultFuture implements Future<WriteResult>
|
||||
public class WriteResultFuture extends FuturePromise<WriteResult> implements Callback
|
||||
{
|
||||
private final FrameBytes bytes;
|
||||
|
||||
public WriteResultFuture(FrameBytes bytes)
|
||||
{
|
||||
this.bytes = bytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean cancel(boolean mayInterruptIfRunning)
|
||||
public void succeeded()
|
||||
{
|
||||
return this.bytes.cancel(mayInterruptIfRunning);
|
||||
}
|
||||
|
||||
@Override
|
||||
public WriteResult get() throws InterruptedException, ExecutionException
|
||||
{
|
||||
try
|
||||
{
|
||||
bytes.get();
|
||||
return new WriteResult();
|
||||
}
|
||||
catch (ExecutionException e)
|
||||
{
|
||||
return new WriteResult(e.getCause());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public WriteResult get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
|
||||
{
|
||||
try
|
||||
{
|
||||
bytes.get(timeout,unit);
|
||||
return new WriteResult();
|
||||
}
|
||||
catch (ExecutionException e)
|
||||
{
|
||||
return new WriteResult(e.getCause());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCancelled()
|
||||
{
|
||||
return this.bytes.isCancelled();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDone()
|
||||
{
|
||||
return this.bytes.isDone();
|
||||
succeeded(new WriteResult());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,8 +18,6 @@
|
|||
|
||||
package examples.echo;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.eclipse.jetty.websocket.api.WebSocketConnection;
|
||||
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
|
||||
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
|
||||
|
@ -37,13 +35,6 @@ public class AnnotatedEchoSocket
|
|||
{
|
||||
return;
|
||||
}
|
||||
try
|
||||
{
|
||||
conn.write(message);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,15 +18,12 @@
|
|||
|
||||
package examples.echo;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import org.eclipse.jetty.websocket.api.WebSocketConnection;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketException;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketListener;
|
||||
import org.eclipse.jetty.websocket.api.WriteResult;
|
||||
|
||||
/**
|
||||
* Example EchoSocket using Listener.
|
||||
|
@ -68,14 +65,6 @@ public class ListenerEchoSocket implements WebSocketListener
|
|||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
@SuppressWarnings("unused")
|
||||
Future<WriteResult> future = outbound.write(message);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
LOG.log(Level.WARNING,"unable to echo message: " + message,e);
|
||||
}
|
||||
outbound.write(message);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,13 +21,11 @@ package org.eclipse.jetty.websocket.common;
|
|||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.websocket.api.WriteResult;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
|
||||
import org.eclipse.jetty.websocket.common.io.WriteResultFinishedFuture;
|
||||
import org.junit.Assert;
|
||||
|
||||
public class OutgoingFramesCapture implements OutgoingFrames
|
||||
|
@ -84,11 +82,13 @@ public class OutgoingFramesCapture implements OutgoingFrames
|
|||
}
|
||||
|
||||
@Override
|
||||
public Future<WriteResult> outgoingFrame(Frame frame)
|
||||
public void outgoingFrame(Frame frame, WriteCallback callback)
|
||||
{
|
||||
WebSocketFrame copy = new WebSocketFrame(frame);
|
||||
frames.add(copy);
|
||||
|
||||
return WriteResultFinishedFuture.INSTANCE;
|
||||
if (callback != null)
|
||||
{
|
||||
callback.writeSuccess();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,19 +20,16 @@ package org.eclipse.jetty.websocket.common;
|
|||
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.TypeUtil;
|
||||
import org.eclipse.jetty.websocket.api.WriteResult;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
|
||||
import org.eclipse.jetty.websocket.common.io.WriteResultFinishedFuture;
|
||||
import org.junit.Assert;
|
||||
|
||||
/**
|
||||
|
@ -63,11 +60,13 @@ public class OutgoingNetworkBytesCapture implements OutgoingFrames
|
|||
}
|
||||
|
||||
@Override
|
||||
public Future<WriteResult> outgoingFrame(Frame frame) throws IOException
|
||||
public void outgoingFrame(Frame frame, WriteCallback callback)
|
||||
{
|
||||
ByteBuffer buf = generator.generate(frame);
|
||||
captured.add(buf.slice());
|
||||
|
||||
return WriteResultFinishedFuture.INSTANCE;
|
||||
if (callback != null)
|
||||
{
|
||||
callback.writeSuccess();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,8 +18,6 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.common.annotations;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.eclipse.jetty.websocket.api.WebSocketConnection;
|
||||
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
|
||||
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
|
||||
|
@ -37,14 +35,7 @@ public class MyStatelessEchoSocket
|
|||
{
|
||||
@OnWebSocketMessage
|
||||
public void onText(WebSocketConnection conn, String text)
|
||||
{
|
||||
try
|
||||
{
|
||||
conn.write(text);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,15 +18,11 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.common.extensions;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.WriteResult;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
|
||||
import org.eclipse.jetty.websocket.common.io.WriteResultFinishedFuture;
|
||||
|
||||
/**
|
||||
* Dummy implementation of {@link OutgoingFrames} used for testing
|
||||
|
@ -42,10 +38,13 @@ public class DummyOutgoingFrames implements OutgoingFrames
|
|||
}
|
||||
|
||||
@Override
|
||||
public Future<WriteResult> outgoingFrame(Frame frame) throws IOException
|
||||
public void outgoingFrame(Frame frame, WriteCallback callback)
|
||||
{
|
||||
LOG.debug("outgoingFrame({})",frame);
|
||||
return WriteResultFinishedFuture.INSTANCE;
|
||||
LOG.debug("outgoingFrame({},{})",frame,callback);
|
||||
if (callback != null)
|
||||
{
|
||||
callback.writeSuccess();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -156,7 +156,7 @@ public class FragmentExtensionTest
|
|||
for (String section : quote)
|
||||
{
|
||||
Frame frame = WebSocketFrame.text(section);
|
||||
ext.outgoingFrame(frame);
|
||||
ext.outgoingFrame(frame,null);
|
||||
}
|
||||
|
||||
// Expected Frames
|
||||
|
@ -225,7 +225,7 @@ public class FragmentExtensionTest
|
|||
for (String section : quote)
|
||||
{
|
||||
Frame frame = WebSocketFrame.text(section);
|
||||
ext.outgoingFrame(frame);
|
||||
ext.outgoingFrame(frame,null);
|
||||
}
|
||||
|
||||
// Expected Frames
|
||||
|
@ -282,7 +282,7 @@ public class FragmentExtensionTest
|
|||
String payload = "Are you there?";
|
||||
Frame ping = WebSocketFrame.ping().setPayload(payload);
|
||||
|
||||
ext.outgoingFrame(ping);
|
||||
ext.outgoingFrame(ping,null);
|
||||
|
||||
capture.assertFrameCount(1);
|
||||
capture.assertHasFrame(OpCode.PING,1);
|
||||
|
|
|
@ -79,7 +79,7 @@ public class IdentityExtensionTest
|
|||
ext.setNextOutgoingFrames(capture);
|
||||
|
||||
Frame frame = WebSocketFrame.text("hello");
|
||||
ext.outgoingFrame(frame);
|
||||
ext.outgoingFrame(frame,null);
|
||||
|
||||
capture.assertFrameCount(1);
|
||||
capture.assertHasFrame(OpCode.TEXT,1);
|
||||
|
|
|
@ -110,7 +110,7 @@ public class FrameCompressionExtensionTest
|
|||
ext.setNextOutgoingFrames(capture);
|
||||
|
||||
Frame frame = WebSocketFrame.text(text);
|
||||
ext.outgoingFrame(frame);
|
||||
ext.outgoingFrame(frame,null);
|
||||
|
||||
capture.assertBytes(0,expectedHex);
|
||||
}
|
||||
|
@ -228,8 +228,8 @@ public class FrameCompressionExtensionTest
|
|||
OutgoingNetworkBytesCapture capture = new OutgoingNetworkBytesCapture(generator);
|
||||
ext.setNextOutgoingFrames(capture);
|
||||
|
||||
ext.outgoingFrame(WebSocketFrame.text("Hello"));
|
||||
ext.outgoingFrame(WebSocketFrame.text("There"));
|
||||
ext.outgoingFrame(WebSocketFrame.text("Hello"),null);
|
||||
ext.outgoingFrame(WebSocketFrame.text("There"),null);
|
||||
|
||||
capture.assertBytes(0,"c107f248cdc9c90700");
|
||||
capture.assertBytes(1,"c1070ac9482d4a0500");
|
||||
|
|
|
@ -288,7 +288,7 @@ public class MessageCompressionExtensionTest
|
|||
for (String section : quote)
|
||||
{
|
||||
Frame frame = WebSocketFrame.text(section);
|
||||
ext.outgoingFrame(frame);
|
||||
ext.outgoingFrame(frame,null);
|
||||
}
|
||||
|
||||
int len = quote.size();
|
||||
|
@ -340,7 +340,7 @@ public class MessageCompressionExtensionTest
|
|||
String payload = "Are you there?";
|
||||
Frame ping = WebSocketFrame.ping().setPayload(payload);
|
||||
|
||||
ext.outgoingFrame(ping);
|
||||
ext.outgoingFrame(ping,null);
|
||||
|
||||
capture.assertFrameCount(1);
|
||||
capture.assertHasFrame(OpCode.PING,1);
|
||||
|
|
|
@ -18,10 +18,7 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.common.extensions.mux;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.eclipse.jetty.websocket.api.WriteResult;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
|
||||
|
||||
|
@ -41,9 +38,8 @@ public class MuxDecoder extends MuxEventCapture implements OutgoingFrames
|
|||
}
|
||||
|
||||
@Override
|
||||
public Future<WriteResult> outgoingFrame(Frame frame) throws IOException
|
||||
public void outgoingFrame(Frame frame, WriteCallback callback)
|
||||
{
|
||||
parser.parse(frame);
|
||||
return null; // FIXME: should return completed future.
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,11 +20,10 @@ package org.eclipse.jetty.websocket.common.extensions.mux;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
|
||||
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketFrame;
|
||||
import org.eclipse.jetty.websocket.common.extensions.mux.MuxControlBlock;
|
||||
import org.eclipse.jetty.websocket.common.extensions.mux.MuxGenerator;
|
||||
import org.eclipse.jetty.websocket.common.io.FramePipes;
|
||||
|
||||
/**
|
||||
|
@ -54,11 +53,12 @@ public class MuxEncoder
|
|||
|
||||
public void frame(long channelId, WebSocketFrame frame) throws IOException
|
||||
{
|
||||
this.generator.generate(channelId,frame);
|
||||
this.generator.generate(channelId,frame,null);
|
||||
}
|
||||
|
||||
public void op(MuxControlBlock op) throws IOException
|
||||
{
|
||||
this.generator.generate(op);
|
||||
WriteCallback callback = null;
|
||||
this.generator.generate(callback,op);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@ import org.eclipse.jetty.websocket.api.SuspendToken;
|
|||
import org.eclipse.jetty.websocket.api.WebSocketConnection;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketException;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||
import org.eclipse.jetty.websocket.api.WriteResult;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
|
||||
import org.eclipse.jetty.websocket.common.LogicalConnection;
|
||||
|
@ -152,9 +152,8 @@ public class LocalWebSocketConnection implements WebSocketConnection, LogicalCon
|
|||
}
|
||||
|
||||
@Override
|
||||
public Future<WriteResult> outgoingFrame(Frame frame) throws IOException
|
||||
public void outgoingFrame(Frame frame, WriteCallback callback)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -196,19 +195,19 @@ public class LocalWebSocketConnection implements WebSocketConnection, LogicalCon
|
|||
}
|
||||
|
||||
@Override
|
||||
public Future<WriteResult> write(byte[] buf, int offset, int len) throws IOException
|
||||
public Future<Void> write(byte[] buf, int offset, int len)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<WriteResult> write(ByteBuffer buffer) throws IOException
|
||||
public Future<Void> write(ByteBuffer buffer)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<WriteResult> write(String message) throws IOException
|
||||
public Future<Void> write(String message)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -67,8 +67,6 @@ public class LoadTest
|
|||
|
||||
@OnWebSocketMessage
|
||||
public void onWebSocketText(String message)
|
||||
{
|
||||
try
|
||||
{
|
||||
conn.write(message);
|
||||
long iter = count.incrementAndGet();
|
||||
|
@ -77,11 +75,6 @@ public class LoadTest
|
|||
LOG.info("Echo'd back {} msgs",iter);
|
||||
}
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
e.printStackTrace(System.err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.server;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -150,18 +149,11 @@ public class WebSocketOverSSLTest
|
|||
|
||||
@Override
|
||||
public void onWebSocketText(String message)
|
||||
{
|
||||
try
|
||||
{
|
||||
Assert.assertEquals(message,message);
|
||||
connection.write(message);
|
||||
serverLatch.countDown();
|
||||
}
|
||||
catch (IOException x)
|
||||
{
|
||||
x.printStackTrace();
|
||||
}
|
||||
}
|
||||
});
|
||||
final CountDownLatch clientLatch = new CountDownLatch(1);
|
||||
startClient(new WebSocketAdapter()
|
||||
|
|
|
@ -18,8 +18,6 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.server.ab;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketConnection;
|
||||
|
@ -52,15 +50,8 @@ public class ABSocket
|
|||
LOG.debug("onBinary(byte[{}],{},{})",buf.length,offset,len);
|
||||
|
||||
// echo the message back.
|
||||
try
|
||||
{
|
||||
this.conn.write(buf,offset,len);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
e.printStackTrace(System.err);
|
||||
}
|
||||
}
|
||||
|
||||
@OnWebSocketConnect
|
||||
public void onOpen(WebSocketConnection conn)
|
||||
|
@ -84,13 +75,6 @@ public class ABSocket
|
|||
}
|
||||
|
||||
// echo the message back.
|
||||
try
|
||||
{
|
||||
this.conn.write(message);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
e.printStackTrace(System.err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,7 +36,6 @@ import java.nio.ByteBuffer;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
@ -55,7 +54,7 @@ import org.eclipse.jetty.util.log.Log;
|
|||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketException;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||
import org.eclipse.jetty.websocket.api.WriteResult;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
|
||||
|
@ -70,7 +69,6 @@ import org.eclipse.jetty.websocket.common.WebSocketFrame;
|
|||
import org.eclipse.jetty.websocket.common.extensions.ExtensionStack;
|
||||
import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
|
||||
import org.eclipse.jetty.websocket.common.io.IOState;
|
||||
import org.eclipse.jetty.websocket.server.helper.FinishedFuture;
|
||||
import org.eclipse.jetty.websocket.server.helper.IncomingFramesCapture;
|
||||
import org.junit.Assert;
|
||||
|
||||
|
@ -415,24 +413,38 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
|
|||
}
|
||||
|
||||
@Override
|
||||
public Future<WriteResult> outgoingFrame(Frame frame) throws IOException
|
||||
public void outgoingFrame(Frame frame, WriteCallback callback)
|
||||
{
|
||||
ByteBuffer buf = generator.generate(frame);
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("writing out: {}",BufferUtil.toDetailString(buf));
|
||||
}
|
||||
try
|
||||
{
|
||||
BufferUtil.writeTo(buf,out);
|
||||
out.flush();
|
||||
|
||||
if (callback != null)
|
||||
{
|
||||
callback.writeSuccess();
|
||||
}
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
if (callback != null)
|
||||
{
|
||||
callback.writeFailed(e);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
bufferPool.release(buf);
|
||||
}
|
||||
|
||||
if (frame.getType().getOpCode() == OpCode.CLOSE)
|
||||
{
|
||||
disconnect();
|
||||
}
|
||||
|
||||
return FinishedFuture.INSTANCE;
|
||||
}
|
||||
|
||||
public int read() throws IOException
|
||||
|
@ -643,7 +655,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
|
|||
{
|
||||
frame.setMask(clientmask);
|
||||
}
|
||||
extensionStack.outgoingFrame(frame);
|
||||
extensionStack.outgoingFrame(frame,null);
|
||||
}
|
||||
|
||||
public void writeRaw(ByteBuffer buf) throws IOException
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.server.browser;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.text.DateFormat;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Calendar;
|
||||
|
@ -127,15 +126,8 @@ public class BrowserSocket
|
|||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
connection.write(message);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
LOG.info(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void writeMessage(String format, Object... args)
|
||||
{
|
||||
|
|
|
@ -18,8 +18,6 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.server.examples.echo;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.eclipse.jetty.websocket.api.WebSocketConnection;
|
||||
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
|
||||
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
|
||||
|
@ -37,15 +35,8 @@ public class BigEchoSocket
|
|||
{
|
||||
return;
|
||||
}
|
||||
try
|
||||
{
|
||||
conn.write(buf,offset,length);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
@OnWebSocketMessage
|
||||
public void onText(WebSocketConnection conn, String message)
|
||||
|
@ -54,13 +45,6 @@ public class BigEchoSocket
|
|||
{
|
||||
return;
|
||||
}
|
||||
try
|
||||
{
|
||||
conn.write(message);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.server.examples.echo;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
import org.eclipse.jetty.websocket.api.WebSocketConnection;
|
||||
|
@ -38,17 +37,9 @@ public class EchoBroadcastSocket
|
|||
public void onBinary(byte buf[], int offset, int len)
|
||||
{
|
||||
for (EchoBroadcastSocket sock : BROADCAST)
|
||||
{
|
||||
try
|
||||
{
|
||||
sock.conn.write(buf,offset,len);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
BROADCAST.remove(sock);
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@OnWebSocketClose
|
||||
|
@ -68,16 +59,8 @@ public class EchoBroadcastSocket
|
|||
public void onText(String text)
|
||||
{
|
||||
for (EchoBroadcastSocket sock : BROADCAST)
|
||||
{
|
||||
try
|
||||
{
|
||||
sock.conn.write(text);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
BROADCAST.remove(sock);
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,8 +18,6 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.server.helper;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketConnection;
|
||||
|
@ -43,15 +41,8 @@ public class EchoSocket
|
|||
LOG.debug("onBinary(byte[{}],{},{})",buf.length,offset,len);
|
||||
|
||||
// echo the message back.
|
||||
try
|
||||
{
|
||||
this.conn.write(buf,offset,len);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
e.printStackTrace(System.err);
|
||||
}
|
||||
}
|
||||
|
||||
@OnWebSocketConnect
|
||||
public void onOpen(WebSocketConnection conn)
|
||||
|
@ -65,13 +56,6 @@ public class EchoSocket
|
|||
LOG.debug("onText({})",message);
|
||||
|
||||
// echo the message back.
|
||||
try
|
||||
{
|
||||
this.conn.write(message);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
e.printStackTrace(System.err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,13 +20,11 @@ package org.eclipse.jetty.websocket.server.helper;
|
|||
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.LinkedList;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.websocket.api.WriteResult;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
|
||||
import org.eclipse.jetty.websocket.common.OpCode;
|
||||
|
@ -93,10 +91,13 @@ public class OutgoingFramesCapture implements OutgoingFrames
|
|||
}
|
||||
|
||||
@Override
|
||||
public Future<WriteResult> outgoingFrame(Frame frame) throws IOException
|
||||
public void outgoingFrame(Frame frame, WriteCallback callback)
|
||||
{
|
||||
WebSocketFrame copy = new WebSocketFrame(frame);
|
||||
frames.add(copy);
|
||||
return FinishedFuture.INSTANCE;
|
||||
if (callback != null)
|
||||
{
|
||||
callback.writeSuccess();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,8 +18,6 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.server.helper;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketConnection;
|
||||
|
@ -40,15 +38,8 @@ public class RFCSocket
|
|||
LOG.debug("onBinary(byte[{}],{},{})",buf.length,offset,len);
|
||||
|
||||
// echo the message back.
|
||||
try
|
||||
{
|
||||
this.conn.write(buf,offset,len);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
e.printStackTrace(System.err);
|
||||
}
|
||||
}
|
||||
|
||||
@OnWebSocketConnect
|
||||
public void onOpen(WebSocketConnection conn)
|
||||
|
@ -68,13 +59,6 @@ public class RFCSocket
|
|||
}
|
||||
|
||||
// echo the message back.
|
||||
try
|
||||
{
|
||||
this.conn.write(message);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
e.printStackTrace(System.err);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,7 +1,9 @@
|
|||
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
|
||||
# org.eclipse.jetty.websocket.LEVEL=WARN
|
||||
# org.eclipse.jetty.LEVEL=WARN
|
||||
|
||||
# org.eclipse.jetty.websocket.LEVEL=DEBUG
|
||||
org.eclipse.jetty.websocket.LEVEL=DEBUG
|
||||
# org.eclipse.jetty.websocket.LEVEL=INFO
|
||||
# org.eclipse.jetty.websocket.common.io.LEVEL=DEBUG
|
||||
# org.eclipse.jetty.websocket.server.ab.LEVEL=DEBUG
|
||||
# org.eclipse.jetty.websocket.common.io.LEVEL=DEBUG
|
||||
# org.eclipse.jetty.websocket.common.Parser.LEVEL=DEBUG
|
||||
|
|
Loading…
Reference in New Issue