This closes #78 Stomp changes

This commit is contained in:
Clebert Suconic 2015-07-14 09:53:36 -04:00
commit 4167c5f2cc
10 changed files with 65 additions and 52 deletions

View File

@ -128,8 +128,9 @@ public class ActiveMQStompException extends Exception
code = newCode;
}
public void setHandler(VersionedStompFrameHandler frameHandler)
public ActiveMQStompException setHandler(VersionedStompFrameHandler frameHandler)
{
this.handler = frameHandler;
return this;
}
}

View File

@ -249,7 +249,7 @@ public final class StompConnection implements RemotingConnection
if (!manager.destinationExists(destination))
{
throw BUNDLE.destinationNotExist(destination);
throw BUNDLE.destinationNotExist(destination).setHandler(frameHandler);
}
}
@ -266,7 +266,7 @@ public final class StompConnection implements RemotingConnection
}
catch (Exception e)
{
throw new ActiveMQStompException(e.getMessage(), e);
throw new ActiveMQStompException(e.getMessage(), e).setHandler(frameHandler);
}
autoCreated = true;
}
@ -490,7 +490,7 @@ public final class StompConnection implements RemotingConnection
else
{
//not a supported version!
ActiveMQStompException error = BUNDLE.versionNotSupported(acceptVersion);
ActiveMQStompException error = BUNDLE.versionNotSupported(acceptVersion).setHandler(frameHandler);
error.addHeader(Stomp.Headers.Error.VERSION, manager.getSupportedVersionsAsErrorVersion());
error.addHeader(Stomp.Headers.CONTENT_TYPE, "text/plain");
error.setBody("Supported protocol versions are " + manager.getSupportedVersionsAsString());
@ -514,7 +514,7 @@ public final class StompConnection implements RemotingConnection
{
if (host == null)
{
ActiveMQStompException error = BUNDLE.nullHostHeader();
ActiveMQStompException error = BUNDLE.nullHostHeader().setHandler(frameHandler);
error.setBody(BUNDLE.hostCannotBeNull());
throw error;
}
@ -522,7 +522,7 @@ public final class StompConnection implements RemotingConnection
String localHost = manager.getVirtualHostName();
if (!host.equals(localHost))
{
ActiveMQStompException error = BUNDLE.hostNotMatch();
ActiveMQStompException error = BUNDLE.hostNotMatch().setHandler(frameHandler);
error.setBody(BUNDLE.hostNotMatchDetails(host));
throw error;
}
@ -542,13 +542,13 @@ public final class StompConnection implements RemotingConnection
{
if (isDestroyed())
{
throw BUNDLE.connectionDestroyed();
throw BUNDLE.connectionDestroyed().setHandler(frameHandler);
}
if (!initialized)
{
if (!(Stomp.Commands.CONNECT.equals(cmd) || Stomp.Commands.STOMP.equals(cmd)))
{
throw BUNDLE.connectionNotEstablished();
throw BUNDLE.connectionNotEstablished().setHandler(frameHandler);
}
//decide version
negotiateVersion(request);
@ -609,7 +609,7 @@ public final class StompConnection implements RemotingConnection
}
catch (Exception e)
{
throw BUNDLE.errorGetSession(e);
throw BUNDLE.errorGetSession(e).setHandler(frameHandler);
}
return session;
@ -619,7 +619,7 @@ public final class StompConnection implements RemotingConnection
{
if (!this.valid)
{
throw BUNDLE.invalidConnection();
throw BUNDLE.invalidConnection().setHandler(frameHandler);
}
}
@ -649,7 +649,7 @@ public final class StompConnection implements RemotingConnection
}
catch (Exception e)
{
throw BUNDLE.errorSendMessage(message, e);
throw BUNDLE.errorSendMessage(message, e).setHandler(frameHandler);
}
}
@ -677,7 +677,7 @@ public final class StompConnection implements RemotingConnection
}
catch (Exception e)
{
throw BUNDLE.errorBeginTx(txID, e);
throw BUNDLE.errorBeginTx(txID, e).setHandler(frameHandler);
}
}
@ -689,7 +689,7 @@ public final class StompConnection implements RemotingConnection
}
catch (Exception e)
{
throw BUNDLE.errorCommitTx(txID, e);
throw BUNDLE.errorCommitTx(txID, e).setHandler(frameHandler);
}
}
@ -705,7 +705,7 @@ public final class StompConnection implements RemotingConnection
}
catch (Exception e)
{
throw BUNDLE.errorAbortTx(txID, e);
throw BUNDLE.errorAbortTx(txID, e).setHandler(frameHandler);
}
}
@ -740,7 +740,7 @@ public final class StompConnection implements RemotingConnection
{
if (destination == null)
{
throw BUNDLE.noDestination();
throw BUNDLE.noDestination().setHandler(frameHandler);
}
subscriptionID = "subscription/" + destination;
}
@ -755,7 +755,7 @@ public final class StompConnection implements RemotingConnection
}
catch (Exception e)
{
throw BUNDLE.errorCreatSubscription(subscriptionID, e);
throw BUNDLE.errorCreatSubscription(subscriptionID, e).setHandler(frameHandler);
}
}
@ -771,7 +771,7 @@ public final class StompConnection implements RemotingConnection
}
catch (Exception e)
{
throw BUNDLE.errorUnsubscrib(subscriptionID, e);
throw BUNDLE.errorUnsubscrib(subscriptionID, e).setHandler(frameHandler);
}
}
@ -787,7 +787,7 @@ public final class StompConnection implements RemotingConnection
}
catch (Exception e)
{
throw BUNDLE.errorAck(messageID, e);
throw BUNDLE.errorAck(messageID, e).setHandler(frameHandler);
}
}

