JSR-356 first working annotated @ClientEndpoint echo test

This commit is contained in:
Joakim Erdfelt 2013-03-28 10:51:45 -07:00
parent 52d97d8a06
commit 3a66b3ec3f
28 changed files with 730 additions and 242 deletions

View File

@ -38,10 +38,13 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.jsr356.endpoints.ConfiguredEndpoint;
import org.eclipse.jetty.websocket.jsr356.endpoints.JsrClientEndpointImpl;
import org.eclipse.jetty.websocket.jsr356.endpoints.JsrEndpointImpl;
import org.eclipse.jetty.websocket.jsr356.endpoints.JsrEventDriverFactory;
/**
* Main WebSocketContainer for working with client based WebSocket Endpoints.
*/
public class JettyWebSocketContainer implements WebSocketContainer
{
private static final Logger LOG = Log.getLogger(JettyWebSocketContainer.class);
@ -51,8 +54,7 @@ public class JettyWebSocketContainer implements WebSocketContainer
public JettyWebSocketContainer()
{
client = new WebSocketClient();
client.getEventDriverFactory().addImplementation(new JsrEndpointImpl(this));
client.getEventDriverFactory().addImplementation(new JsrClientEndpointImpl(this));
client.setEventDriverFactory(new JsrEventDriverFactory(client.getPolicy(),this));
try
{
@ -83,7 +85,7 @@ public class JettyWebSocketContainer implements WebSocketContainer
Future<org.eclipse.jetty.websocket.api.Session> futSess = client.connect(endpoint,path,req);
try
{
org.eclipse.jetty.websocket.api.Session sess = futSess.get();
WebSocketSession sess = (WebSocketSession)futSess.get();
return new JsrSession(this,sess,getNextId());
}
catch (InterruptedException | ExecutionException e)
@ -175,6 +177,12 @@ public class JettyWebSocketContainer implements WebSocketContainer
return String.format("websocket-%d",idgen.incrementAndGet());
}
public Set<Session> getOpenSessions()
{
// TODO Auto-generated method stub
return null;
}
@Override
public void setAsyncSendTimeout(long timeoutmillis)
{

View File

@ -26,13 +26,20 @@ import java.nio.ByteBuffer;
import javax.websocket.EncodeException;
import javax.websocket.RemoteEndpoint;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.message.MessageOutputStream;
import org.eclipse.jetty.websocket.common.message.MessageWriter;
public class JsrBasicRemote implements RemoteEndpoint.Basic
{
private final WebSocketSession jettySession;
private final org.eclipse.jetty.websocket.api.RemoteEndpoint jettyRemote;
private boolean batchingAllowed = false;
protected JsrBasicRemote(org.eclipse.jetty.websocket.api.RemoteEndpoint endpoint)
protected JsrBasicRemote(WebSocketSession session)
{
this.jettyRemote = endpoint;
this.jettySession = session;
this.jettyRemote = jettySession.getRemote();
}
@Override
@ -44,29 +51,25 @@ public class JsrBasicRemote implements RemoteEndpoint.Basic
@Override
public boolean getBatchingAllowed()
{
// TODO Auto-generated method stub
return false;
return batchingAllowed;
}
@Override
public OutputStream getSendStream() throws IOException
{
// TODO Auto-generated method stub
return null;
return new MessageOutputStream(jettySession);
}
@Override
public Writer getSendWriter() throws IOException
{
// TODO Auto-generated method stub
return null;
return new MessageWriter(jettySession);
}
@Override
public void sendBinary(ByteBuffer data) throws IOException
{
// TODO Auto-generated method stub
jettyRemote.sendBytes(data);
}
@Override
@ -78,8 +81,7 @@ public class JsrBasicRemote implements RemoteEndpoint.Basic
@Override
public void sendObject(Object o) throws IOException, EncodeException
{
// TODO Auto-generated method stub
// TODO Find appropriate Encoder and encode for output
}
@Override
@ -97,8 +99,7 @@ public class JsrBasicRemote implements RemoteEndpoint.Basic
@Override
public void sendText(String text) throws IOException
{
// TODO Auto-generated method stub
jettyRemote.sendString(text);
}
@Override
@ -110,7 +111,6 @@ public class JsrBasicRemote implements RemoteEndpoint.Basic
@Override
public void setBatchingAllowed(boolean allowed)
{
// TODO Auto-generated method stub
this.batchingAllowed = allowed;
}
}

View File

@ -22,6 +22,8 @@ import java.io.IOException;
import java.net.URI;
import java.security.Principal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -35,19 +37,23 @@ import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.common.WebSocketSession;
public class JsrSession implements Session
{
private final JettyWebSocketContainer container;
/** Jetty API Session Impl */
private final org.eclipse.jetty.websocket.api.Session jettySession;
private final WebSocketSession jettySession;
private final String id;
private List<Extension> negotiatedExtensions;
private Map<String, List<String>> jsrParameterMap;
private Map<String, String> pathParameters = new HashMap<>();
private Map<String, Object> userProperties;
private Set<MessageHandler> messageHandlers;
private JsrAsyncRemote asyncRemote;
private JsrBasicRemote basicRemote;
public JsrSession(JettyWebSocketContainer container, org.eclipse.jetty.websocket.api.Session session, String id)
public JsrSession(JettyWebSocketContainer container, WebSocketSession session, String id)
{
this.container = container;
this.jettySession = session;
@ -57,7 +63,7 @@ public class JsrSession implements Session
@Override
public void addMessageHandler(MessageHandler listener) throws IllegalStateException
{
// TODO Auto-generated method stub
messageHandlers.add(listener);
}
@Override
@ -87,7 +93,7 @@ public class JsrSession implements Session
{
if (basicRemote == null)
{
basicRemote = new JsrBasicRemote(jettySession.getRemote());
basicRemote = new JsrBasicRemote(jettySession);
}
return basicRemote;
}
@ -113,8 +119,7 @@ public class JsrSession implements Session
@Override
public long getMaxIdleTimeout()
{
// TODO Auto-generated method stub
return 0;
return jettySession.getPolicy().getIdleTimeout();
}
@Override
@ -126,8 +131,7 @@ public class JsrSession implements Session
@Override
public Set<MessageHandler> getMessageHandlers()
{
// TODO Auto-generated method stub
return null;
return messageHandlers;
}
@Override
@ -153,15 +157,13 @@ public class JsrSession implements Session
@Override
public Set<Session> getOpenSessions()
{
// TODO Auto-generated method stub
return null;
return container.getOpenSessions();
}
@Override
public Map<String, String> getPathParameters()
{
// TODO Auto-generated method stub
return null;
return Collections.unmodifiableMap(pathParameters);
}
@Override
@ -188,11 +190,6 @@ public class JsrSession implements Session
return jettySession.getUpgradeRequest().getRequestURI();
}
public long getTimeout()
{
return jettySession.getIdleTimeout();
}
@Override
public Principal getUserPrincipal()
{
@ -203,8 +200,7 @@ public class JsrSession implements Session
@Override
public Map<String, Object> getUserProperties()
{
// TODO Auto-generated method stub
return null;
return userProperties;
}
@Override
@ -222,33 +218,24 @@ public class JsrSession implements Session
@Override
public void removeMessageHandler(MessageHandler handler)
{
// TODO Auto-generated method stub
messageHandlers.remove(handler);
}
@Override
public void setMaxBinaryMessageBufferSize(int length)
{
// TODO Auto-generated method stub
jettySession.getPolicy().setMaxBinaryMessageBufferSize(length);
}
@Override
public void setMaxIdleTimeout(long milliseconds)
{
// TODO Auto-generated method stub
jettySession.getPolicy().setIdleTimeout(milliseconds);
}
@Override
public void setMaxTextMessageBufferSize(int length)
{
// TODO Auto-generated method stub
}
public void setTimeout(long milliseconds)
{
jettySession.setIdleTimeout(milliseconds);
jettySession.getPolicy().setMaxTextMessageBufferSize(length);
}
}

View File

@ -125,25 +125,34 @@ public class AnnotatedEndpointScanner extends AbstractMethodAnnotationScanner<Js
{
assertIsPublicNonStatic(method);
assertIsReturn(method,Void.TYPE);
// ParameterizedMethod msgMethod = establishCallable(metadata,pojo,method,paramsOnMessage,OnMessage.class);
// switch (msgMethod.getMessageType())
// {
// case TEXT:
// metadata.onText = msgMethod;
// break;
// case BINARY:
// metadata.onBinary = msgMethod;
// break;
// case PONG:
// metadata.onPong = msgMethod;
// break;
// default:
// StringBuilder err = new StringBuilder();
// err.append("Invalid @OnMessage method signature,");
// err.append(" Missing type TEXT, BINARY, or PONG parameter: ");
// err.append(msgMethod.getFullyQualifiedMethodName());
// throw new InvalidSignatureException(err.toString());
// }
OnMessageCallable onmessage = new OnMessageCallable(pojo,method);
visitMethod(onmessage,pojo,method,paramsOnMessage,OnMessage.class);
Param param = onmessage.getMessageObjectParam();
switch (param.role)
{
case MESSAGE_BINARY:
metadata.onBinary = new OnMessageBinaryCallable(onmessage);
break;
case MESSAGE_BINARY_STREAM:
metadata.onBinaryStream = new OnMessageBinaryStreamCallable(onmessage);
break;
case MESSAGE_TEXT:
metadata.onText = new OnMessageTextCallable(onmessage);
break;
case MESSAGE_TEXT_STREAM:
metadata.onTextStream = new OnMessageTextStreamCallable(onmessage);
break;
case MESSAGE_PONG:
metadata.onPong = new OnMessagePongCallable(onmessage);
break;
default:
StringBuilder err = new StringBuilder();
err.append("An unrecognized message type <");
err.append(param.type);
err.append(">: does not meet specified type categories of [TEXT, BINARY, DECODER, or PONG]");
throw new InvalidSignatureException(err.toString());
}
}
}
@ -181,6 +190,7 @@ public class AnnotatedEndpointScanner extends AbstractMethodAnnotationScanner<Js
if (paramId.process(param,callable))
{
// Successfully identified
LOG.debug("Identified: {}",param);
return true;
}
}

View File

@ -34,7 +34,7 @@ public abstract class JsrCallable extends CallableMethod
protected final Object[] args;
protected int idxSession = -1;
// Optional decoder (used for OnMessage)
private Decoder decoder;
protected Decoder decoder;
public JsrCallable(Class<?> pojo, Method method)
{
@ -51,12 +51,16 @@ public abstract class JsrCallable extends CallableMethod
args = new Object[len];
}
protected void copyTo(JsrCallable copy)
/**
* Copy Constructor
*/
public JsrCallable(JsrCallable copy)
{
copy.decoder = this.decoder;
copy.idxSession = this.idxSession;
System.arraycopy(this.params,0,copy.params,0,params.length);
System.arraycopy(this.args,0,copy.args,0,args.length);
this(copy.getPojo(),copy.getMethod());
this.decoder = copy.decoder;
this.idxSession = copy.idxSession;
System.arraycopy(copy.params,0,this.params,0,params.length);
System.arraycopy(copy.args,0,this.args,0,args.length);
}
/**
@ -105,7 +109,7 @@ public abstract class JsrCallable extends CallableMethod
return params;
}
public void init(Session session, Map<String, String> pathParams)
public void init(Session session)
{
// Default the session.
// Session is an optional parameter (always)
@ -117,7 +121,8 @@ public abstract class JsrCallable extends CallableMethod
// Default the path parameters
// PathParam's are optional parameters (always)
if (pathParams != null)
Map<String, String> pathParams = session.getPathParameters();
if ((pathParams != null) && (pathParams.size() > 0))
{
for (Param param : params)
{

View File

@ -0,0 +1,235 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.jsr356.annotations;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.nio.ByteBuffer;
import java.util.Map;
import javax.websocket.DecodeException;
import javax.websocket.EndpointConfig;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import org.eclipse.jetty.websocket.common.CloseInfo;
/**
* The live event methods found for a specific Annotated Endpoint
*/
public class JsrEvents
{
private final JsrMetadata<?> metadata;
/**
* Callable for &#064;{@link OnOpen} annotation.
*/
private final OnOpenCallable onOpen;
/**
* Callable for &#064;{@link OnClose} annotation
*/
private final OnCloseCallable onClose;
/**
* Callable for &#064;{@link OnError} annotation
*/
private final OnErrorCallable onError;
/**
* Callable for &#064;{@link OnMessage} annotation dealing with Text Message Format
*/
private final OnMessageTextCallable onText;
/**
* Callable for &#064;{@link OnMessage} annotation dealing with Text Streaming Message Format
*/
private final OnMessageTextStreamCallable onTextStream;
/**
* Callable for &#064;{@link OnMessage} annotation dealing with Binary Message Format
*/
private final OnMessageBinaryCallable onBinary;
/**
* Callable for &#064;{@link OnMessage} annotation dealing with Binary Streaming Message Format
*/
private final OnMessageBinaryStreamCallable onBinaryStream;
/**
* Callable for &#064;{@link OnMessage} annotation dealing with Pong Message Format
*/
private OnMessagePongCallable onPong;
public JsrEvents(JsrMetadata<?> metadata)
{
this.metadata = metadata;
this.onOpen = (metadata.onOpen == null)?null:new OnOpenCallable(metadata.onOpen);
this.onClose = (metadata.onClose == null)?null:new OnCloseCallable(metadata.onClose);
this.onError = (metadata.onError == null)?null:new OnErrorCallable(metadata.onError);
this.onBinary = (metadata.onBinary == null)?null:new OnMessageBinaryCallable(metadata.onBinary);
this.onBinaryStream = (metadata.onBinaryStream == null)?null:new OnMessageBinaryStreamCallable(metadata.onBinaryStream);
this.onText = (metadata.onText == null)?null:new OnMessageTextCallable(metadata.onText);
this.onTextStream = (metadata.onTextStream == null)?null:new OnMessageTextStreamCallable(metadata.onTextStream);
this.onPong = (metadata.onPong == null)?null:new OnMessagePongCallable(metadata.onPong);
}
public void callBinary(Object websocket, ByteBuffer buf, boolean fin) throws DecodeException
{
if (onBinary == null)
{
return;
}
onBinary.call(websocket,buf,fin);
}
public void callBinaryStream(Object websocket, InputStream stream) throws DecodeException, IOException
{
if (onBinaryStream == null)
{
return;
}
onBinaryStream.call(websocket,stream);
}
public void callClose(Object websocket, CloseInfo close)
{
if (onClose == null)
{
return;
}
onClose.call(websocket,close);
}
public void callError(Object websocket, Throwable cause)
{
if (onError == null)
{
return;
}
onError.call(websocket,cause);
}
public void callOpen(Object websocket, EndpointConfig config)
{
if (onOpen == null)
{
return;
}
onOpen.call(websocket,config);
}
public void callText(Object websocket, String text, boolean fin) throws DecodeException
{
if (onText == null)
{
return;
}
onText.call(websocket,text,fin);
}
public void callTextStream(Object websocket, Reader reader) throws DecodeException, IOException
{
if (onTextStream == null)
{
return;
}
onTextStream.call(websocket,reader);
}
public boolean hasBinary()
{
return (onBinary != null);
}
public boolean hasBinaryStream()
{
return (onBinaryStream != null);
}
public boolean hasText()
{
return (onText != null);
}
public boolean hasTextStream()
{
return (onTextStream != null);
}
public void init(Session session)
{
Map<String, String> pathParams = session.getPathParameters();
if (onOpen != null)
{
onOpen.init(session);
}
if (onClose != null)
{
onClose.init(session);
}
if (onError != null)
{
onError.init(session);
}
if (onText != null)
{
onText.init(session);
}
if (onTextStream != null)
{
onTextStream.init(session);
}
if (onBinary != null)
{
onBinary.init(session);
}
if (onBinaryStream != null)
{
onBinaryStream.init(session);
}
if (onPong != null)
{
onPong.init(session);
}
}
public boolean isBinaryPartialSupported()
{
if (onBinary == null)
{
return false;
}
return onBinary.isPartialMessageSupported();
}
public boolean isTextPartialSupported()
{
if (onText == null)
{
return false;
}
return onText.isPartialMessageSupported();
}
}

View File

@ -29,6 +29,12 @@ import javax.websocket.OnOpen;
import org.eclipse.jetty.websocket.jsr356.decoders.Decoders;
import org.eclipse.jetty.websocket.jsr356.encoders.Encoders;
/**
* Static reference to a specific annotated classes metadata.
*
* @param <T>
* the annotation this metadata is based off of
*/
public abstract class JsrMetadata<T extends Annotation>
{
/**

View File

@ -19,7 +19,6 @@
package org.eclipse.jetty.websocket.jsr356.annotations;
import java.lang.reflect.Method;
import java.util.Map;
import javax.websocket.CloseReason;
import javax.websocket.CloseReason.CloseCodes;
@ -41,6 +40,12 @@ public class OnCloseCallable extends JsrCallable
super(pojo,method);
}
public OnCloseCallable(OnCloseCallable copy)
{
super(copy);
this.idxCloseReason = copy.idxCloseReason;
}
public void call(Object endpoint, CloseInfo close)
{
this.call(endpoint,close.getStatusCode(),close.getReason());
@ -58,18 +63,10 @@ public class OnCloseCallable extends JsrCallable
super.call(endpoint,super.args);
}
public OnCloseCallable copy()
{
OnCloseCallable copy = new OnCloseCallable(pojo,method);
super.copyTo(copy);
copy.idxCloseReason = this.idxCloseReason;
return copy;
}
@Override
public void init(Session session, Map<String, String> pathParams)
public void init(Session session)
{
idxCloseReason = findIndexForRole(Role.CLOSE_REASON);
super.init(session,pathParams);
super.init(session);
}
}

View File

@ -19,7 +19,6 @@
package org.eclipse.jetty.websocket.jsr356.annotations;
import java.lang.reflect.Method;
import java.util.Map;
import javax.websocket.OnError;
import javax.websocket.Session;
@ -36,6 +35,12 @@ public class OnErrorCallable extends JsrCallable
super(pojo,method);
}
public OnErrorCallable(OnErrorCallable copy)
{
super(copy);
this.idxThrowable = copy.idxThrowable;
}
public void call(Object endpoint, Throwable cause)
{
// Throwable is a mandatory parameter
@ -43,18 +48,10 @@ public class OnErrorCallable extends JsrCallable
super.call(endpoint,super.args);
}
public OnErrorCallable copy()
{
OnErrorCallable copy = new OnErrorCallable(pojo,method);
super.copyTo(copy);
copy.idxThrowable = this.idxThrowable;
return copy;
}
@Override
public void init(Session session, Map<String, String> pathParams)
public void init(Session session)
{
idxThrowable = findIndexForRole(Param.Role.ERROR_CAUSE);
super.init(session,pathParams);
super.init(session);
}
}

View File

@ -20,7 +20,6 @@ package org.eclipse.jetty.websocket.jsr356.annotations;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.Map;
import javax.websocket.DecodeException;
import javax.websocket.Decoder;
@ -45,6 +44,14 @@ public class OnMessageBinaryCallable extends OnMessageCallable
super(pojo,method);
}
/**
* Copy Constructor
*/
public OnMessageBinaryCallable(OnMessageCallable copy)
{
super(copy);
}
public void call(Object endpoint, ByteBuffer buf, boolean partialFlag) throws DecodeException
{
super.args[idxMessageObject] = binaryDecoder.decode(buf);
@ -56,12 +63,12 @@ public class OnMessageBinaryCallable extends OnMessageCallable
}
@Override
public void init(Session session, Map<String, String> pathParams)
public void init(Session session)
{
idxMessageObject = findIndexForRole(Role.MESSAGE_BINARY);
assertRoleRequired(idxMessageObject,"Binary Message Object");
assertDecoderRequired();
binaryDecoder = (Decoder.Binary<?>)getDecoder();
super.init(session,pathParams);
super.init(session);
}
}

View File

@ -21,7 +21,6 @@ package org.eclipse.jetty.websocket.jsr356.annotations;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.util.Map;
import javax.websocket.DecodeException;
import javax.websocket.Decoder;
@ -44,6 +43,14 @@ public class OnMessageBinaryStreamCallable extends OnMessageCallable
super(pojo,method);
}
/**
* Copy Constructor
*/
public OnMessageBinaryStreamCallable(OnMessageCallable copy)
{
super(copy);
}
public void call(Object endpoint, InputStream stream) throws DecodeException, IOException
{
super.args[idxMessageObject] = binaryDecoder.decode(stream);
@ -51,12 +58,12 @@ public class OnMessageBinaryStreamCallable extends OnMessageCallable
}
@Override
public void init(Session session, Map<String, String> pathParams)
public void init(Session session)
{
idxMessageObject = findIndexForRole(Role.MESSAGE_BINARY_STREAM);
assertRoleRequired(idxMessageObject,"Binary InputStream Message Object");
assertDecoderRequired();
binaryDecoder = (Decoder.BinaryStream<?>)getDecoder();
super.init(session,pathParams);
super.init(session);
}
}

