mirror of
https://github.com/jetty/jetty.project.git
synced 2025-03-01 03:19:13 +00:00
Initial work on websocket-client using SPDY as example
This commit is contained in:
parent
40faff2ab7
commit
f27fff4902
@ -15,11 +15,26 @@
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty</groupId>
|
||||
<artifactId>jetty-util</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty</groupId>
|
||||
<artifactId>jetty-io</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty.websocket</groupId>
|
||||
<artifactId>websocket-core</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty.toolchain</groupId>
|
||||
<artifactId>jetty-test-helper</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
@ -0,0 +1,600 @@
|
||||
/*******************************************************************************
|
||||
* Copyright (c) 2011 Intalio, Inc.
|
||||
* ======================================================================
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* and Apache License v2.0 which accompanies this distribution.
|
||||
*
|
||||
* The Eclipse Public License is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
*
|
||||
* The Apache License v2.0 is available at
|
||||
* http://www.opensource.org/licenses/apache2.0.php
|
||||
*
|
||||
* You may elect to redistribute this code under either of these licenses.
|
||||
*******************************************************************************/
|
||||
package org.eclipse.jetty.websocket.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.ProtocolException;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.URI;
|
||||
import java.nio.channels.ByteChannel;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.WebSocket;
|
||||
import org.eclipse.jetty.websocket.WebSocketConnection;
|
||||
import org.eclipse.jetty.websocket.WebSocketConnectionRFC6455;
|
||||
import org.eclipse.jetty.websocket.WebSocket.Connection;
|
||||
import org.eclipse.jetty.websocket.WebSocket.FrameConnection;
|
||||
import org.eclipse.jetty.websocket.WebSocket.OnFrame;
|
||||
import org.eclipse.jetty.websocket.masks.Masker;
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* <p>{@link OldWebSocketClient} allows to create multiple connections to multiple destinations
|
||||
* that can speak the websocket protocol.</p>
|
||||
* <p>When creating websocket connections, {@link OldWebSocketClient} accepts a {@link WebSocket}
|
||||
* object (to receive events from the server), and returns a {@link WebSocket.Connection} to
|
||||
* send data to the server.</p>
|
||||
* <p>Example usage is as follows:</p>
|
||||
* <pre>
|
||||
* WebSocketClientFactory factory = new WebSocketClientFactory();
|
||||
* factory.start();
|
||||
*
|
||||
* WebSocketClient client = factory.newWebSocketClient();
|
||||
* // Configure the client
|
||||
*
|
||||
* WebSocket.Connection connection = client.open(new URI("ws://127.0.0.1:8080/"), new WebSocket.OnTextMessage()
|
||||
* {
|
||||
* public void onOpen(Connection connection)
|
||||
* {
|
||||
* // open notification
|
||||
* }
|
||||
*
|
||||
* public void onClose(int closeCode, String message)
|
||||
* {
|
||||
* // close notification
|
||||
* }
|
||||
*
|
||||
* public void onMessage(String data)
|
||||
* {
|
||||
* // handle incoming message
|
||||
* }
|
||||
* }).get(5, TimeUnit.SECONDS);
|
||||
*
|
||||
* connection.sendMessage("Hello World");
|
||||
* </pre>
|
||||
*/
|
||||
public class OldWebSocketClient
|
||||
{
|
||||
private final static Logger __log = org.eclipse.jetty.util.log.Log.getLogger(OldWebSocketClient.class.getName());
|
||||
|
||||
private final OldWebSocketClientFactory _factory;
|
||||
private final Map<String,String> _cookies=new ConcurrentHashMap<String, String>();
|
||||
private final List<String> _extensions=new CopyOnWriteArrayList<String>();
|
||||
private String _origin;
|
||||
private String _protocol;
|
||||
private int _maxIdleTime=-1;
|
||||
private int _maxTextMessageSize=16*1024;
|
||||
private int _maxBinaryMessageSize=-1;
|
||||
private Masker _maskGen;
|
||||
private SocketAddress _bindAddress;
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* <p>Creates a WebSocketClient from a private WebSocketClientFactory.</p>
|
||||
* <p>This can be wasteful of resources if many clients are created.</p>
|
||||
*
|
||||
* @deprecated Use {@link OldWebSocketClientFactory#newWebSocketClient()}
|
||||
* @throws Exception if the private WebSocketClientFactory fails to start
|
||||
*/
|
||||
@Deprecated
|
||||
public OldWebSocketClient() throws Exception
|
||||
{
|
||||
_factory=new OldWebSocketClientFactory();
|
||||
_factory.start();
|
||||
_maskGen=_factory.getMaskGen();
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* <p>Creates a WebSocketClient with shared WebSocketClientFactory.</p>
|
||||
*
|
||||
* @param factory the shared {@link OldWebSocketClientFactory}
|
||||
*/
|
||||
public OldWebSocketClient(OldWebSocketClientFactory factory)
|
||||
{
|
||||
_factory=factory;
|
||||
_maskGen=_factory.getMaskGen();
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @return The WebSocketClientFactory this client was created with.
|
||||
*/
|
||||
public OldWebSocketClientFactory getFactory()
|
||||
{
|
||||
return _factory;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @return the address to bind the socket channel to
|
||||
* @see #setBindAddress(SocketAddress)
|
||||
*/
|
||||
public SocketAddress getBindAddress()
|
||||
{
|
||||
return _bindAddress;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @param bindAddress the address to bind the socket channel to
|
||||
* @see #getBindAddress()
|
||||
*/
|
||||
public void setBindAddress(SocketAddress bindAddress)
|
||||
{
|
||||
this._bindAddress = bindAddress;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @return The maxIdleTime in ms for connections opened by this client,
|
||||
* or -1 if the default from {@link OldWebSocketClientFactory#getSelectorManager()} is used.
|
||||
* @see #setMaxIdleTime(int)
|
||||
*/
|
||||
public int getMaxIdleTime()
|
||||
{
|
||||
return _maxIdleTime;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @param maxIdleTime The max idle time in ms for connections opened by this client
|
||||
* @see #getMaxIdleTime()
|
||||
*/
|
||||
public void setMaxIdleTime(int maxIdleTime)
|
||||
{
|
||||
_maxIdleTime=maxIdleTime;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @return The subprotocol string for connections opened by this client.
|
||||
* @see #setProtocol(String)
|
||||
*/
|
||||
public String getProtocol()
|
||||
{
|
||||
return _protocol;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @param protocol The subprotocol string for connections opened by this client.
|
||||
* @see #getProtocol()
|
||||
*/
|
||||
public void setProtocol(String protocol)
|
||||
{
|
||||
_protocol = protocol;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @return The origin URI of the client
|
||||
* @see #setOrigin(String)
|
||||
*/
|
||||
public String getOrigin()
|
||||
{
|
||||
return _origin;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @param origin The origin URI of the client (eg "http://example.com")
|
||||
* @see #getOrigin()
|
||||
*/
|
||||
public void setOrigin(String origin)
|
||||
{
|
||||
_origin = origin;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* <p>Returns the map of the cookies that are sent during the initial HTTP handshake
|
||||
* that upgrades to the websocket protocol.</p>
|
||||
* @return The read-write cookie map
|
||||
*/
|
||||
public Map<String,String> getCookies()
|
||||
{
|
||||
return _cookies;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @return The list of websocket protocol extensions
|
||||
*/
|
||||
public List<String> getExtensions()
|
||||
{
|
||||
return _extensions;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @return the mask generator to use, or null if not mask generator should be used
|
||||
* @see #setMaskGen(Masker)
|
||||
*/
|
||||
public Masker getMaskGen()
|
||||
{
|
||||
return _maskGen;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @param maskGen the mask generator to use, or null if not mask generator should be used
|
||||
* @see #getMaskGen()
|
||||
*/
|
||||
public void setMaskGen(Masker maskGen)
|
||||
{
|
||||
_maskGen = maskGen;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @return The initial maximum text message size (in characters) for a connection
|
||||
*/
|
||||
public int getMaxTextMessageSize()
|
||||
{
|
||||
return _maxTextMessageSize;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* Set the initial maximum text message size for a connection. This can be changed by
|
||||
* the application calling {@link WebSocket.Connection#setMaxTextMessageSize(int)}.
|
||||
* @param maxTextMessageSize The default maximum text message size (in characters) for a connection
|
||||
*/
|
||||
public void setMaxTextMessageSize(int maxTextMessageSize)
|
||||
{
|
||||
_maxTextMessageSize = maxTextMessageSize;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @return The initial maximum binary message size (in bytes) for a connection
|
||||
*/
|
||||
public int getMaxBinaryMessageSize()
|
||||
{
|
||||
return _maxBinaryMessageSize;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* Set the initial maximum binary message size for a connection. This can be changed by
|
||||
* the application calling {@link WebSocket.Connection#setMaxBinaryMessageSize(int)}.
|
||||
* @param maxBinaryMessageSize The default maximum binary message size (in bytes) for a connection
|
||||
*/
|
||||
public void setMaxBinaryMessageSize(int maxBinaryMessageSize)
|
||||
{
|
||||
_maxBinaryMessageSize = maxBinaryMessageSize;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* <p>Opens a websocket connection to the URI and blocks until the connection is accepted or there is an error.</p>
|
||||
*
|
||||
* @param uri The URI to connect to.
|
||||
* @param websocket The {@link WebSocket} instance to handle incoming events.
|
||||
* @param maxConnectTime The interval to wait for a successful connection
|
||||
* @param units the units of the maxConnectTime
|
||||
* @return A {@link WebSocket.Connection}
|
||||
* @throws IOException if the connection fails
|
||||
* @throws InterruptedException if the thread is interrupted
|
||||
* @throws TimeoutException if the timeout elapses before the connection is completed
|
||||
* @see #open(URI, WebSocket)
|
||||
*/
|
||||
public WebSocket.Connection open(URI uri, WebSocket websocket,long maxConnectTime,TimeUnit units) throws IOException, InterruptedException, TimeoutException
|
||||
{
|
||||
try
|
||||
{
|
||||
return open(uri,websocket).get(maxConnectTime,units);
|
||||
}
|
||||
catch (ExecutionException e)
|
||||
{
|
||||
Throwable cause = e.getCause();
|
||||
if (cause instanceof IOException)
|
||||
throw (IOException)cause;
|
||||
if (cause instanceof Error)
|
||||
throw (Error)cause;
|
||||
if (cause instanceof RuntimeException)
|
||||
throw (RuntimeException)cause;
|
||||
throw new RuntimeException(cause);
|
||||
}
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* <p>Asynchronously opens a websocket connection and returns a {@link Future} to obtain the connection.</p>
|
||||
* <p>The caller must call {@link Future#get(long, TimeUnit)} if they wish to impose a connect timeout on the open.</p>
|
||||
*
|
||||
* @param uri The URI to connect to.
|
||||
* @param websocket The {@link WebSocket} instance to handle incoming events.
|
||||
* @return A {@link Future} to the {@link WebSocket.Connection}
|
||||
* @throws IOException if the connection fails
|
||||
* @see #open(URI, WebSocket, long, TimeUnit)
|
||||
*/
|
||||
public Future<WebSocket.Connection> open(URI uri, WebSocket websocket) throws IOException
|
||||
{
|
||||
if (!_factory.isStarted())
|
||||
throw new IllegalStateException("Factory !started");
|
||||
|
||||
InetSocketAddress address = toSocketAddress(uri);
|
||||
|
||||
SocketChannel channel = SocketChannel.open();
|
||||
if (_bindAddress != null)
|
||||
channel.socket().bind(_bindAddress);
|
||||
channel.socket().setTcpNoDelay(true);
|
||||
|
||||
WebSocketFuture holder = new WebSocketFuture(websocket, uri, this, channel);
|
||||
|
||||
channel.configureBlocking(false);
|
||||
channel.connect(address);
|
||||
_factory.getSelectorManager().register(channel, holder);
|
||||
|
||||
return holder;
|
||||
}
|
||||
|
||||
public static InetSocketAddress toSocketAddress(URI uri)
|
||||
{
|
||||
String scheme = uri.getScheme();
|
||||
if (!("ws".equalsIgnoreCase(scheme) || "wss".equalsIgnoreCase(scheme)))
|
||||
throw new IllegalArgumentException("Bad WebSocket scheme: " + scheme);
|
||||
int port = uri.getPort();
|
||||
if (port == 0)
|
||||
throw new IllegalArgumentException("Bad WebSocket port: " + port);
|
||||
if (port < 0)
|
||||
port = "ws".equals(scheme) ? 80 : 443;
|
||||
|
||||
return new InetSocketAddress(uri.getHost(), port);
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/** The Future Websocket Connection.
|
||||
*/
|
||||
static class WebSocketFuture implements Future<WebSocket.Connection>
|
||||
{
|
||||
final WebSocket _websocket;
|
||||
final URI _uri;
|
||||
final OldWebSocketClient _client;
|
||||
final CountDownLatch _done = new CountDownLatch(1);
|
||||
ByteChannel _channel;
|
||||
WebSocketConnection _connection;
|
||||
Throwable _exception;
|
||||
|
||||
private WebSocketFuture(WebSocket websocket, URI uri, OldWebSocketClient client, ByteChannel channel)
|
||||
{
|
||||
_websocket=websocket;
|
||||
_uri=uri;
|
||||
_client=client;
|
||||
_channel=channel;
|
||||
}
|
||||
|
||||
public void onConnection(WebSocketConnection connection)
|
||||
{
|
||||
try
|
||||
{
|
||||
_client.getFactory().addConnection(connection);
|
||||
|
||||
connection.getConnection().setMaxTextMessageSize(_client.getMaxTextMessageSize());
|
||||
connection.getConnection().setMaxBinaryMessageSize(_client.getMaxBinaryMessageSize());
|
||||
|
||||
WebSocketConnection con;
|
||||
synchronized (this)
|
||||
{
|
||||
if (_channel!=null)
|
||||
_connection=connection;
|
||||
con=_connection;
|
||||
}
|
||||
|
||||
if (con!=null)
|
||||
{
|
||||
if (_websocket instanceof WebSocket.OnFrame)
|
||||
((WebSocket.OnFrame)_websocket).onHandshake((WebSocket.FrameConnection)con.getConnection());
|
||||
|
||||
_websocket.onOpen(con.getConnection());
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_done.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
public void handshakeFailed(Throwable ex)
|
||||
{
|
||||
try
|
||||
{
|
||||
ByteChannel channel=null;
|
||||
synchronized (this)
|
||||
{
|
||||
if (_channel!=null)
|
||||
{
|
||||
channel=_channel;
|
||||
_channel=null;
|
||||
_exception=ex;
|
||||
}
|
||||
}
|
||||
|
||||
if (channel!=null)
|
||||
{
|
||||
if (ex instanceof ProtocolException)
|
||||
closeChannel(channel,WebSocketConnectionRFC6455.CLOSE_PROTOCOL,ex.getMessage());
|
||||
else
|
||||
closeChannel(channel,WebSocketConnectionRFC6455.CLOSE_NO_CLOSE,ex.getMessage());
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_done.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
public Map<String,String> getCookies()
|
||||
{
|
||||
return _client.getCookies();
|
||||
}
|
||||
|
||||
public String getProtocol()
|
||||
{
|
||||
return _client.getProtocol();
|
||||
}
|
||||
|
||||
public WebSocket getWebSocket()
|
||||
{
|
||||
return _websocket;
|
||||
}
|
||||
|
||||
public URI getURI()
|
||||
{
|
||||
return _uri;
|
||||
}
|
||||
|
||||
public int getMaxIdleTime()
|
||||
{
|
||||
return _client.getMaxIdleTime();
|
||||
}
|
||||
|
||||
public String getOrigin()
|
||||
{
|
||||
return _client.getOrigin();
|
||||
}
|
||||
|
||||
public Masker getMaskGen()
|
||||
{
|
||||
return _client.getMaskGen();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "[" + _uri + ","+_websocket+"]@"+hashCode();
|
||||
}
|
||||
|
||||
public boolean cancel(boolean mayInterruptIfRunning)
|
||||
{
|
||||
try
|
||||
{
|
||||
ByteChannel channel=null;
|
||||
synchronized (this)
|
||||
{
|
||||
if (_connection==null && _exception==null && _channel!=null)
|
||||
{
|
||||
channel=_channel;
|
||||
_channel=null;
|
||||
}
|
||||
}
|
||||
|
||||
if (channel!=null)
|
||||
{
|
||||
closeChannel(channel,WebSocketConnectionRFC6455.CLOSE_NO_CLOSE,"cancelled");
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_done.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isCancelled()
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
return _channel==null && _connection==null;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isDone()
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
return _connection!=null && _exception==null;
|
||||
}
|
||||
}
|
||||
|
||||
public org.eclipse.jetty.websocket.WebSocket.Connection get() throws InterruptedException, ExecutionException
|
||||
{
|
||||
try
|
||||
{
|
||||
return get(Long.MAX_VALUE,TimeUnit.SECONDS);
|
||||
}
|
||||
catch(TimeoutException e)
|
||||
{
|
||||
throw new IllegalStateException("The universe has ended",e);
|
||||
}
|
||||
}
|
||||
|
||||
public org.eclipse.jetty.websocket.WebSocket.Connection get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
|
||||
TimeoutException
|
||||
{
|
||||
_done.await(timeout,unit);
|
||||
ByteChannel channel=null;
|
||||
org.eclipse.jetty.websocket.WebSocket.Connection connection=null;
|
||||
Throwable exception;
|
||||
synchronized (this)
|
||||
{
|
||||
exception=_exception;
|
||||
if (_connection==null)
|
||||
{
|
||||
exception=_exception;
|
||||
channel=_channel;
|
||||
_channel=null;
|
||||
}
|
||||
else
|
||||
connection=_connection.getConnection();
|
||||
}
|
||||
|
||||
if (channel!=null)
|
||||
closeChannel(channel,WebSocketConnectionRFC6455.CLOSE_NO_CLOSE,"timeout");
|
||||
if (exception!=null)
|
||||
throw new ExecutionException(exception);
|
||||
if (connection!=null)
|
||||
return connection;
|
||||
throw new TimeoutException();
|
||||
}
|
||||
|
||||
private void closeChannel(ByteChannel channel,int code, String message)
|
||||
{
|
||||
try
|
||||
{
|
||||
_websocket.onClose(code,message);
|
||||
}
|
||||
catch(Exception e)
|
||||
{
|
||||
__log.warn(e);
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
channel.close();
|
||||
}
|
||||
catch(IOException e)
|
||||
{
|
||||
__log.debug(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,426 @@
|
||||
/*******************************************************************************
|
||||
* Copyright (c) 2011 Intalio, Inc.
|
||||
* ======================================================================
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* and Apache License v2.0 which accompanies this distribution.
|
||||
*
|
||||
* The Eclipse Public License is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
*
|
||||
* The Apache License v2.0 is available at
|
||||
* http://www.opensource.org/licenses/apache2.0.php
|
||||
*
|
||||
* You may elect to redistribute this code under either of these licenses.
|
||||
*******************************************************************************/
|
||||
package org.eclipse.jetty.websocket.client;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.net.ProtocolException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.sql.Connection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
import javax.net.ssl.SSLEngine;
|
||||
|
||||
import org.eclipse.jetty.io.AsyncConnection;
|
||||
import org.eclipse.jetty.io.AsyncEndPoint;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.io.SelectChannelEndPoint;
|
||||
import org.eclipse.jetty.io.SelectorManager;
|
||||
import org.eclipse.jetty.io.ssl.SslConnection;
|
||||
import org.eclipse.jetty.util.B64Code;
|
||||
import org.eclipse.jetty.util.QuotedStringTokenizer;
|
||||
import org.eclipse.jetty.util.component.AggregateLifeCycle;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||
import org.eclipse.jetty.util.thread.ThreadPool;
|
||||
import org.eclipse.jetty.websocket.annotations.WebSocket;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketConnection;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||
import org.eclipse.jetty.websocket.extensions.Extension;
|
||||
import org.eclipse.jetty.websocket.masks.Masker;
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* <p>WebSocketClientFactory contains the common components needed by multiple {@link OldWebSocketClient} instances
|
||||
* (for example, a {@link ThreadPool}, a {@link SelectorManager NIO selector}, etc).</p>
|
||||
* <p>WebSocketClients with different configurations should share the same factory to avoid to waste resources.</p>
|
||||
* <p>If a ThreadPool or MaskGen is passed in the constructor, then it is not added with {@link AggregateLifeCycle#addBean(Object)},
|
||||
* so it's lifecycle must be controlled externally.
|
||||
*
|
||||
* @see OldWebSocketClient
|
||||
*/
|
||||
public class OldWebSocketClientFactory extends AggregateLifeCycle
|
||||
{
|
||||
private final static Logger LOG = Log.getLogger(OldWebSocketClientFactory.class);
|
||||
private final Queue<WebSocketConnection> connections = new ConcurrentLinkedQueue<WebSocketConnection>();
|
||||
private WebSocketPolicy policy;
|
||||
|
||||
/**
|
||||
* <p>Creates a WebSocketClientFactory with the default configuration.</p>
|
||||
*/
|
||||
public OldWebSocketClientFactory()
|
||||
{
|
||||
policy = WebSocketPolicy.newClientPolicy();
|
||||
}
|
||||
|
||||
public WebSocketPolicy getPolicy()
|
||||
{
|
||||
return policy;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() throws Exception
|
||||
{
|
||||
closeConnections();
|
||||
super.doStop();
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Creates and returns a new instance of a {@link OldWebSocketClient}, configured with this
|
||||
* WebSocketClientFactory instance.</p>
|
||||
*
|
||||
* @return a new {@link OldWebSocketClient} instance
|
||||
*/
|
||||
public OldWebSocketClient newWebSocketClient()
|
||||
{
|
||||
return new OldWebSocketClient(this);
|
||||
}
|
||||
|
||||
protected boolean addConnection(WebSocketConnection connection)
|
||||
{
|
||||
return isRunning() && connections.add(connection);
|
||||
}
|
||||
|
||||
protected boolean removeConnection(WebSocketConnection connection)
|
||||
{
|
||||
return connections.remove(connection);
|
||||
}
|
||||
|
||||
protected void closeConnections()
|
||||
{
|
||||
for (WebSocketConnection connection : connections)
|
||||
{
|
||||
try
|
||||
{
|
||||
connection.close();
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
LOG.info(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* WebSocket Client Selector Manager
|
||||
*/
|
||||
class WebSocketClientSelector extends SelectorManager
|
||||
{
|
||||
@Override
|
||||
public boolean dispatch(Runnable task)
|
||||
{
|
||||
return _threadPool.dispatch(task);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, final SelectionKey key) throws IOException
|
||||
{
|
||||
OldWebSocketClient.WebSocketFuture holder = (OldWebSocketClient.WebSocketFuture)key.attachment();
|
||||
int maxIdleTime = holder.getMaxIdleTime();
|
||||
if (maxIdleTime < 0)
|
||||
maxIdleTime = (int)getMaxIdleTime();
|
||||
SelectChannelEndPoint result = new SelectChannelEndPoint(channel, selectSet, key, maxIdleTime);
|
||||
AsyncEndPoint endPoint = result;
|
||||
|
||||
// Detect if it is SSL, and wrap the connection if so
|
||||
if ("wss".equals(holder.getURI().getScheme()))
|
||||
{
|
||||
SSLEngine sslEngine = newSslEngine(channel);
|
||||
SslConnection sslConnection = new SslConnection(sslEngine, endPoint);
|
||||
endPoint.setConnection(sslConnection);
|
||||
endPoint = sslConnection.getSslEndPoint();
|
||||
}
|
||||
|
||||
AsyncConnection connection = selectSet.getManager().newConnection(channel, endPoint, holder);
|
||||
endPoint.setConnection(connection);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)
|
||||
{
|
||||
OldWebSocketClient.WebSocketFuture holder = (OldWebSocketClient.WebSocketFuture)attachment;
|
||||
return new HandshakeConnection(endpoint, holder);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void endPointOpened(SelectChannelEndPoint endpoint)
|
||||
{
|
||||
// TODO expose on outer class ??
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
|
||||
{
|
||||
LOG.debug("upgrade {} -> {}", oldConnection, endpoint.getConnection());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void endPointClosed(SelectChannelEndPoint endpoint)
|
||||
{
|
||||
endpoint.getConnection().onClose();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
|
||||
{
|
||||
if (!(attachment instanceof OldWebSocketClient.WebSocketFuture))
|
||||
super.connectionFailed(channel, ex, attachment);
|
||||
else
|
||||
{
|
||||
__log.debug(ex);
|
||||
OldWebSocketClient.WebSocketFuture future = (OldWebSocketClient.WebSocketFuture)attachment;
|
||||
|
||||
future.handshakeFailed(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* Handshake Connection.
|
||||
* Handles the connection until the handshake succeeds or fails.
|
||||
*/
|
||||
class HandshakeConnection extends AbstractConnection implements AsyncConnection
|
||||
{
|
||||
private final AsyncEndPoint _endp;
|
||||
private final OldWebSocketClient.WebSocketFuture _future;
|
||||
private final String _key;
|
||||
private final HttpParser _parser;
|
||||
private String _accept;
|
||||
private String _error;
|
||||
private ByteArrayBuffer _handshake;
|
||||
|
||||
public HandshakeConnection(AsyncEndPoint endpoint, OldWebSocketClient.WebSocketFuture future)
|
||||
{
|
||||
super(endpoint, System.currentTimeMillis());
|
||||
_endp = endpoint;
|
||||
_future = future;
|
||||
|
||||
byte[] bytes = new byte[16];
|
||||
new Random().nextBytes(bytes);
|
||||
_key = new String(B64Code.encode(bytes));
|
||||
|
||||
Buffers buffers = new SimpleBuffers(_buffers.getBuffer(), null);
|
||||
_parser = new HttpParser(buffers, _endp, new HttpParser.EventHandler()
|
||||
{
|
||||
@Override
|
||||
public void startResponse(ByteBuffer version, int status, ByteBuffer reason) throws IOException
|
||||
{
|
||||
if (status != 101)
|
||||
{
|
||||
_error = "Bad response status " + status + " " + reason;
|
||||
_endp.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void parsedHeader(ByteBuffer name, ByteBuffer value) throws IOException
|
||||
{
|
||||
if (__ACCEPT.equals(name))
|
||||
_accept = value.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void startRequest(ByteBuffer method, ByteBuffer url, ByteBuffer version) throws IOException
|
||||
{
|
||||
if (_error == null)
|
||||
_error = "Bad response: " + method + " " + url + " " + version;
|
||||
_endp.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void content(ByteBuffer ref) throws IOException
|
||||
{
|
||||
if (_error == null)
|
||||
_error = "Bad response. " + ref.length() + "B of content?";
|
||||
_endp.close();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private boolean handshake()
|
||||
{
|
||||
if (_handshake==null)
|
||||
{
|
||||
String path = _future.getURI().getPath();
|
||||
if (path == null || path.length() == 0)
|
||||
path = "/";
|
||||
|
||||
if (_future.getURI().getRawQuery() != null)
|
||||
path += "?" + _future.getURI().getRawQuery();
|
||||
|
||||
String origin = _future.getOrigin();
|
||||
|
||||
StringBuilder request = new StringBuilder(512);
|
||||
request.append("GET ").append(path).append(" HTTP/1.1\r\n")
|
||||
.append("Host: ").append(_future.getURI().getHost()).append(":")
|
||||
.append(_future.getURI().getPort()).append("\r\n")
|
||||
.append("Upgrade: websocket\r\n")
|
||||
.append("Connection: Upgrade\r\n")
|
||||
.append("Sec-WebSocket-Key: ")
|
||||
.append(_key).append("\r\n");
|
||||
|
||||
if (origin != null)
|
||||
request.append("Origin: ").append(origin).append("\r\n");
|
||||
|
||||
request.append("Sec-WebSocket-Version: ").append(WebSocketConnectionRFC6455.VERSION).append("\r\n");
|
||||
|
||||
if (_future.getProtocol() != null)
|
||||
request.append("Sec-WebSocket-Protocol: ").append(_future.getProtocol()).append("\r\n");
|
||||
|
||||
Map<String, String> cookies = _future.getCookies();
|
||||
if (cookies != null && cookies.size() > 0)
|
||||
{
|
||||
for (String cookie : cookies.keySet())
|
||||
request.append("Cookie: ")
|
||||
.append(QuotedStringTokenizer.quoteIfNeeded(cookie, HttpFields.__COOKIE_DELIM))
|
||||
.append("=")
|
||||
.append(QuotedStringTokenizer.quoteIfNeeded(cookies.get(cookie), HttpFields.__COOKIE_DELIM))
|
||||
.append("\r\n");
|
||||
}
|
||||
|
||||
request.append("\r\n");
|
||||
|
||||
_handshake=new ByteArrayBuffer(request.toString(), false);
|
||||
}
|
||||
|
||||
// TODO extensions
|
||||
|
||||
try
|
||||
{
|
||||
int len = _handshake.length();
|
||||
int flushed = _endp.flush(_handshake);
|
||||
if (flushed<0)
|
||||
throw new IOException("incomplete handshake");
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
_future.handshakeFailed(e);
|
||||
}
|
||||
return _handshake.length()==0;
|
||||
}
|
||||
|
||||
public Connection handle() throws IOException
|
||||
{
|
||||
while (_endp.isOpen() && !_parser.isComplete())
|
||||
{
|
||||
if (_handshake==null || _handshake.length()>0)
|
||||
if (!handshake())
|
||||
return this;
|
||||
|
||||
if (!_parser.parseAvailable())
|
||||
{
|
||||
if (_endp.isInputShutdown())
|
||||
_future.handshakeFailed(new IOException("Incomplete handshake response"));
|
||||
return this;
|
||||
}
|
||||
}
|
||||
if (_error == null)
|
||||
{
|
||||
if (_accept == null)
|
||||
{
|
||||
_error = "No Sec-WebSocket-Accept";
|
||||
}
|
||||
else if (!WebSocketConnectionRFC6455.hashKey(_key).equals(_accept))
|
||||
{
|
||||
_error = "Bad Sec-WebSocket-Accept";
|
||||
}
|
||||
else
|
||||
{
|
||||
WebSocketConnection connection = newWebSocketConnection();
|
||||
|
||||
ByteBuffer header = _parser.getHeaderBuffer();
|
||||
if (header.hasContent())
|
||||
connection.fillBuffersFrom(header);
|
||||
_buffers.returnBuffer(header);
|
||||
|
||||
_future.onConnection(connection);
|
||||
|
||||
return connection;
|
||||
}
|
||||
}
|
||||
|
||||
_endp.close();
|
||||
return this;
|
||||
}
|
||||
|
||||
private WebSocketConnection newWebSocketConnection() throws IOException
|
||||
{
|
||||
return new WebSocketClientConnection(
|
||||
_future._client.getFactory(),
|
||||
_future.getWebSocket(),
|
||||
_endp,
|
||||
_buffers,
|
||||
System.currentTimeMillis(),
|
||||
_future.getMaxIdleTime(),
|
||||
_future.getProtocol(),
|
||||
null,
|
||||
WebSocketConnectionRFC6455.VERSION,
|
||||
_future.getMaskGen());
|
||||
}
|
||||
|
||||
public void onInputShutdown() throws IOException
|
||||
{
|
||||
_endp.close();
|
||||
}
|
||||
|
||||
public boolean isIdle()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean isSuspended()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
public void onClose()
|
||||
{
|
||||
if (_error != null)
|
||||
_future.handshakeFailed(new ProtocolException(_error));
|
||||
else
|
||||
_future.handshakeFailed(new EOFException());
|
||||
}
|
||||
}
|
||||
|
||||
private static class WebSocketClientConnection extends WebSocketConnectionRFC6455
|
||||
{
|
||||
private final OldWebSocketClientFactory factory;
|
||||
|
||||
public WebSocketClientConnection(OldWebSocketClientFactory factory, WebSocket webSocket, EndPoint endPoint, WebSocketBuffers buffers, long timeStamp, int maxIdleTime, String protocol, List<Extension> extensions, int draftVersion, Masker maskGen) throws IOException
|
||||
{
|
||||
super(webSocket, endPoint, buffers, timeStamp, maxIdleTime, protocol, extensions, draftVersion, maskGen);
|
||||
this.factory = factory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClose()
|
||||
{
|
||||
super.onClose();
|
||||
factory.removeConnection(this);
|
||||
}
|
||||
}
|
||||
}
|
@ -1,600 +1,137 @@
|
||||
/*******************************************************************************
|
||||
* Copyright (c) 2011 Intalio, Inc.
|
||||
* ======================================================================
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* and Apache License v2.0 which accompanies this distribution.
|
||||
*
|
||||
* The Eclipse Public License is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
*
|
||||
* The Apache License v2.0 is available at
|
||||
* http://www.opensource.org/licenses/apache2.0.php
|
||||
*
|
||||
* You may elect to redistribute this code under either of these licenses.
|
||||
*******************************************************************************/
|
||||
package org.eclipse.jetty.websocket.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.ProtocolException;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.URI;
|
||||
import java.nio.channels.ByteChannel;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.WebSocket;
|
||||
import org.eclipse.jetty.websocket.WebSocketConnection;
|
||||
import org.eclipse.jetty.websocket.WebSocketConnectionRFC6455;
|
||||
import org.eclipse.jetty.websocket.WebSocket.Connection;
|
||||
import org.eclipse.jetty.websocket.WebSocket.FrameConnection;
|
||||
import org.eclipse.jetty.websocket.WebSocket.OnFrame;
|
||||
import org.eclipse.jetty.websocket.masks.Masker;
|
||||
import org.eclipse.jetty.util.FutureCallback;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketConnection;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketEventDriver;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* <p>{@link WebSocketClient} allows to create multiple connections to multiple destinations
|
||||
* that can speak the websocket protocol.</p>
|
||||
* <p>When creating websocket connections, {@link WebSocketClient} accepts a {@link WebSocket}
|
||||
* object (to receive events from the server), and returns a {@link WebSocket.Connection} to
|
||||
* send data to the server.</p>
|
||||
* <p>Example usage is as follows:</p>
|
||||
* <pre>
|
||||
* WebSocketClientFactory factory = new WebSocketClientFactory();
|
||||
* factory.start();
|
||||
*
|
||||
* WebSocketClient client = factory.newWebSocketClient();
|
||||
* // Configure the client
|
||||
*
|
||||
* WebSocket.Connection connection = client.open(new URI("ws://127.0.0.1:8080/"), new WebSocket.OnTextMessage()
|
||||
* {
|
||||
* public void onOpen(Connection connection)
|
||||
* {
|
||||
* // open notification
|
||||
* }
|
||||
*
|
||||
* public void onClose(int closeCode, String message)
|
||||
* {
|
||||
* // close notification
|
||||
* }
|
||||
*
|
||||
* public void onMessage(String data)
|
||||
* {
|
||||
* // handle incoming message
|
||||
* }
|
||||
* }).get(5, TimeUnit.SECONDS);
|
||||
*
|
||||
* connection.sendMessage("Hello World");
|
||||
* </pre>
|
||||
*/
|
||||
public class WebSocketClient
|
||||
{
|
||||
private final static Logger __log = org.eclipse.jetty.util.log.Log.getLogger(WebSocketClient.class.getName());
|
||||
|
||||
private final WebSocketClientFactory _factory;
|
||||
private final Map<String,String> _cookies=new ConcurrentHashMap<String, String>();
|
||||
private final List<String> _extensions=new CopyOnWriteArrayList<String>();
|
||||
private String _origin;
|
||||
private String _protocol;
|
||||
private int _maxIdleTime=-1;
|
||||
private int _maxTextMessageSize=16*1024;
|
||||
private int _maxBinaryMessageSize=-1;
|
||||
private Masker _maskGen;
|
||||
private SocketAddress _bindAddress;
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* <p>Creates a WebSocketClient from a private WebSocketClientFactory.</p>
|
||||
* <p>This can be wasteful of resources if many clients are created.</p>
|
||||
*
|
||||
* @deprecated Use {@link WebSocketClientFactory#newWebSocketClient()}
|
||||
* @throws Exception if the private WebSocketClientFactory fails to start
|
||||
*/
|
||||
@Deprecated
|
||||
public WebSocketClient() throws Exception
|
||||
public static class ConnectFuture extends FutureCallback<WebSocketConnection>
|
||||
{
|
||||
_factory=new WebSocketClientFactory();
|
||||
_factory.start();
|
||||
_maskGen=_factory.getMaskGen();
|
||||
private final WebSocketClient client;
|
||||
private final URI websocketUri;
|
||||
private final WebSocketEventDriver websocket;
|
||||
|
||||
public ConnectFuture(WebSocketClient client, URI websocketUri, WebSocketEventDriver websocket)
|
||||
{
|
||||
this.client = client;
|
||||
this.websocketUri = websocketUri;
|
||||
this.websocket = websocket;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void completed(WebSocketConnection context)
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
super.completed(context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(WebSocketConnection context, Throwable cause)
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
super.failed(context,cause);
|
||||
}
|
||||
|
||||
public WebSocketClient getClient()
|
||||
{
|
||||
return client;
|
||||
}
|
||||
|
||||
public Map<String, String> getCookies()
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
public WebSocketClientFactory getFactory()
|
||||
{
|
||||
return client.factory;
|
||||
}
|
||||
|
||||
public String getOrigin()
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
public WebSocketEventDriver getWebSocket()
|
||||
{
|
||||
return websocket;
|
||||
}
|
||||
|
||||
public URI getWebSocketUri()
|
||||
{
|
||||
return websocketUri;
|
||||
}
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* <p>Creates a WebSocketClient with shared WebSocketClientFactory.</p>
|
||||
*
|
||||
* @param factory the shared {@link WebSocketClientFactory}
|
||||
*/
|
||||
private final WebSocketClientFactory factory;
|
||||
|
||||
private SocketAddress bindAddress;
|
||||
private WebSocketPolicy policy;
|
||||
|
||||
public WebSocketClient(WebSocketClientFactory factory)
|
||||
{
|
||||
_factory=factory;
|
||||
_maskGen=_factory.getMaskGen();
|
||||
this.factory = factory;
|
||||
this.policy = WebSocketPolicy.newClientPolicy();
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @return The WebSocketClientFactory this client was created with.
|
||||
*/
|
||||
public WebSocketClientFactory getFactory()
|
||||
public FutureCallback<WebSocketConnection> connect(URI websocketUri, Object websocketPojo) throws IOException
|
||||
{
|
||||
return _factory;
|
||||
if (!factory.isStarted())
|
||||
{
|
||||
throw new IllegalStateException(WebSocketClientFactory.class.getSimpleName() + " is not started");
|
||||
}
|
||||
|
||||
SocketChannel channel = SocketChannel.open();
|
||||
if (bindAddress != null)
|
||||
{
|
||||
channel.bind(bindAddress);
|
||||
}
|
||||
channel.socket().setTcpNoDelay(true);
|
||||
channel.configureBlocking(false);
|
||||
|
||||
InetSocketAddress address = new InetSocketAddress(websocketUri.getHost(),websocketUri.getPort());
|
||||
|
||||
WebSocketEventDriver websocket = this.factory.newWebSocketDriver(websocketPojo);
|
||||
ConnectFuture result = new ConnectFuture(this,websocketUri,websocket);
|
||||
|
||||
channel.connect(address);
|
||||
factory.getSelector().connect(channel,result);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @return the address to bind the socket channel to
|
||||
* @see #setBindAddress(SocketAddress)
|
||||
*/
|
||||
public SocketAddress getBindAddress()
|
||||
{
|
||||
return _bindAddress;
|
||||
return bindAddress;
|
||||
}
|
||||
|
||||
public WebSocketPolicy getPolicy()
|
||||
{
|
||||
return this.policy;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @param bindAddress the address to bind the socket channel to
|
||||
* @param bindAddress
|
||||
* the address to bind the socket channel to
|
||||
* @see #getBindAddress()
|
||||
*/
|
||||
public void setBindAddress(SocketAddress bindAddress)
|
||||
{
|
||||
this._bindAddress = bindAddress;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @return The maxIdleTime in ms for connections opened by this client,
|
||||
* or -1 if the default from {@link WebSocketClientFactory#getSelectorManager()} is used.
|
||||
* @see #setMaxIdleTime(int)
|
||||
*/
|
||||
public int getMaxIdleTime()
|
||||
{
|
||||
return _maxIdleTime;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @param maxIdleTime The max idle time in ms for connections opened by this client
|
||||
* @see #getMaxIdleTime()
|
||||
*/
|
||||
public void setMaxIdleTime(int maxIdleTime)
|
||||
{
|
||||
_maxIdleTime=maxIdleTime;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @return The subprotocol string for connections opened by this client.
|
||||
* @see #setProtocol(String)
|
||||
*/
|
||||
public String getProtocol()
|
||||
{
|
||||
return _protocol;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @param protocol The subprotocol string for connections opened by this client.
|
||||
* @see #getProtocol()
|
||||
*/
|
||||
public void setProtocol(String protocol)
|
||||
{
|
||||
_protocol = protocol;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @return The origin URI of the client
|
||||
* @see #setOrigin(String)
|
||||
*/
|
||||
public String getOrigin()
|
||||
{
|
||||
return _origin;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @param origin The origin URI of the client (eg "http://example.com")
|
||||
* @see #getOrigin()
|
||||
*/
|
||||
public void setOrigin(String origin)
|
||||
{
|
||||
_origin = origin;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* <p>Returns the map of the cookies that are sent during the initial HTTP handshake
|
||||
* that upgrades to the websocket protocol.</p>
|
||||
* @return The read-write cookie map
|
||||
*/
|
||||
public Map<String,String> getCookies()
|
||||
{
|
||||
return _cookies;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @return The list of websocket protocol extensions
|
||||
*/
|
||||
public List<String> getExtensions()
|
||||
{
|
||||
return _extensions;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @return the mask generator to use, or null if not mask generator should be used
|
||||
* @see #setMaskGen(Masker)
|
||||
*/
|
||||
public Masker getMaskGen()
|
||||
{
|
||||
return _maskGen;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @param maskGen the mask generator to use, or null if not mask generator should be used
|
||||
* @see #getMaskGen()
|
||||
*/
|
||||
public void setMaskGen(Masker maskGen)
|
||||
{
|
||||
_maskGen = maskGen;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @return The initial maximum text message size (in characters) for a connection
|
||||
*/
|
||||
public int getMaxTextMessageSize()
|
||||
{
|
||||
return _maxTextMessageSize;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* Set the initial maximum text message size for a connection. This can be changed by
|
||||
* the application calling {@link WebSocket.Connection#setMaxTextMessageSize(int)}.
|
||||
* @param maxTextMessageSize The default maximum text message size (in characters) for a connection
|
||||
*/
|
||||
public void setMaxTextMessageSize(int maxTextMessageSize)
|
||||
{
|
||||
_maxTextMessageSize = maxTextMessageSize;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @return The initial maximum binary message size (in bytes) for a connection
|
||||
*/
|
||||
public int getMaxBinaryMessageSize()
|
||||
{
|
||||
return _maxBinaryMessageSize;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* Set the initial maximum binary message size for a connection. This can be changed by
|
||||
* the application calling {@link WebSocket.Connection#setMaxBinaryMessageSize(int)}.
|
||||
* @param maxBinaryMessageSize The default maximum binary message size (in bytes) for a connection
|
||||
*/
|
||||
public void setMaxBinaryMessageSize(int maxBinaryMessageSize)
|
||||
{
|
||||
_maxBinaryMessageSize = maxBinaryMessageSize;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* <p>Opens a websocket connection to the URI and blocks until the connection is accepted or there is an error.</p>
|
||||
*
|
||||
* @param uri The URI to connect to.
|
||||
* @param websocket The {@link WebSocket} instance to handle incoming events.
|
||||
* @param maxConnectTime The interval to wait for a successful connection
|
||||
* @param units the units of the maxConnectTime
|
||||
* @return A {@link WebSocket.Connection}
|
||||
* @throws IOException if the connection fails
|
||||
* @throws InterruptedException if the thread is interrupted
|
||||
* @throws TimeoutException if the timeout elapses before the connection is completed
|
||||
* @see #open(URI, WebSocket)
|
||||
*/
|
||||
public WebSocket.Connection open(URI uri, WebSocket websocket,long maxConnectTime,TimeUnit units) throws IOException, InterruptedException, TimeoutException
|
||||
{
|
||||
try
|
||||
{
|
||||
return open(uri,websocket).get(maxConnectTime,units);
|
||||
}
|
||||
catch (ExecutionException e)
|
||||
{
|
||||
Throwable cause = e.getCause();
|
||||
if (cause instanceof IOException)
|
||||
throw (IOException)cause;
|
||||
if (cause instanceof Error)
|
||||
throw (Error)cause;
|
||||
if (cause instanceof RuntimeException)
|
||||
throw (RuntimeException)cause;
|
||||
throw new RuntimeException(cause);
|
||||
}
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* <p>Asynchronously opens a websocket connection and returns a {@link Future} to obtain the connection.</p>
|
||||
* <p>The caller must call {@link Future#get(long, TimeUnit)} if they wish to impose a connect timeout on the open.</p>
|
||||
*
|
||||
* @param uri The URI to connect to.
|
||||
* @param websocket The {@link WebSocket} instance to handle incoming events.
|
||||
* @return A {@link Future} to the {@link WebSocket.Connection}
|
||||
* @throws IOException if the connection fails
|
||||
* @see #open(URI, WebSocket, long, TimeUnit)
|
||||
*/
|
||||
public Future<WebSocket.Connection> open(URI uri, WebSocket websocket) throws IOException
|
||||
{
|
||||
if (!_factory.isStarted())
|
||||
throw new IllegalStateException("Factory !started");
|
||||
|
||||
InetSocketAddress address = toSocketAddress(uri);
|
||||
|
||||
SocketChannel channel = SocketChannel.open();
|
||||
if (_bindAddress != null)
|
||||
channel.socket().bind(_bindAddress);
|
||||
channel.socket().setTcpNoDelay(true);
|
||||
|
||||
WebSocketFuture holder = new WebSocketFuture(websocket, uri, this, channel);
|
||||
|
||||
channel.configureBlocking(false);
|
||||
channel.connect(address);
|
||||
_factory.getSelectorManager().register(channel, holder);
|
||||
|
||||
return holder;
|
||||
}
|
||||
|
||||
public static InetSocketAddress toSocketAddress(URI uri)
|
||||
{
|
||||
String scheme = uri.getScheme();
|
||||
if (!("ws".equalsIgnoreCase(scheme) || "wss".equalsIgnoreCase(scheme)))
|
||||
throw new IllegalArgumentException("Bad WebSocket scheme: " + scheme);
|
||||
int port = uri.getPort();
|
||||
if (port == 0)
|
||||
throw new IllegalArgumentException("Bad WebSocket port: " + port);
|
||||
if (port < 0)
|
||||
port = "ws".equals(scheme) ? 80 : 443;
|
||||
|
||||
return new InetSocketAddress(uri.getHost(), port);
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/** The Future Websocket Connection.
|
||||
*/
|
||||
static class WebSocketFuture implements Future<WebSocket.Connection>
|
||||
{
|
||||
final WebSocket _websocket;
|
||||
final URI _uri;
|
||||
final WebSocketClient _client;
|
||||
final CountDownLatch _done = new CountDownLatch(1);
|
||||
ByteChannel _channel;
|
||||
WebSocketConnection _connection;
|
||||
Throwable _exception;
|
||||
|
||||
private WebSocketFuture(WebSocket websocket, URI uri, WebSocketClient client, ByteChannel channel)
|
||||
{
|
||||
_websocket=websocket;
|
||||
_uri=uri;
|
||||
_client=client;
|
||||
_channel=channel;
|
||||
}
|
||||
|
||||
public void onConnection(WebSocketConnection connection)
|
||||
{
|
||||
try
|
||||
{
|
||||
_client.getFactory().addConnection(connection);
|
||||
|
||||
connection.getConnection().setMaxTextMessageSize(_client.getMaxTextMessageSize());
|
||||
connection.getConnection().setMaxBinaryMessageSize(_client.getMaxBinaryMessageSize());
|
||||
|
||||
WebSocketConnection con;
|
||||
synchronized (this)
|
||||
{
|
||||
if (_channel!=null)
|
||||
_connection=connection;
|
||||
con=_connection;
|
||||
}
|
||||
|
||||
if (con!=null)
|
||||
{
|
||||
if (_websocket instanceof WebSocket.OnFrame)
|
||||
((WebSocket.OnFrame)_websocket).onHandshake((WebSocket.FrameConnection)con.getConnection());
|
||||
|
||||
_websocket.onOpen(con.getConnection());
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_done.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
public void handshakeFailed(Throwable ex)
|
||||
{
|
||||
try
|
||||
{
|
||||
ByteChannel channel=null;
|
||||
synchronized (this)
|
||||
{
|
||||
if (_channel!=null)
|
||||
{
|
||||
channel=_channel;
|
||||
_channel=null;
|
||||
_exception=ex;
|
||||
}
|
||||
}
|
||||
|
||||
if (channel!=null)
|
||||
{
|
||||
if (ex instanceof ProtocolException)
|
||||
closeChannel(channel,WebSocketConnectionRFC6455.CLOSE_PROTOCOL,ex.getMessage());
|
||||
else
|
||||
closeChannel(channel,WebSocketConnectionRFC6455.CLOSE_NO_CLOSE,ex.getMessage());
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_done.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
public Map<String,String> getCookies()
|
||||
{
|
||||
return _client.getCookies();
|
||||
}
|
||||
|
||||
public String getProtocol()
|
||||
{
|
||||
return _client.getProtocol();
|
||||
}
|
||||
|
||||
public WebSocket getWebSocket()
|
||||
{
|
||||
return _websocket;
|
||||
}
|
||||
|
||||
public URI getURI()
|
||||
{
|
||||
return _uri;
|
||||
}
|
||||
|
||||
public int getMaxIdleTime()
|
||||
{
|
||||
return _client.getMaxIdleTime();
|
||||
}
|
||||
|
||||
public String getOrigin()
|
||||
{
|
||||
return _client.getOrigin();
|
||||
}
|
||||
|
||||
public Masker getMaskGen()
|
||||
{
|
||||
return _client.getMaskGen();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "[" + _uri + ","+_websocket+"]@"+hashCode();
|
||||
}
|
||||
|
||||
public boolean cancel(boolean mayInterruptIfRunning)
|
||||
{
|
||||
try
|
||||
{
|
||||
ByteChannel channel=null;
|
||||
synchronized (this)
|
||||
{
|
||||
if (_connection==null && _exception==null && _channel!=null)
|
||||
{
|
||||
channel=_channel;
|
||||
_channel=null;
|
||||
}
|
||||
}
|
||||
|
||||
if (channel!=null)
|
||||
{
|
||||
closeChannel(channel,WebSocketConnectionRFC6455.CLOSE_NO_CLOSE,"cancelled");
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_done.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isCancelled()
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
return _channel==null && _connection==null;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isDone()
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
return _connection!=null && _exception==null;
|
||||
}
|
||||
}
|
||||
|
||||
public org.eclipse.jetty.websocket.WebSocket.Connection get() throws InterruptedException, ExecutionException
|
||||
{
|
||||
try
|
||||
{
|
||||
return get(Long.MAX_VALUE,TimeUnit.SECONDS);
|
||||
}
|
||||
catch(TimeoutException e)
|
||||
{
|
||||
throw new IllegalStateException("The universe has ended",e);
|
||||
}
|
||||
}
|
||||
|
||||
public org.eclipse.jetty.websocket.WebSocket.Connection get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
|
||||
TimeoutException
|
||||
{
|
||||
_done.await(timeout,unit);
|
||||
ByteChannel channel=null;
|
||||
org.eclipse.jetty.websocket.WebSocket.Connection connection=null;
|
||||
Throwable exception;
|
||||
synchronized (this)
|
||||
{
|
||||
exception=_exception;
|
||||
if (_connection==null)
|
||||
{
|
||||
exception=_exception;
|
||||
channel=_channel;
|
||||
_channel=null;
|
||||
}
|
||||
else
|
||||
connection=_connection.getConnection();
|
||||
}
|
||||
|
||||
if (channel!=null)
|
||||
closeChannel(channel,WebSocketConnectionRFC6455.CLOSE_NO_CLOSE,"timeout");
|
||||
if (exception!=null)
|
||||
throw new ExecutionException(exception);
|
||||
if (connection!=null)
|
||||
return connection;
|
||||
throw new TimeoutException();
|
||||
}
|
||||
|
||||
private void closeChannel(ByteChannel channel,int code, String message)
|
||||
{
|
||||
try
|
||||
{
|
||||
_websocket.onClose(code,message);
|
||||
}
|
||||
catch(Exception e)
|
||||
{
|
||||
__log.warn(e);
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
channel.close();
|
||||
}
|
||||
catch(IOException e)
|
||||
{
|
||||
__log.debug(e);
|
||||
}
|
||||
}
|
||||
this.bindAddress = bindAddress;
|
||||
}
|
||||
}
|
||||
|
@ -1,219 +1,88 @@
|
||||
/*******************************************************************************
|
||||
* Copyright (c) 2011 Intalio, Inc.
|
||||
* ======================================================================
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* and Apache License v2.0 which accompanies this distribution.
|
||||
*
|
||||
* The Eclipse Public License is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
*
|
||||
* The Apache License v2.0 is available at
|
||||
* http://www.opensource.org/licenses/apache2.0.php
|
||||
*
|
||||
* You may elect to redistribute this code under either of these licenses.
|
||||
*******************************************************************************/
|
||||
package org.eclipse.jetty.websocket.client;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.net.ProtocolException;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Queue;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import javax.net.ssl.SSLEngine;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
import org.eclipse.jetty.http.HttpParser;
|
||||
import org.eclipse.jetty.io.AbstractConnection;
|
||||
import org.eclipse.jetty.io.AsyncEndPoint;
|
||||
|
||||
import org.eclipse.jetty.io.Buffers;
|
||||
import org.eclipse.jetty.io.ByteArrayBuffer;
|
||||
import org.eclipse.jetty.io.ConnectedEndPoint;
|
||||
import org.eclipse.jetty.io.Connection;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.io.SimpleBuffers;
|
||||
import org.eclipse.jetty.io.nio.AsyncConnection;
|
||||
import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
|
||||
import org.eclipse.jetty.io.nio.SelectorManager;
|
||||
import org.eclipse.jetty.io.nio.SslConnection;
|
||||
import org.eclipse.jetty.util.B64Code;
|
||||
import org.eclipse.jetty.util.QuotedStringTokenizer;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.SelectorManager;
|
||||
import org.eclipse.jetty.io.StandardByteBufferPool;
|
||||
import org.eclipse.jetty.util.component.AggregateLifeCycle;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||
import org.eclipse.jetty.util.thread.ThreadPool;
|
||||
import org.eclipse.jetty.websocket.WebSocket;
|
||||
import org.eclipse.jetty.websocket.WebSocketConnection;
|
||||
import org.eclipse.jetty.websocket.WebSocketConnectionRFC6455;
|
||||
import org.eclipse.jetty.websocket.extensions.Extension;
|
||||
import org.eclipse.jetty.websocket.masks.Masker;
|
||||
import org.eclipse.jetty.websocket.masks.RandomMasker;
|
||||
import org.eclipse.jetty.websocket.api.EventMethodsCache;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketConnection;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketEventDriver;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||
import org.eclipse.jetty.websocket.client.io.WebSocketClientSelectorManager;
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* <p>WebSocketClientFactory contains the common components needed by multiple {@link WebSocketClient} instances
|
||||
* (for example, a {@link ThreadPool}, a {@link SelectorManager NIO selector}, etc).</p>
|
||||
* <p>WebSocketClients with different configurations should share the same factory to avoid to waste resources.</p>
|
||||
* <p>If a ThreadPool or MaskGen is passed in the constructor, then it is not added with {@link AggregateLifeCycle#addBean(Object)},
|
||||
* so it's lifecycle must be controlled externally.
|
||||
*
|
||||
* @see WebSocketClient
|
||||
*/
|
||||
public class WebSocketClientFactory extends AggregateLifeCycle
|
||||
{
|
||||
private final static Logger __log = org.eclipse.jetty.util.log.Log.getLogger(WebSocketClientFactory.class.getName());
|
||||
private final static ByteArrayBuffer __ACCEPT = new ByteArrayBuffer.CaseInsensitive("Sec-WebSocket-Accept");
|
||||
private final Queue<WebSocketConnection> connections = new ConcurrentLinkedQueue<WebSocketConnection>();
|
||||
private final SslContextFactory _sslContextFactory = new SslContextFactory();
|
||||
private final ThreadPool _threadPool;
|
||||
private final WebSocketClientSelector _selector;
|
||||
private Masker _maskGen;
|
||||
private WebSocketBuffers _buffers;
|
||||
private static final Logger LOG = Log.getLogger(WebSocketClientFactory.class);
|
||||
private final Queue<WebSocketConnection> connections = new ConcurrentLinkedQueue<>();
|
||||
private final ByteBufferPool bufferPool = new StandardByteBufferPool();
|
||||
private final Executor executor;
|
||||
private final WebSocketClientSelectorManager selector;
|
||||
private final EventMethodsCache methodsCache;
|
||||
private final WebSocketPolicy policy;
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* <p>Creates a WebSocketClientFactory with the default configuration.</p>
|
||||
*/
|
||||
public WebSocketClientFactory()
|
||||
{
|
||||
this(null);
|
||||
this(null,null);
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* <p>Creates a WebSocketClientFactory with the given ThreadPool and the default configuration.</p>
|
||||
*
|
||||
* @param threadPool the ThreadPool instance to use
|
||||
*/
|
||||
public WebSocketClientFactory(ThreadPool threadPool)
|
||||
public WebSocketClientFactory(Executor threadPool)
|
||||
{
|
||||
this(threadPool, new RandomMasker());
|
||||
this(threadPool,null);
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* <p>Creates a WebSocketClientFactory with the given ThreadPool and the given MaskGen.</p>
|
||||
*
|
||||
* @param threadPool the ThreadPool instance to use
|
||||
* @param maskGen the MaskGen instance to use
|
||||
*/
|
||||
public WebSocketClientFactory(ThreadPool threadPool, Masker maskGen)
|
||||
{
|
||||
this(threadPool, maskGen, 8192);
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
|
||||
/**
|
||||
* <p>Creates a WebSocketClientFactory with the specified configuration.</p>
|
||||
*
|
||||
* @param threadPool the ThreadPool instance to use
|
||||
* @param maskGen the mask generator to use
|
||||
* @param bufferSize the read buffer size
|
||||
*/
|
||||
public WebSocketClientFactory(ThreadPool threadPool, Masker maskGen, int bufferSize)
|
||||
public WebSocketClientFactory(Executor threadPool, SslContextFactory sslContextFactory)
|
||||
{
|
||||
if (threadPool == null)
|
||||
{
|
||||
threadPool = new QueuedThreadPool();
|
||||
_threadPool = threadPool;
|
||||
addBean(_threadPool);
|
||||
}
|
||||
this.executor = threadPool;
|
||||
addBean(threadPool);
|
||||
|
||||
_buffers = new WebSocketBuffers(bufferSize);
|
||||
addBean(_buffers);
|
||||
if (sslContextFactory != null)
|
||||
{
|
||||
addBean(sslContextFactory);
|
||||
}
|
||||
|
||||
_maskGen = maskGen;
|
||||
addBean(_maskGen);
|
||||
selector = new WebSocketClientSelectorManager(bufferPool,executor);
|
||||
selector.setSslContextFactory(sslContextFactory);
|
||||
addBean(selector);
|
||||
|
||||
_selector = new WebSocketClientSelector();
|
||||
addBean(_selector);
|
||||
this.methodsCache = new EventMethodsCache();
|
||||
|
||||
addBean(_sslContextFactory);
|
||||
this.policy = WebSocketPolicy.newClientPolicy();
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @return the SslContextFactory used to configure SSL parameters
|
||||
*/
|
||||
public SslContextFactory getSslContextFactory()
|
||||
public WebSocketClientFactory(SslContextFactory sslContextFactory)
|
||||
{
|
||||
return _sslContextFactory;
|
||||
this(null,sslContextFactory);
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* Get the selectorManager. Used to configure the manager.
|
||||
*
|
||||
* @return The {@link SelectorManager} instance.
|
||||
*/
|
||||
public SelectorManager getSelectorManager()
|
||||
private void closeConnections()
|
||||
{
|
||||
return _selector;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* Get the ThreadPool.
|
||||
* Used to set/query the thread pool configuration.
|
||||
*
|
||||
* @return The {@link ThreadPool}
|
||||
*/
|
||||
public ThreadPool getThreadPool()
|
||||
{
|
||||
return _threadPool;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @return the shared mask generator, or null if no shared mask generator is used
|
||||
* @see WebSocketClient#getMaskGen()
|
||||
*/
|
||||
public Masker getMaskGen()
|
||||
{
|
||||
return _maskGen;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @param maskGen the shared mask generator, or null if no shared mask generator is used
|
||||
* @see WebSocketClient#setMaskGen(Masker)
|
||||
*/
|
||||
public void setMaskGen(Masker maskGen)
|
||||
{
|
||||
if (isRunning())
|
||||
throw new IllegalStateException(getState());
|
||||
removeBean(_maskGen);
|
||||
_maskGen = maskGen;
|
||||
addBean(maskGen);
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @param bufferSize the read buffer size
|
||||
* @see #getBufferSize()
|
||||
*/
|
||||
public void setBufferSize(int bufferSize)
|
||||
{
|
||||
if (isRunning())
|
||||
throw new IllegalStateException(getState());
|
||||
removeBean(_buffers);
|
||||
_buffers = new WebSocketBuffers(bufferSize);
|
||||
addBean(_buffers);
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @return the read buffer size
|
||||
*/
|
||||
public int getBufferSize()
|
||||
{
|
||||
return _buffers.getBufferSize();
|
||||
for (WebSocketConnection connection : connections)
|
||||
{
|
||||
try
|
||||
{
|
||||
connection.close();
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
LOG.warn(e);
|
||||
}
|
||||
}
|
||||
connections.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -223,353 +92,38 @@ public class WebSocketClientFactory extends AggregateLifeCycle
|
||||
super.doStop();
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* <p>Creates and returns a new instance of a {@link WebSocketClient}, configured with this
|
||||
* WebSocketClientFactory instance.</p>
|
||||
*
|
||||
* @return a new {@link WebSocketClient} instance
|
||||
*/
|
||||
public WebSocketClient newWebSocketClient()
|
||||
public ByteBufferPool getBufferPool()
|
||||
{
|
||||
return bufferPool;
|
||||
}
|
||||
|
||||
protected Collection<WebSocketConnection> getConnections()
|
||||
{
|
||||
return Collections.unmodifiableCollection(connections);
|
||||
}
|
||||
|
||||
public Executor getExecutor()
|
||||
{
|
||||
return executor;
|
||||
}
|
||||
|
||||
public WebSocketPolicy getPolicy()
|
||||
{
|
||||
return policy;
|
||||
}
|
||||
|
||||
public SelectorManager getSelector()
|
||||
{
|
||||
return selector;
|
||||
}
|
||||
|
||||
public WebSocketClient newSPDYClient()
|
||||
{
|
||||
return new WebSocketClient(this);
|
||||
}
|
||||
|
||||
protected SSLEngine newSslEngine(SocketChannel channel) throws IOException
|
||||
public WebSocketEventDriver newWebSocketDriver(Object websocketPojo)
|
||||
{
|
||||
SSLEngine sslEngine;
|
||||
if (channel != null)
|
||||
{
|
||||
String peerHost = channel.socket().getInetAddress().getHostAddress();
|
||||
int peerPort = channel.socket().getPort();
|
||||
sslEngine = _sslContextFactory.newSslEngine(peerHost, peerPort);
|
||||
}
|
||||
else
|
||||
{
|
||||
sslEngine = _sslContextFactory.newSslEngine();
|
||||
}
|
||||
sslEngine.setUseClientMode(true);
|
||||
sslEngine.beginHandshake();
|
||||
|
||||
return sslEngine;
|
||||
}
|
||||
|
||||
protected boolean addConnection(WebSocketConnection connection)
|
||||
{
|
||||
return isRunning() && connections.add(connection);
|
||||
}
|
||||
|
||||
protected boolean removeConnection(WebSocketConnection connection)
|
||||
{
|
||||
return connections.remove(connection);
|
||||
}
|
||||
|
||||
protected void closeConnections()
|
||||
{
|
||||
for (WebSocketConnection connection : connections)
|
||||
connection.shutdown();
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* WebSocket Client Selector Manager
|
||||
*/
|
||||
class WebSocketClientSelector extends SelectorManager
|
||||
{
|
||||
@Override
|
||||
public boolean dispatch(Runnable task)
|
||||
{
|
||||
return _threadPool.dispatch(task);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, final SelectionKey key) throws IOException
|
||||
{
|
||||
WebSocketClient.WebSocketFuture holder = (WebSocketClient.WebSocketFuture)key.attachment();
|
||||
int maxIdleTime = holder.getMaxIdleTime();
|
||||
if (maxIdleTime < 0)
|
||||
maxIdleTime = (int)getMaxIdleTime();
|
||||
SelectChannelEndPoint result = new SelectChannelEndPoint(channel, selectSet, key, maxIdleTime);
|
||||
AsyncEndPoint endPoint = result;
|
||||
|
||||
// Detect if it is SSL, and wrap the connection if so
|
||||
if ("wss".equals(holder.getURI().getScheme()))
|
||||
{
|
||||
SSLEngine sslEngine = newSslEngine(channel);
|
||||
SslConnection sslConnection = new SslConnection(sslEngine, endPoint);
|
||||
endPoint.setConnection(sslConnection);
|
||||
endPoint = sslConnection.getSslEndPoint();
|
||||
}
|
||||
|
||||
AsyncConnection connection = selectSet.getManager().newConnection(channel, endPoint, holder);
|
||||
endPoint.setConnection(connection);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)
|
||||
{
|
||||
WebSocketClient.WebSocketFuture holder = (WebSocketClient.WebSocketFuture)attachment;
|
||||
return new HandshakeConnection(endpoint, holder);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void endPointOpened(SelectChannelEndPoint endpoint)
|
||||
{
|
||||
// TODO expose on outer class ??
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
|
||||
{
|
||||
LOG.debug("upgrade {} -> {}", oldConnection, endpoint.getConnection());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void endPointClosed(SelectChannelEndPoint endpoint)
|
||||
{
|
||||
endpoint.getConnection().onClose();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
|
||||
{
|
||||
if (!(attachment instanceof WebSocketClient.WebSocketFuture))
|
||||
super.connectionFailed(channel, ex, attachment);
|
||||
else
|
||||
{
|
||||
__log.debug(ex);
|
||||
WebSocketClient.WebSocketFuture future = (WebSocketClient.WebSocketFuture)attachment;
|
||||
|
||||
future.handshakeFailed(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* Handshake Connection.
|
||||
* Handles the connection until the handshake succeeds or fails.
|
||||
*/
|
||||
class HandshakeConnection extends AbstractConnection implements AsyncConnection
|
||||
{
|
||||
private final AsyncEndPoint _endp;
|
||||
private final WebSocketClient.WebSocketFuture _future;
|
||||
private final String _key;
|
||||
private final HttpParser _parser;
|
||||
private String _accept;
|
||||
private String _error;
|
||||
private ByteArrayBuffer _handshake;
|
||||
|
||||
public HandshakeConnection(AsyncEndPoint endpoint, WebSocketClient.WebSocketFuture future)
|
||||
{
|
||||
super(endpoint, System.currentTimeMillis());
|
||||
_endp = endpoint;
|
||||
_future = future;
|
||||
|
||||
byte[] bytes = new byte[16];
|
||||
new Random().nextBytes(bytes);
|
||||
_key = new String(B64Code.encode(bytes));
|
||||
|
||||
Buffers buffers = new SimpleBuffers(_buffers.getBuffer(), null);
|
||||
_parser = new HttpParser(buffers, _endp, new HttpParser.EventHandler()
|
||||
{
|
||||
@Override
|
||||
public void startResponse(ByteBuffer version, int status, ByteBuffer reason) throws IOException
|
||||
{
|
||||
if (status != 101)
|
||||
{
|
||||
_error = "Bad response status " + status + " " + reason;
|
||||
_endp.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void parsedHeader(ByteBuffer name, ByteBuffer value) throws IOException
|
||||
{
|
||||
if (__ACCEPT.equals(name))
|
||||
_accept = value.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void startRequest(ByteBuffer method, ByteBuffer url, ByteBuffer version) throws IOException
|
||||
{
|
||||
if (_error == null)
|
||||
_error = "Bad response: " + method + " " + url + " " + version;
|
||||
_endp.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void content(ByteBuffer ref) throws IOException
|
||||
{
|
||||
if (_error == null)
|
||||
_error = "Bad response. " + ref.length() + "B of content?";
|
||||
_endp.close();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private boolean handshake()
|
||||
{
|
||||
if (_handshake==null)
|
||||
{
|
||||
String path = _future.getURI().getPath();
|
||||
if (path == null || path.length() == 0)
|
||||
path = "/";
|
||||
|
||||
if (_future.getURI().getRawQuery() != null)
|
||||
path += "?" + _future.getURI().getRawQuery();
|
||||
|
||||
String origin = _future.getOrigin();
|
||||
|
||||
StringBuilder request = new StringBuilder(512);
|
||||
request.append("GET ").append(path).append(" HTTP/1.1\r\n")
|
||||
.append("Host: ").append(_future.getURI().getHost()).append(":")
|
||||
.append(_future.getURI().getPort()).append("\r\n")
|
||||
.append("Upgrade: websocket\r\n")
|
||||
.append("Connection: Upgrade\r\n")
|
||||
.append("Sec-WebSocket-Key: ")
|
||||
.append(_key).append("\r\n");
|
||||
|
||||
if (origin != null)
|
||||
request.append("Origin: ").append(origin).append("\r\n");
|
||||
|
||||
request.append("Sec-WebSocket-Version: ").append(WebSocketConnectionRFC6455.VERSION).append("\r\n");
|
||||
|
||||
if (_future.getProtocol() != null)
|
||||
request.append("Sec-WebSocket-Protocol: ").append(_future.getProtocol()).append("\r\n");
|
||||
|
||||
Map<String, String> cookies = _future.getCookies();
|
||||
if (cookies != null && cookies.size() > 0)
|
||||
{
|
||||
for (String cookie : cookies.keySet())
|
||||
request.append("Cookie: ")
|
||||
.append(QuotedStringTokenizer.quoteIfNeeded(cookie, HttpFields.__COOKIE_DELIM))
|
||||
.append("=")
|
||||
.append(QuotedStringTokenizer.quoteIfNeeded(cookies.get(cookie), HttpFields.__COOKIE_DELIM))
|
||||
.append("\r\n");
|
||||
}
|
||||
|
||||
request.append("\r\n");
|
||||
|
||||
_handshake=new ByteArrayBuffer(request.toString(), false);
|
||||
}
|
||||
|
||||
// TODO extensions
|
||||
|
||||
try
|
||||
{
|
||||
int len = _handshake.length();
|
||||
int flushed = _endp.flush(_handshake);
|
||||
if (flushed<0)
|
||||
throw new IOException("incomplete handshake");
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
_future.handshakeFailed(e);
|
||||
}
|
||||
return _handshake.length()==0;
|
||||
}
|
||||
|
||||
public Connection handle() throws IOException
|
||||
{
|
||||
while (_endp.isOpen() && !_parser.isComplete())
|
||||
{
|
||||
if (_handshake==null || _handshake.length()>0)
|
||||
if (!handshake())
|
||||
return this;
|
||||
|
||||
if (!_parser.parseAvailable())
|
||||
{
|
||||
if (_endp.isInputShutdown())
|
||||
_future.handshakeFailed(new IOException("Incomplete handshake response"));
|
||||
return this;
|
||||
}
|
||||
}
|
||||
if (_error == null)
|
||||
{
|
||||
if (_accept == null)
|
||||
{
|
||||
_error = "No Sec-WebSocket-Accept";
|
||||
}
|
||||
else if (!WebSocketConnectionRFC6455.hashKey(_key).equals(_accept))
|
||||
{
|
||||
_error = "Bad Sec-WebSocket-Accept";
|
||||
}
|
||||
else
|
||||
{
|
||||
WebSocketConnection connection = newWebSocketConnection();
|
||||
|
||||
ByteBuffer header = _parser.getHeaderBuffer();
|
||||
if (header.hasContent())
|
||||
connection.fillBuffersFrom(header);
|
||||
_buffers.returnBuffer(header);
|
||||
|
||||
_future.onConnection(connection);
|
||||
|
||||
return connection;
|
||||
}
|
||||
}
|
||||
|
||||
_endp.close();
|
||||
return this;
|
||||
}
|
||||
|
||||
private WebSocketConnection newWebSocketConnection() throws IOException
|
||||
{
|
||||
return new WebSocketClientConnection(
|
||||
_future._client.getFactory(),
|
||||
_future.getWebSocket(),
|
||||
_endp,
|
||||
_buffers,
|
||||
System.currentTimeMillis(),
|
||||
_future.getMaxIdleTime(),
|
||||
_future.getProtocol(),
|
||||
null,
|
||||
WebSocketConnectionRFC6455.VERSION,
|
||||
_future.getMaskGen());
|
||||
}
|
||||
|
||||
public void onInputShutdown() throws IOException
|
||||
{
|
||||
_endp.close();
|
||||
}
|
||||
|
||||
public boolean isIdle()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean isSuspended()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
public void onClose()
|
||||
{
|
||||
if (_error != null)
|
||||
_future.handshakeFailed(new ProtocolException(_error));
|
||||
else
|
||||
_future.handshakeFailed(new EOFException());
|
||||
}
|
||||
}
|
||||
|
||||
private static class WebSocketClientConnection extends WebSocketConnectionRFC6455
|
||||
{
|
||||
private final WebSocketClientFactory factory;
|
||||
|
||||
public WebSocketClientConnection(WebSocketClientFactory factory, WebSocket webSocket, EndPoint endPoint, WebSocketBuffers buffers, long timeStamp, int maxIdleTime, String protocol, List<Extension> extensions, int draftVersion, Masker maskGen) throws IOException
|
||||
{
|
||||
super(webSocket, endPoint, buffers, timeStamp, maxIdleTime, protocol, extensions, draftVersion, maskGen);
|
||||
this.factory = factory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClose()
|
||||
{
|
||||
super.onClose();
|
||||
factory.removeConnection(this);
|
||||
}
|
||||
return new WebSocketEventDriver(methodsCache,policy,websocketPojo);
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,122 @@
|
||||
package org.eclipse.jetty.websocket.client.io;
|
||||
|
||||
import java.net.URI;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import org.eclipse.jetty.io.AbstractAsyncConnection;
|
||||
import org.eclipse.jetty.io.AsyncConnection;
|
||||
import org.eclipse.jetty.io.AsyncEndPoint;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.util.B64Code;
|
||||
import org.eclipse.jetty.util.FutureCallback;
|
||||
import org.eclipse.jetty.util.QuotedStringTokenizer;
|
||||
import org.eclipse.jetty.util.StringUtil;
|
||||
import org.eclipse.jetty.websocket.client.WebSocketClient;
|
||||
import org.eclipse.jetty.websocket.client.WebSocketClient.ConnectFuture;
|
||||
import org.eclipse.jetty.websocket.io.WebSocketAsyncConnection;
|
||||
|
||||
/**
|
||||
* Default Handshake Connection.
|
||||
* <p>
|
||||
* Results in a {@link WebSocketAsyncConnection} on successful handshake.
|
||||
*/
|
||||
public class HandshakeConnection extends AbstractAsyncConnection implements AsyncConnection
|
||||
{
|
||||
public static final String COOKIE_DELIM = "\"\\\n\r\t\f\b%+ ;=";
|
||||
private final WebSocketClient.ConnectFuture future;
|
||||
private final ByteBufferPool bufferPool;
|
||||
|
||||
private String key;
|
||||
|
||||
public HandshakeConnection(AsyncEndPoint endp, Executor executor, ByteBufferPool bufferPool, WebSocketClient.ConnectFuture future)
|
||||
{
|
||||
super(endp,executor);
|
||||
this.future = future;
|
||||
this.bufferPool = bufferPool;
|
||||
|
||||
byte[] bytes = new byte[16];
|
||||
new Random().nextBytes(bytes);
|
||||
this.key = new String(B64Code.encode(bytes));
|
||||
}
|
||||
|
||||
public void handshake() throws InterruptedException, ExecutionException
|
||||
{
|
||||
URI uri = future.getWebSocketUri();
|
||||
|
||||
StringBuilder request = new StringBuilder(512);
|
||||
request.append("GET ");
|
||||
if (StringUtil.isBlank(uri.getPath()))
|
||||
{
|
||||
request.append("/");
|
||||
}
|
||||
else
|
||||
{
|
||||
request.append(uri.getPath());
|
||||
}
|
||||
if (StringUtil.isNotBlank(uri.getRawQuery()))
|
||||
{
|
||||
request.append("?").append(uri.getRawQuery());
|
||||
}
|
||||
request.append(" HTTP/1.1\r\n");
|
||||
|
||||
request.append("Host: ").append(uri.getHost());
|
||||
if (uri.getPort() > 0)
|
||||
{
|
||||
request.append(':').append(uri.getPort());
|
||||
}
|
||||
request.append("\r\n");
|
||||
request.append("Upgrade: websocket\r\n");
|
||||
request.append("Connection: Upgrade\r\n");
|
||||
request.append("Sec-WebSocket-Key: ").append(key).append("\r\n");
|
||||
|
||||
if (StringUtil.isNotBlank(future.getOrigin()))
|
||||
{
|
||||
request.append("Origin: ").append(future.getOrigin()).append("\r\n");
|
||||
}
|
||||
|
||||
request.append("Sec-WebSocket-Version: 13\r\n"); // RFC-6455 specified version
|
||||
|
||||
Map<String, String> cookies = future.getCookies();
|
||||
if ((cookies != null) && (cookies.size() > 0))
|
||||
{
|
||||
for (String cookie : cookies.keySet())
|
||||
{
|
||||
request.append("Cookie: ");
|
||||
request.append(QuotedStringTokenizer.quoteIfNeeded(cookie,COOKIE_DELIM));
|
||||
request.append("=");
|
||||
request.append(QuotedStringTokenizer.quoteIfNeeded(cookies.get(cookie),COOKIE_DELIM));
|
||||
request.append("\r\n");
|
||||
}
|
||||
}
|
||||
|
||||
request.append("\r\n");
|
||||
|
||||
// TODO: extensions
|
||||
// TODO: write to connection
|
||||
byte rawreq[] = StringUtil.getUtf8Bytes(request.toString());
|
||||
ByteBuffer buf = bufferPool.acquire(rawreq.length,false);
|
||||
try
|
||||
{
|
||||
FutureCallback<ConnectFuture> callback = new FutureCallback<>();
|
||||
getEndPoint().write(future,callback,buf);
|
||||
// TODO: block on read?
|
||||
// TODO: read response & upgrade via async callback
|
||||
callback.get();
|
||||
}
|
||||
finally
|
||||
{
|
||||
bufferPool.release(buf);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFillable()
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
}
|
||||
}
|
@ -0,0 +1,174 @@
|
||||
package org.eclipse.jetty.websocket.client.io;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import javax.net.ssl.SSLEngine;
|
||||
import javax.net.ssl.SSLException;
|
||||
|
||||
import org.eclipse.jetty.io.AsyncConnection;
|
||||
import org.eclipse.jetty.io.AsyncEndPoint;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.SelectChannelEndPoint;
|
||||
import org.eclipse.jetty.io.SelectorManager;
|
||||
import org.eclipse.jetty.io.ssl.SslConnection;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketEventDriver;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||
import org.eclipse.jetty.websocket.client.WebSocketClient;
|
||||
import org.eclipse.jetty.websocket.client.WebSocketClientFactory;
|
||||
import org.eclipse.jetty.websocket.io.WebSocketAsyncConnection;
|
||||
|
||||
public class WebSocketClientSelectorManager extends SelectorManager
|
||||
{
|
||||
private SslContextFactory sslContextFactory;
|
||||
private Executor executor;
|
||||
private ByteBufferPool bufferPool;
|
||||
|
||||
public WebSocketClientSelectorManager(ByteBufferPool bufferPool, Executor executor)
|
||||
{
|
||||
super();
|
||||
this.bufferPool = bufferPool;
|
||||
this.executor = executor;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void endPointClosed(AsyncEndPoint endpoint)
|
||||
{
|
||||
endpoint.getAsyncConnection().onClose();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void endPointOpened(AsyncEndPoint endpoint)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void endPointUpgraded(AsyncEndPoint endpoint, AsyncConnection oldConnection)
|
||||
{
|
||||
// TODO Investigate role of this with websocket
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void execute(Runnable task)
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getMaxIdleTime()
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
public SslContextFactory getSslContextFactory()
|
||||
{
|
||||
return sslContextFactory;
|
||||
}
|
||||
|
||||
public AsyncConnection newAsyncConnection(SocketChannel channel, AsyncEndPoint endPoint, Object attachment)
|
||||
{
|
||||
WebSocketClient.ConnectFuture confut = (WebSocketClient.ConnectFuture)attachment;
|
||||
WebSocketClientFactory factory = confut.getFactory();
|
||||
WebSocketEventDriver websocket = confut.getWebSocket();
|
||||
|
||||
Executor executor = factory.getExecutor();
|
||||
WebSocketPolicy policy = factory.getPolicy();
|
||||
ByteBufferPool bufferPool = factory.getBufferPool();
|
||||
|
||||
WebSocketAsyncConnection connection = new WebSocketAsyncConnection(endPoint,executor,policy,bufferPool);
|
||||
endPoint.setAsyncConnection(connection);
|
||||
connection.getParser().addListener(websocket);
|
||||
|
||||
// TODO: track open websockets? bind open websocket to connection?
|
||||
|
||||
return connection;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endPoint, Object attachment)
|
||||
{
|
||||
WebSocketClient.ConnectFuture confut = (WebSocketClient.ConnectFuture)attachment;
|
||||
|
||||
try
|
||||
{
|
||||
String scheme = confut.getWebSocketUri().getScheme();
|
||||
|
||||
if ((sslContextFactory != null) && ("wss".equalsIgnoreCase(scheme)))
|
||||
{
|
||||
final AtomicReference<AsyncEndPoint> sslEndPointRef = new AtomicReference<>();
|
||||
final AtomicReference<Object> attachmentRef = new AtomicReference<>(attachment);
|
||||
SSLEngine engine = newSSLEngine(sslContextFactory,channel);
|
||||
SslConnection sslConnection = new SslConnection(bufferPool,executor,endPoint,engine)
|
||||
{
|
||||
@Override
|
||||
public void onClose()
|
||||
{
|
||||
sslEndPointRef.set(null);
|
||||
attachmentRef.set(null);
|
||||
super.onClose();
|
||||
}
|
||||
};
|
||||
endPoint.setAsyncConnection(sslConnection);
|
||||
AsyncEndPoint sslEndPoint = sslConnection.getSslEndPoint();
|
||||
sslEndPointRef.set(sslEndPoint);
|
||||
|
||||
startHandshake(engine);
|
||||
|
||||
AsyncConnection connection = newAsyncConnection(channel,sslEndPoint,attachment);
|
||||
endPoint.setAsyncConnection(connection);
|
||||
return connection;
|
||||
}
|
||||
else
|
||||
{
|
||||
AsyncConnection connection = newAsyncConnection(channel,endPoint,attachment);
|
||||
endPoint.setAsyncConnection(connection);
|
||||
return connection;
|
||||
}
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
LOG.debug(t);
|
||||
confut.failed(null,t);
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
|
||||
{
|
||||
SelectChannelEndPoint endp = new SelectChannelEndPoint(channel,selectSet,key,getMaxIdleTime());
|
||||
endp.setAsyncConnection(selectSet.getManager().newConnection(channel,endp,key.attachment()));
|
||||
return endp;
|
||||
}
|
||||
|
||||
public SSLEngine newSSLEngine(SslContextFactory sslContextFactory, SocketChannel channel)
|
||||
{
|
||||
String peerHost = channel.socket().getInetAddress().getHostAddress();
|
||||
int peerPort = channel.socket().getPort();
|
||||
SSLEngine engine = sslContextFactory.newSslEngine(peerHost,peerPort);
|
||||
engine.setUseClientMode(true);
|
||||
return engine;
|
||||
}
|
||||
|
||||
public void setSslContextFactory(SslContextFactory sslContextFactory)
|
||||
{
|
||||
this.sslContextFactory = sslContextFactory;
|
||||
}
|
||||
|
||||
private void startHandshake(SSLEngine engine)
|
||||
{
|
||||
try
|
||||
{
|
||||
engine.beginHandshake();
|
||||
}
|
||||
catch (SSLException x)
|
||||
{
|
||||
throw new RuntimeException(x);
|
||||
}
|
||||
}
|
||||
}
|
@ -33,8 +33,8 @@ import org.eclipse.jetty.websocket.WebSocketConnectionRFC6455;
|
||||
import org.eclipse.jetty.websocket.WebSocket.Connection;
|
||||
import org.eclipse.jetty.websocket.WebSocket.FrameConnection;
|
||||
import org.eclipse.jetty.websocket.WebSocket.OnFrame;
|
||||
import org.eclipse.jetty.websocket.client.WebSocketClient;
|
||||
import org.eclipse.jetty.websocket.client.WebSocketClientFactory;
|
||||
import org.eclipse.jetty.websocket.client.OldWebSocketClient;
|
||||
import org.eclipse.jetty.websocket.client.OldWebSocketClientFactory;
|
||||
|
||||
/**
|
||||
* This is not a general purpose websocket client.
|
||||
@ -42,7 +42,7 @@ import org.eclipse.jetty.websocket.client.WebSocketClientFactory;
|
||||
*/
|
||||
public class TestClient implements WebSocket.OnFrame
|
||||
{
|
||||
private static WebSocketClientFactory __clientFactory = new WebSocketClientFactory();
|
||||
private static OldWebSocketClientFactory __clientFactory = new OldWebSocketClientFactory();
|
||||
private static boolean _verbose=false;
|
||||
|
||||
private static final Random __random = new Random();
|
||||
@ -141,7 +141,7 @@ public class TestClient implements WebSocket.OnFrame
|
||||
|
||||
private void open() throws Exception
|
||||
{
|
||||
WebSocketClient client = new WebSocketClient(__clientFactory);
|
||||
OldWebSocketClient client = new OldWebSocketClient(__clientFactory);
|
||||
client.setProtocol(_protocol);
|
||||
client.setMaxIdleTime(_timeout);
|
||||
client.open(new URI("ws://"+_host+":"+_port+"/"),this).get(10,TimeUnit.SECONDS);
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,24 @@
|
||||
package org.eclipse.jetty.websocket.client.blockhead;
|
||||
|
||||
/**
|
||||
* A overly simplistic websocket server used during testing.
|
||||
* <p>
|
||||
* This is not meant to be performant or accurate.
|
||||
* In fact, having the server misbehave is a useful trait during testing.
|
||||
*/
|
||||
public class BlockheadServer
|
||||
{
|
||||
|
||||
public void start()
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
}
|
||||
|
||||
public void close()
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -1,9 +1,9 @@
|
||||
package org.eclipse.jetty.websocket.server.callbacks;
|
||||
package org.eclipse.jetty.websocket.callbacks;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.websocket.server.WebSocketAsyncConnection;
|
||||
import org.eclipse.jetty.websocket.io.WebSocketAsyncConnection;
|
||||
|
||||
public class WebSocketCloseCallback implements Callback<Void>
|
||||
{
|
@ -1,4 +1,4 @@
|
||||
package org.eclipse.jetty.websocket.server;
|
||||
package org.eclipse.jetty.websocket.io;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
@ -21,6 +21,7 @@ import org.eclipse.jetty.websocket.api.ExtensionConfig;
|
||||
import org.eclipse.jetty.websocket.api.StatusCode;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketConnection;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||
import org.eclipse.jetty.websocket.callbacks.WebSocketCloseCallback;
|
||||
import org.eclipse.jetty.websocket.frames.BaseFrame;
|
||||
import org.eclipse.jetty.websocket.frames.BinaryFrame;
|
||||
import org.eclipse.jetty.websocket.frames.CloseFrame;
|
||||
@ -28,7 +29,6 @@ import org.eclipse.jetty.websocket.frames.TextFrame;
|
||||
import org.eclipse.jetty.websocket.generator.FrameGenerator;
|
||||
import org.eclipse.jetty.websocket.generator.Generator;
|
||||
import org.eclipse.jetty.websocket.parser.Parser;
|
||||
import org.eclipse.jetty.websocket.server.callbacks.WebSocketCloseCallback;
|
||||
|
||||
/**
|
||||
* Provides the implementation of {@link WebSocketConnection} within the framework of the new {@link AsyncConnection} framework of jetty-io
|
@ -43,6 +43,7 @@ import org.eclipse.jetty.websocket.extensions.Extension;
|
||||
import org.eclipse.jetty.websocket.extensions.deflate.DeflateFrameExtension;
|
||||
import org.eclipse.jetty.websocket.extensions.fragment.FragmentExtension;
|
||||
import org.eclipse.jetty.websocket.extensions.identity.IdentityExtension;
|
||||
import org.eclipse.jetty.websocket.io.WebSocketAsyncConnection;
|
||||
import org.eclipse.jetty.websocket.server.handshake.HandshakeHixie76;
|
||||
import org.eclipse.jetty.websocket.server.handshake.HandshakeRFC6455;
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user