View File

@ -162,8 +162,11 @@ public class StompDecoder
//max len of EOL (default is 1 for '\n')
protected int eolLen = 1;
public StompDecoder()
protected final VersionedStompFrameHandler handler;
public StompDecoder(VersionedStompFrameHandler handler)
{
this.handler = handler;
}
public boolean hasBytes()
@ -276,7 +279,9 @@ public class StompDecoder
// reset
StompFrame ret = new StompFrame(command, headers, content);
StompFrame ret = handler.createStompFrame(command);
ret.headers = headers;
ret.setByteBody(content);
init();
@ -412,7 +417,7 @@ public class StompDecoder
}
else if (workingBuffer[offset] == CR)
{
if (nextChar) throw BUNDLE.invalidTwoCRs();
if (nextChar) throw BUNDLE.invalidTwoCRs().setHandler(handler);
nextChar = true;
}
else
@ -424,7 +429,7 @@ public class StompDecoder
if (nextChar)
{
throw BUNDLE.badCRs();
throw BUNDLE.badCRs().setHandler(handler);
}
if (data < 4 + offset)
@ -616,7 +621,7 @@ public class StompDecoder
if (workingBuffer[pos - 1] != NEW_LINE)
{
//give a signal to try other versions
ActiveMQStompException error = BUNDLE.notValidNewLine(workingBuffer[pos - 1]);
ActiveMQStompException error = BUNDLE.notValidNewLine(workingBuffer[pos - 1]).setHandler(handler);
error.setCode(ActiveMQStompException.INVALID_EOL_V10);
error.setBody(BUNDLE.unexpectedNewLine(workingBuffer[pos - 1]));
throw error;
@ -627,7 +632,7 @@ public class StompDecoder
public void throwInvalid() throws ActiveMQStompException
{
ActiveMQStompException error = BUNDLE.invalidCommand(this.dumpByteArray(workingBuffer));
ActiveMQStompException error = BUNDLE.invalidCommand(this.dumpByteArray(workingBuffer)).setHandler(handler);
error.setCode(ActiveMQStompException.INVALID_COMMAND);
error.setBody(BUNDLE.invalidFrame(this.dumpByteArray(workingBuffer)));
throw error;

View File

@ -33,7 +33,7 @@ public class StompFrame
protected final String command;
protected final Map<String, String> headers;
protected Map<String, String> headers;
private String body;

View File

@ -364,7 +364,8 @@ class StompProtocolManager implements ProtocolManager<StompFrameInterceptor>, No
ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, errorMessage);
ActiveMQStompException e = new ActiveMQStompException("Error sending reply",
ActiveMQExceptionType.createException(errorCode, errorMessage));
ActiveMQExceptionType.createException(errorCode, errorMessage))
.setHandler(connection.getFrameHandler());
StompFrame error = e.getFrame();
send(connection, error);
@ -426,7 +427,7 @@ class StompProtocolManager implements ProtocolManager<StompFrameInterceptor>, No
StompSession session = getTransactedSession(connection, txID);
if (session == null)
{
throw new ActiveMQStompException("No transaction started: " + txID);
throw new ActiveMQStompException(connection, "No transaction started: " + txID);
}
transactedSessions.remove(txID);
session.getSession().commit();
@ -437,7 +438,7 @@ class StompProtocolManager implements ProtocolManager<StompFrameInterceptor>, No
StompSession session = getTransactedSession(connection, txID);
if (session == null)
{
throw new ActiveMQStompException("No transaction started: " + txID);
throw new ActiveMQStompException(connection, "No transaction started: " + txID);
}
transactedSessions.remove(txID);
session.getSession().rollback(false);
@ -452,7 +453,7 @@ class StompProtocolManager implements ProtocolManager<StompFrameInterceptor>, No
stompSession.setNoLocal(noLocal);
if (stompSession.containsSubscription(subscriptionID))
{
throw new ActiveMQStompException("There already is a subscription for: " + subscriptionID +
throw new ActiveMQStompException(connection, "There already is a subscription for: " + subscriptionID +
". Either use unique subscription IDs or do not create multiple subscriptions for the same destination");
}
long consumerID = server.getStorageManager().generateID();
@ -473,7 +474,7 @@ class StompProtocolManager implements ProtocolManager<StompFrameInterceptor>, No
boolean unsubscribed = stompSession.unsubscribe(subscriptionID, durableSubscriberName);
if (!unsubscribed)
{
throw new ActiveMQStompException("Cannot unsubscribe as no subscription exists for id: " + subscriptionID);
throw new ActiveMQStompException(connection, "Cannot unsubscribe as no subscription exists for id: " + subscriptionID);
}
}