View File

@ -25,7 +25,7 @@ import javax.websocket.Decoder;
import org.eclipse.jetty.websocket.common.events.annotated.InvalidSignatureException;
import org.eclipse.jetty.websocket.jsr356.utils.MethodUtils;
public abstract class OnMessageCallable extends JsrCallable
public class OnMessageCallable extends JsrCallable
{
protected int idxPartialMessageFlag = -1;
protected int idxMessageObject = -1;
@ -36,6 +36,14 @@ public abstract class OnMessageCallable extends JsrCallable
super(pojo,method);
}
public OnMessageCallable(OnMessageCallable copy)
{
super(copy);
this.idxPartialMessageFlag = copy.idxPartialMessageFlag;
this.idxMessageObject = copy.idxMessageObject;
this.messageRole = copy.messageRole;
}
protected void assertDecoderRequired()
{
if (getDecoder() == null)
@ -64,6 +72,39 @@ public abstract class OnMessageCallable extends JsrCallable
}
}
private int findMessageObjectIndex()
{
int index = -1;
for (Param.Role role : Param.Role.getMessageRoles())
{
index = findIndexForRole(role);
if (index >= 0)
{
return index;
}
}
return -1;
}
public Param getMessageObjectParam()
{
if (idxMessageObject < 0)
{
idxMessageObject = findMessageObjectIndex();
if (idxMessageObject < 0)
{
StringBuilder err = new StringBuilder();
err.append("A message type must be specified [TEXT, BINARY, DECODER, or PONG]");
throw new InvalidSignatureException(err.toString());
}
}
return super.params[idxMessageObject];
}
public boolean isPartialMessageSupported()
{
return (idxPartialMessageFlag >= 0);

View File

@ -20,7 +20,6 @@ package org.eclipse.jetty.websocket.jsr356.annotations;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.Map;
import javax.websocket.DecodeException;
import javax.websocket.OnMessage;
@ -40,6 +39,14 @@ public class OnMessagePongCallable extends OnMessageCallable
super(pojo,method);
}
/**
* Copy Constructor
*/
public OnMessagePongCallable(OnMessageCallable copy)
{
super(copy);
}
public void call(Object endpoint, ByteBuffer buf) throws DecodeException
{
super.args[idxMessageObject] = new JsrPongMessage(buf);
@ -47,10 +54,10 @@ public class OnMessagePongCallable extends OnMessageCallable
}
@Override
public void init(Session session, Map<String, String> pathParams)
public void init(Session session)
{
idxMessageObject = findIndexForRole(Role.MESSAGE_PONG);
assertRoleRequired(idxMessageObject,"Pong Message Object");
super.init(session,pathParams);
super.init(session);
}
}

View File

@ -20,7 +20,6 @@ package org.eclipse.jetty.websocket.jsr356.annotations;
import java.io.Reader;
import java.lang.reflect.Method;
import java.util.Map;
import javax.websocket.DecodeException;
import javax.websocket.Decoder;
@ -45,6 +44,14 @@ public class OnMessageTextCallable extends OnMessageCallable
super(pojo,method);
}
/**
* Copy Constructor
*/
public OnMessageTextCallable(OnMessageCallable copy)
{
super(copy);
}
public void call(Object endpoint, String str, boolean partialFlag) throws DecodeException
{
super.args[idxMessageObject] = textDecoder.decode(str);
@ -56,12 +63,12 @@ public class OnMessageTextCallable extends OnMessageCallable
}
@Override
public void init(Session session, Map<String, String> pathParams)
public void init(Session session)
{
idxMessageObject = findIndexForRole(Role.MESSAGE_TEXT);
assertRoleRequired(idxMessageObject,"Text Message Object");
assertDecoderRequired();
textDecoder = (Decoder.Text<?>)getDecoder();
super.init(session,pathParams);
super.init(session);
}
}

View File

@ -21,7 +21,6 @@ package org.eclipse.jetty.websocket.jsr356.annotations;
import java.io.IOException;
import java.io.Reader;
import java.lang.reflect.Method;
import java.util.Map;
import javax.websocket.DecodeException;
import javax.websocket.Decoder;
@ -44,6 +43,14 @@ public class OnMessageTextStreamCallable extends OnMessageCallable
super(pojo,method);
}
/**
* Copy Constructor
*/
public OnMessageTextStreamCallable(OnMessageCallable copy)
{
super(copy);
}
public void call(Object endpoint, Reader reader) throws DecodeException, IOException
{
super.args[idxMessageObject] = textDecoder.decode(reader);
@ -51,12 +58,12 @@ public class OnMessageTextStreamCallable extends OnMessageCallable
}
@Override
public void init(Session session, Map<String, String> pathParams)
public void init(Session session)
{
idxMessageObject = findIndexForRole(Role.MESSAGE_TEXT_STREAM);
assertRoleRequired(idxMessageObject,"Text Reader Message Object");
assertDecoderRequired();
textDecoder = (Decoder.TextStream<?>)getDecoder();
super.init(session,pathParams);
super.init(session);
}
}