View File

@ -223,7 +223,7 @@ public class StompSession implements SessionCallback
if (pair == null)
{
throw BUNDLE.failToAckMissingID(id);
throw BUNDLE.failToAckMissingID(id).setHandler(connection.getFrameHandler());
}
long consumerID = pair.getA();
@ -235,7 +235,7 @@ public class StompSession implements SessionCallback
{
if (!sub.getID().equals(subscriptionID))
{
throw BUNDLE.subscriptionIDMismatch(subscriptionID, sub.getID());
throw BUNDLE.subscriptionIDMismatch(subscriptionID, sub.getID()).setHandler(connection.getFrameHandler());
}
}

View File

@ -158,7 +158,7 @@ public abstract class VersionedStompFrameHandler
public StompFrame onUnknown(String command)
{
ActiveMQStompException error = BUNDLE.unknownCommand(command);
ActiveMQStompException error = BUNDLE.unknownCommand(command).setHandler(this);
StompFrame response = error.getFrame();
return response;
}
@ -178,7 +178,7 @@ public abstract class VersionedStompFrameHandler
String txID = request.getHeader(Stomp.Headers.TRANSACTION);
if (txID == null)
{
ActiveMQStompException error = BUNDLE.needTxIDHeader();
ActiveMQStompException error = BUNDLE.needTxIDHeader().setHandler(this);
response = error.getFrame();
return response;
}
@ -230,7 +230,7 @@ public abstract class VersionedStompFrameHandler
}
catch (Exception e)
{
ActiveMQStompException error = BUNDLE.errorHandleSend(e);
ActiveMQStompException error = BUNDLE.errorHandleSend(e).setHandler(this);
response = error.getFrame();
}
@ -248,7 +248,7 @@ public abstract class VersionedStompFrameHandler
String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
if (txID == null)
{
ActiveMQStompException error = BUNDLE.beginTxNoID();
ActiveMQStompException error = BUNDLE.beginTxNoID().setHandler(this);
response = error.getFrame();
}
else
@ -272,7 +272,7 @@ public abstract class VersionedStompFrameHandler
if (txID == null)
{
ActiveMQStompException error = BUNDLE.abortTxNoID();
ActiveMQStompException error = BUNDLE.abortTxNoID().setHandler(this);
response = error.getFrame();
return response;
}
@ -391,7 +391,7 @@ public abstract class VersionedStompFrameHandler
/**
* this method is called when a newer version of handler is created. It should
* take over the state of the decoder of the existingHandler so that
* the decoding can be continued. For V10 handler it's never get called.
* the decoding can be continued. For V10 handler it's never called.
*
* @param existingHandler
*/