View File

@ -19,7 +19,6 @@
package org.eclipse.jetty.websocket.jsr356.annotations;
import java.lang.reflect.Method;
import java.util.Map;
import javax.websocket.EndpointConfig;
import javax.websocket.OnOpen;
@ -39,6 +38,12 @@ public class OnOpenCallable extends JsrCallable
super(pojo,method);
}
public OnOpenCallable(OnOpenCallable copy)
{
super(copy);
this.idxEndpointConfig = copy.idxEndpointConfig;
}
public void call(Object endpoint, EndpointConfig config)
{
// EndpointConfig is an optional parameter
@ -49,18 +54,10 @@ public class OnOpenCallable extends JsrCallable
super.call(endpoint,super.args);
}
public OnOpenCallable copy()
{
OnOpenCallable copy = new OnOpenCallable(pojo,method);
super.copyTo(copy);
copy.idxEndpointConfig = this.idxEndpointConfig;
return copy;
}
@Override
public void init(Session session, Map<String, String> pathParams)
public void init(Session session)
{
idxEndpointConfig = findIndexForRole(Role.ENDPOINT_CONFIG);
super.init(session,pathParams);
super.init(session);
}
}

View File

@ -18,6 +18,8 @@
package org.eclipse.jetty.websocket.jsr356.annotations;
import org.eclipse.jetty.websocket.jsr356.utils.MethodUtils;
public class Param
{
/**
@ -35,7 +37,20 @@ public class Param
MESSAGE_BINARY_STREAM,
MESSAGE_PONG,
MESSAGE_PARTIAL_FLAG,
PATH_PARAM
PATH_PARAM;
private static Role[] messageRoles;
static
{
messageRoles = new Role[]
{ MESSAGE_TEXT, MESSAGE_TEXT_STREAM, MESSAGE_BINARY, MESSAGE_BINARY_STREAM, MESSAGE_PONG, };
}
public static Role[] getMessageRoles()
{
return messageRoles;
}
}
public int index;
@ -72,6 +87,22 @@ public class Param
this.pathParamName = name;
}
@Override
public String toString()
{
StringBuilder str = new StringBuilder();
str.append("Param[");
str.append("index=").append(index);
str.append(",type=").append(MethodUtils.toString(type));
str.append(",role=").append(role);
if (pathParamName != null)
{
str.append(",pathParamName=").append(pathParamName);
}
str.append(']');
return str.toString();
}
public void unbind()
{
this.role = null;

View File

@ -0,0 +1,29 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.jsr356.endpoints;
import javax.websocket.Session;
/**
* Used to tag and expose JSR based EventDriver's that expose the JSR {@link Session}
*/
public interface IJsrSession
{
public Session getJsrSession();
}

View File

@ -25,11 +25,12 @@ import java.nio.ByteBuffer;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.DecodeException;
import javax.websocket.Decoder;
import javax.websocket.Decoder.Text;
import javax.websocket.MessageHandler;
import javax.websocket.Session;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.CloseInfo;
@ -43,21 +44,29 @@ import org.eclipse.jetty.websocket.common.message.SimpleBinaryMessage;
import org.eclipse.jetty.websocket.common.message.SimpleTextMessage;
import org.eclipse.jetty.websocket.jsr356.JettyWebSocketContainer;
import org.eclipse.jetty.websocket.jsr356.JsrSession;
import org.eclipse.jetty.websocket.jsr356.annotations.JsrEvents;
public class JsrClientAnnotatedEventDriver extends AbstractEventDriver implements EventDriver
public class JsrClientAnnotatedEventDriver extends AbstractEventDriver implements EventDriver, IJsrSession
{
private static final Logger LOG = Log.getLogger(JsrClientAnnotatedEventDriver.class);
private final JettyWebSocketContainer container;
private final JsrClientMetadata events;
private final JsrEvents events;
private boolean hasCloseBeenCalled = false;
private JsrSession jsrsession;
private ClientEndpointConfig endpointconfig;
private MessageAppender activeMessage;
public JsrClientAnnotatedEventDriver(JettyWebSocketContainer container, WebSocketPolicy policy, Object websocket, JsrClientMetadata metadata)
public JsrClientAnnotatedEventDriver(JettyWebSocketContainer container, WebSocketPolicy policy, Object websocket, JsrEvents events)
{
super(policy,websocket);
this.container = container;
this.events = metadata;
this.events = events;
}
@Override
public Session getJsrSession()
{
return this.jsrsession;
}
/**
@ -66,17 +75,24 @@ public class JsrClientAnnotatedEventDriver extends AbstractEventDriver implement
@Override
public void onBinaryFrame(ByteBuffer buffer, boolean fin) throws IOException
{
if (LOG.isDebugEnabled())
{
LOG.debug("onBinaryFrame({}, {})",BufferUtil.toDetailString(buffer),fin);
LOG.debug("events.onBinary={}",events.hasBinary());
LOG.debug("events.onBinaryStream={}",events.hasBinaryStream());
}
boolean handled = false;
if (events.onBinary != null)
if (events.hasBinary())
{
handled = true;
if (events.onBinary.isPartialMessageSupported())
if (events.isBinaryPartialSupported())
{
LOG.debug("Partial Binary Message: fin={}",fin);
// Partial Message Support (does not use messageAppender)
try
{
events.onBinary.call(websocket,buffer,fin);
events.callBinary(websocket,buffer,fin);
}
catch (DecodeException e)
{
@ -89,28 +105,34 @@ public class JsrClientAnnotatedEventDriver extends AbstractEventDriver implement
// Whole Message Support
if (activeMessage == null)
{
LOG.debug("Whole Binary Message");
activeMessage = new SimpleBinaryMessage(this);
}
}
}
if (events.onBinaryStream != null)
if (events.hasBinaryStream())
{
handled = true;
// Streaming Message Support
if (activeMessage == null)
{
LOG.debug("Binary Message InputStream");
activeMessage = new MessageInputStream(this);
}
}
LOG.debug("handled = {}",handled);
// Process any active MessageAppender
if (handled && (activeMessage != null))
{
LOG.debug("Appending Binary Message");
activeMessage.appendMessage(buffer);
if (fin)
{
LOG.debug("Binary Message Complete");
activeMessage.messageComplete();
activeMessage = null;
}
@ -123,15 +145,15 @@ public class JsrClientAnnotatedEventDriver extends AbstractEventDriver implement
@Override
public void onBinaryMessage(byte[] data)
{
if (events.onBinary == null)
if (LOG.isDebugEnabled())
{
// not interested in text events
return;
LOG.debug("onBinary({})",data);
}
try
{
events.onBinary.call(websocket,ByteBuffer.wrap(data),false);
// FIN is always true here
events.callBinary(websocket,ByteBuffer.wrap(data),true);
}
catch (DecodeException e)
{
@ -148,28 +170,19 @@ public class JsrClientAnnotatedEventDriver extends AbstractEventDriver implement
return;
}
hasCloseBeenCalled = true;
if (events.onClose != null)
{
events.onClose.call(websocket,close);
}
events.callClose(websocket,close);
}
@Override
public void onConnect()
{
if (events.onOpen != null)
{
events.onOpen.call(websocket,endpointconfig);
}
events.callOpen(websocket,endpointconfig);
}
@Override
public void onError(Throwable cause)
{
if (events.onError != null)
{
events.onError.call(websocket,cause);
}
events.callError(websocket,cause);
}
private void onFatalError(Throwable t)
@ -187,15 +200,9 @@ public class JsrClientAnnotatedEventDriver extends AbstractEventDriver implement
@Override
public void onInputStream(InputStream stream)
{
if (events.onBinaryStream == null)
{
// not interested in text events
return;
}
try
{
events.onBinaryStream.call(websocket,stream);
events.callBinaryStream(websocket,stream);
}
catch (DecodeException | IOException e)
{
@ -206,15 +213,9 @@ public class JsrClientAnnotatedEventDriver extends AbstractEventDriver implement
@Override
public void onReader(Reader reader)
{
if (events.onTextStream == null)
{
// not interested in text events
return;
}
try
{
events.onTextStream.call(websocket,reader);
events.callTextStream(websocket,reader);
}
catch (DecodeException | IOException e)
{
@ -228,18 +229,26 @@ public class JsrClientAnnotatedEventDriver extends AbstractEventDriver implement
@Override
public void onTextFrame(ByteBuffer buffer, boolean fin) throws IOException
{
if (LOG.isDebugEnabled())
{
LOG.debug("onTextFrame({}, {})",BufferUtil.toDetailString(buffer),fin);
LOG.debug("events.hasText={}",events.hasText());
LOG.debug("events.hasTextStream={}",events.hasTextStream());
}
boolean handled = false;
if (events.onText != null)
if (events.hasText())
{
handled = true;
if (events.onText.isPartialMessageSupported())
if (events.isTextPartialSupported())
{
LOG.debug("Partial Text Message: fin={}",fin);
// Partial Message Support (does not use messageAppender)
try
{
String text = BufferUtil.toUTF8String(buffer);
events.onText.call(websocket,text,fin);
events.callText(websocket,text,fin);
}
catch (DecodeException e)
{
@ -252,28 +261,34 @@ public class JsrClientAnnotatedEventDriver extends AbstractEventDriver implement
// Whole Message Support
if (activeMessage == null)
{
LOG.debug("Whole Text Message");
activeMessage = new SimpleTextMessage(this);
}
}
}
if (events.onTextStream != null)
if (events.hasTextStream())
{
handled = true;
// Streaming Message Support
if (activeMessage == null)
{
LOG.debug("Text Message Writer");
activeMessage = new MessageReader(this);
}
}
LOG.debug("handled = {}",handled);
// Process any active MessageAppender
if (handled && (activeMessage != null))
{
LOG.debug("Appending Text Message");
activeMessage.appendMessage(buffer);
if (fin)
{
LOG.debug("Text Message Complete");
activeMessage.messageComplete();
activeMessage = null;
}
@ -286,26 +301,17 @@ public class JsrClientAnnotatedEventDriver extends AbstractEventDriver implement
@Override
public void onTextMessage(String message)
{
if (events.onText == null)
{
// not interested in text events
return;
}
LOG.debug("onText({})",message);
Decoder.Text<?> decoder = (Text<?>)events.onText.getDecoder();
try
{
decoder.init(endpointconfig);
events.onText.call(websocket,jsrsession,decoder.decode(message));
// FIN is always true here
events.callText(websocket,message,true);
}
catch (DecodeException e)
{
onFatalError(e);
}
finally
{
decoder.destroy();
}
}
@Override
@ -314,5 +320,12 @@ public class JsrClientAnnotatedEventDriver extends AbstractEventDriver implement
super.openSession(session);
String id = container.getNextId();
this.jsrsession = new JsrSession(container,session,id);
this.events.init(jsrsession);
}
@Override
public String toString()
{
return String.format("%s[websocket=%s]",this.getClass().getSimpleName(),websocket);
}
}

View File

@ -28,6 +28,7 @@ import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.common.events.EventDriverImpl;
import org.eclipse.jetty.websocket.jsr356.JettyWebSocketContainer;
import org.eclipse.jetty.websocket.jsr356.annotations.AnnotatedEndpointScanner;
import org.eclipse.jetty.websocket.jsr356.annotations.JsrEvents;
public class JsrClientEndpointImpl implements EventDriverImpl
{
@ -52,22 +53,21 @@ public class JsrClientEndpointImpl implements EventDriverImpl
}
Class<?> endpointClass = endpoint.getClass();
JsrClientMetadata metadata = cache.get(endpointClass);
if (metadata == null)
// Get the base metadata for this class
JsrClientMetadata basemetadata = cache.get(endpointClass);
if (basemetadata == null)
{
metadata = new JsrClientMetadata(endpointClass);
AnnotatedEndpointScanner scanner = new AnnotatedEndpointScanner(metadata);
basemetadata = new JsrClientMetadata(endpointClass);
AnnotatedEndpointScanner scanner = new AnnotatedEndpointScanner(basemetadata);
scanner.scan();
cache.put(endpointClass,metadata);
cache.put(endpointClass,basemetadata);
}
// The potential decoders
if (config != null)
{
}
return new JsrClientAnnotatedEventDriver(container,policy,endpoint,metadata);
// At this point we have a base metadata, now we need to copy it for
// this specific instance of the WebSocket Endpoint (as we will be
// modifying the metadata)
JsrEvents events = new JsrEvents(basemetadata);
return new JsrClientAnnotatedEventDriver(container,policy,endpoint,events);
}
@Override
@ -79,7 +79,16 @@ public class JsrClientEndpointImpl implements EventDriverImpl
@Override
public boolean supports(Object websocket)
{
ClientEndpoint anno = websocket.getClass().getAnnotation(ClientEndpoint.class);
Object endpoint = websocket;
if (endpoint instanceof ConfiguredEndpoint)
{
// unwrap
ConfiguredEndpoint ce = (ConfiguredEndpoint)websocket;
endpoint = ce.getEndpoint();
}
ClientEndpoint anno = endpoint.getClass().getAnnotation(ClientEndpoint.class);
return (anno != null);
}
}

View File

@ -23,6 +23,8 @@ import java.io.InputStream;
import java.io.Reader;
import java.nio.ByteBuffer;
import javax.websocket.Session;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.Frame;
@ -30,8 +32,15 @@ import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.events.EventDriver;
public class JsrEndpointEventDriver implements EventDriver
public class JsrEndpointEventDriver implements EventDriver, IJsrSession
{
@Override
public Session getJsrSession()
{
// TODO Auto-generated method stub
return null;
}
@Override
public WebSocketPolicy getPolicy()
{

View File

@ -27,9 +27,11 @@ import org.eclipse.jetty.websocket.jsr356.JettyWebSocketContainer;
public class JsrEndpointImpl implements EventDriverImpl
{
public JsrEndpointImpl(JettyWebSocketContainer jettyWebSocketContainer)
private final JettyWebSocketContainer container;
public JsrEndpointImpl(JettyWebSocketContainer container)
{
// TODO Auto-generated constructor stub
this.container = container;
}
@Override
@ -42,8 +44,7 @@ public class JsrEndpointImpl implements EventDriverImpl
@Override
public String describeRule()
{
// TODO Auto-generated method stub
return null;
return "class extends " + Endpoint.class.getName();
}
@Override

View File

@ -0,0 +1,50 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.jsr356.endpoints;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
import org.eclipse.jetty.websocket.jsr356.JettyWebSocketContainer;
public class JsrEventDriverFactory extends EventDriverFactory
{
public JsrEventDriverFactory(WebSocketPolicy policy, JettyWebSocketContainer container)
{
super(policy);
clearImplementations();
addImplementation(new JsrEndpointImpl(container));
addImplementation(new JsrClientEndpointImpl(container));
}
/**
* Unwrap ConfiguredEndpoint for end-user.
*/
@Override
protected String getClassName(Object websocket)
{
if (websocket instanceof ConfiguredEndpoint)
{
ConfiguredEndpoint ce = (ConfiguredEndpoint)websocket;
return ce.getEndpoint().getClass().getName();
}
return websocket.getClass().getName();
}
}

View File

@ -105,4 +105,11 @@ public final class MethodUtils
// TODO: show exceptions?
return str.toString();
}
public static String toString(Type type)
{
StringBuilder str = new StringBuilder();
appendTypeName(str,type,true);
return str.toString();
}
}