View File

@ -35,7 +35,7 @@ public class StompFrameHandlerV10 extends VersionedStompFrameHandler implements
public StompFrameHandlerV10(StompConnection connection)
{
super(connection);
decoder = new StompDecoder();
decoder = new StompDecoder(this);
decoder.init();
connection.addStompEventListener(this);
}
@ -102,7 +102,7 @@ public class StompFrameHandlerV10 extends VersionedStompFrameHandler implements
{
if (destination == null)
{
ActiveMQStompException error = BUNDLE.needIDorDestination();
ActiveMQStompException error = BUNDLE.needIDorDestination().setHandler(this);
response = error.getFrame();
return response;
}

View File

@ -41,7 +41,7 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
{
super(connection);
connection.addStompEventListener(this);
decoder = new StompDecoderV11();
decoder = new StompDecoderV11(this);
decoder.init();
}
@ -124,7 +124,7 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
String[] params = heartBeatHeader.split(",");
if (params.length != 2)
{
throw new ActiveMQStompException("Incorrect heartbeat header " + heartBeatHeader);
throw new ActiveMQStompException(connection, "Incorrect heartbeat header " + heartBeatHeader);
}
//client ping
@ -171,7 +171,7 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
}
else
{
response = BUNDLE.needSubscriptionID().getFrame();
response = BUNDLE.needSubscriptionID().setHandler(this).getFrame();
return response;
}
@ -202,7 +202,7 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
if (subscriptionID == null)
{
response = BUNDLE.needSubscriptionID().getFrame();
response = BUNDLE.needSubscriptionID().setHandler(this).getFrame();
return response;
}
@ -428,6 +428,11 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
protected boolean isEscaping = false;
protected SimpleBytes holder = new SimpleBytes(1024);
public StompDecoderV11(StompFrameHandlerV11 handler)
{
super(handler);
}
@Override
public void init(StompDecoder decoder)
{
@ -470,7 +475,7 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
}
else if (workingBuffer[offset] == CR)
{
if (nextChar) throw BUNDLE.invalidTwoCRs();
if (nextChar) throw BUNDLE.invalidTwoCRs().setHandler(handler);
nextChar = true;
}
else
@ -483,7 +488,7 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
if (nextChar)
{
throw BUNDLE.badCRs();
throw BUNDLE.badCRs().setHandler(handler);
}
//if some EOLs have been processed, drop those bytes before parsing command

View File

@ -35,7 +35,7 @@ public class StompFrameHandlerV12 extends StompFrameHandlerV11 implements FrameE
public StompFrameHandlerV12(StompConnection connection)
{
super(connection);
decoder = new StompDecoderV12();
decoder = new StompDecoderV12(this);
decoder.init();
}
@ -78,8 +78,7 @@ public class StompFrameHandlerV12 extends StompFrameHandlerV11 implements FrameE
if (messageID == null)
{
ActiveMQStompException error = BUNDLE.noIDInAck();
error.setHandler(connection.getFrameHandler());
ActiveMQStompException error = BUNDLE.noIDInAck().setHandler(connection.getFrameHandler());
return error.getFrame();
}
@ -99,9 +98,11 @@ public class StompFrameHandlerV12 extends StompFrameHandlerV11 implements FrameE
{
protected boolean nextEOLChar = false;
public StompDecoderV12()
public StompDecoderV12(StompFrameHandlerV12 handler)
{
//1.2 allow '\r\n'
super(handler);
//1.2 allows '\r\n'
eolLen = 2;
}
@ -212,7 +213,7 @@ public class StompFrameHandlerV12 extends StompFrameHandlerV11 implements FrameE
{
if (nextEOLChar)
{
throw BUNDLE.invalidTwoCRs();
throw BUNDLE.invalidTwoCRs().setHandler(handler);
}
nextEOLChar = true;
break;