View File

@ -63,7 +63,7 @@ public class WebSocketClient extends ContainerLifeCycle
private final WebSocketPolicy policy;
private final SslContextFactory sslContextFactory;
private final WebSocketExtensionFactory extensionRegistry;
private final EventDriverFactory eventDriverFactory;
private EventDriverFactory eventDriverFactory;
private ByteBufferPool bufferPool;
private Executor executor;
private Scheduler scheduler;
@ -139,7 +139,17 @@ public class WebSocketClient extends ContainerLifeCycle
ConnectionManager manager = getConnectionManager();
// Setup Driver for user provided websocket
EventDriver driver = eventDriverFactory.wrap(websocket);
EventDriver driver = null;
if (websocket instanceof EventDriver)
{
// Use the EventDriver as-is
driver = (EventDriver)websocket;
}
else
{
// Wrap websocket with appropriate EventDriver
driver = eventDriverFactory.wrap(websocket);
}
if (driver == null)
{
@ -405,6 +415,11 @@ public class WebSocketClient extends ContainerLifeCycle
this.cookieStore = cookieStore;
}
public void setEventDriverFactory(EventDriverFactory factory)
{
this.eventDriverFactory = factory;
}
public void setExecutor(Executor executor)
{
this.executor = executor;

View File

@ -57,6 +57,16 @@ public class EventDriverFactory
implementations.add(impl);
}
public void clearImplementations()
{
this.implementations.clear();
}
protected String getClassName(Object websocket)
{
return websocket.getClass().getName();
}
public List<EventDriverImpl> getImplementations()
{
return implementations;
@ -67,8 +77,6 @@ public class EventDriverFactory
return this.implementations.remove(impl);
}
@Override
public String toString()
{
@ -113,7 +121,7 @@ public class EventDriverFactory
// Create a clear error message for the developer
StringBuilder err = new StringBuilder();
err.append(websocket.getClass().getName());
err.append(getClassName(websocket));
err.append(" is not a valid WebSocket object.");
err.append(" Object must obey one of the following rules: ");
@ -125,7 +133,7 @@ public class EventDriverFactory
{
err.append(" or ");
}
err.append('(').append(i + 1).append(") ");
err.append("\n(").append(i + 1).append(") ");
err.append(impl.describeRule());
}

View File

@ -21,29 +21,37 @@ package org.eclipse.jetty.websocket.common.message;
import java.io.IOException;
import java.io.OutputStream;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.LogicalConnection;
import org.eclipse.jetty.websocket.common.WebSocketSession;
public class MessageOutputStream extends OutputStream
{
private final LogicalConnection connection;
private final OutgoingFrames outgoing;
private final WebSocketSession session;
private final int bufferSize;
public MessageOutputStream(LogicalConnection connection, OutgoingFrames outgoing)
public MessageOutputStream(WebSocketSession session)
{
this.connection = connection;
this.outgoing = outgoing;
this.session = session;
this.bufferSize = session.getPolicy().getMaxBinaryMessageBufferSize();
}
public boolean isClosed()
@Override
public void close() throws IOException
{
// TODO Auto-generated method stub
return false;
// TODO finish sending whatever in the buffer with FIN=true
// TODO or just send an empty buffer with FIN=true
super.close();
}
@Override
public void flush() throws IOException
{
// TODO flush whatever is in the buffer with FIN=false
super.flush();
}
@Override
public void write(int b) throws IOException
{
// TODO Auto-generated method stub
// TODO buffer up to limit, flush once buffer reached.
}
}

View File

@ -21,45 +21,35 @@ package org.eclipse.jetty.websocket.common.message;
import java.io.IOException;
import java.io.Writer;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.LogicalConnection;
import org.eclipse.jetty.websocket.common.WebSocketSession;
public class MessageWriter extends Writer
{
private final LogicalConnection connection;
private final OutgoingFrames outgoing;
private final WebSocketSession session;
private final int bufferSize;
public MessageWriter(LogicalConnection connection, OutgoingFrames outgoing)
public MessageWriter(WebSocketSession session)
{
this.connection = connection;
this.outgoing = outgoing;
this.session = session;
this.bufferSize = session.getPolicy().getMaxTextMessageBufferSize();
}
@Override
public void close() throws IOException
{
// TODO Auto-generated method stub
// TODO finish sending whatever in the buffer with FIN=true
// TODO or just send an empty buffer with FIN=true
}
@Override
public void flush() throws IOException
{
// TODO Auto-generated method stub
}
public boolean isClosed()
{
// TODO Auto-generated method stub
return false;
// TODO flush whatever is in the buffer with FIN=false
}
@Override
public void write(char[] cbuf, int off, int len) throws IOException
{
// TODO Auto-generated method stub
// TODO buffer up to limit, flush once buffer reached.
}
}