Set frame handlers to ActiveMQStompExceptions

This commit is contained in:
Ville Skyttä 2015-07-14 12:55:41 +03:00 committed by Clebert Suconic
parent 3deb20f049
commit 6ee22e7c5e
9 changed files with 46 additions and 45 deletions

View File

@ -128,8 +128,9 @@ public class ActiveMQStompException extends Exception
code = newCode; code = newCode;
} }
public void setHandler(VersionedStompFrameHandler frameHandler) public ActiveMQStompException setHandler(VersionedStompFrameHandler frameHandler)
{ {
this.handler = frameHandler; this.handler = frameHandler;
return this;
} }
} }

View File

@ -249,7 +249,7 @@ public final class StompConnection implements RemotingConnection
if (!manager.destinationExists(destination)) if (!manager.destinationExists(destination))
{ {
throw BUNDLE.destinationNotExist(destination); throw BUNDLE.destinationNotExist(destination).setHandler(frameHandler);
} }
} }
@ -266,7 +266,7 @@ public final class StompConnection implements RemotingConnection
} }
catch (Exception e) catch (Exception e)
{ {
throw new ActiveMQStompException(e.getMessage(), e); throw new ActiveMQStompException(e.getMessage(), e).setHandler(frameHandler);
} }
autoCreated = true; autoCreated = true;
} }
@ -490,7 +490,7 @@ public final class StompConnection implements RemotingConnection
else else
{ {
//not a supported version! //not a supported version!
ActiveMQStompException error = BUNDLE.versionNotSupported(acceptVersion); ActiveMQStompException error = BUNDLE.versionNotSupported(acceptVersion).setHandler(frameHandler);
error.addHeader(Stomp.Headers.Error.VERSION, manager.getSupportedVersionsAsErrorVersion()); error.addHeader(Stomp.Headers.Error.VERSION, manager.getSupportedVersionsAsErrorVersion());
error.addHeader(Stomp.Headers.CONTENT_TYPE, "text/plain"); error.addHeader(Stomp.Headers.CONTENT_TYPE, "text/plain");
error.setBody("Supported protocol versions are " + manager.getSupportedVersionsAsString()); error.setBody("Supported protocol versions are " + manager.getSupportedVersionsAsString());
@ -514,7 +514,7 @@ public final class StompConnection implements RemotingConnection
{ {
if (host == null) if (host == null)
{ {
ActiveMQStompException error = BUNDLE.nullHostHeader(); ActiveMQStompException error = BUNDLE.nullHostHeader().setHandler(frameHandler);
error.setBody(BUNDLE.hostCannotBeNull()); error.setBody(BUNDLE.hostCannotBeNull());
throw error; throw error;
} }
@ -522,7 +522,7 @@ public final class StompConnection implements RemotingConnection
String localHost = manager.getVirtualHostName(); String localHost = manager.getVirtualHostName();
if (!host.equals(localHost)) if (!host.equals(localHost))
{ {
ActiveMQStompException error = BUNDLE.hostNotMatch(); ActiveMQStompException error = BUNDLE.hostNotMatch().setHandler(frameHandler);
error.setBody(BUNDLE.hostNotMatchDetails(host)); error.setBody(BUNDLE.hostNotMatchDetails(host));
throw error; throw error;
} }
@ -542,13 +542,13 @@ public final class StompConnection implements RemotingConnection
{ {
if (isDestroyed()) if (isDestroyed())
{ {
throw BUNDLE.connectionDestroyed(); throw BUNDLE.connectionDestroyed().setHandler(frameHandler);
} }
if (!initialized) if (!initialized)
{ {
if (!(Stomp.Commands.CONNECT.equals(cmd) || Stomp.Commands.STOMP.equals(cmd))) if (!(Stomp.Commands.CONNECT.equals(cmd) || Stomp.Commands.STOMP.equals(cmd)))
{ {
throw BUNDLE.connectionNotEstablished(); throw BUNDLE.connectionNotEstablished().setHandler(frameHandler);
} }
//decide version //decide version
negotiateVersion(request); negotiateVersion(request);
@ -609,7 +609,7 @@ public final class StompConnection implements RemotingConnection
} }
catch (Exception e) catch (Exception e)
{ {
throw BUNDLE.errorGetSession(e); throw BUNDLE.errorGetSession(e).setHandler(frameHandler);
} }
return session; return session;
@ -619,7 +619,7 @@ public final class StompConnection implements RemotingConnection
{ {
if (!this.valid) if (!this.valid)
{ {
throw BUNDLE.invalidConnection(); throw BUNDLE.invalidConnection().setHandler(frameHandler);
} }
} }
@ -649,7 +649,7 @@ public final class StompConnection implements RemotingConnection
} }
catch (Exception e) catch (Exception e)
{ {
throw BUNDLE.errorSendMessage(message, e); throw BUNDLE.errorSendMessage(message, e).setHandler(frameHandler);
} }
} }
@ -677,7 +677,7 @@ public final class StompConnection implements RemotingConnection
} }
catch (Exception e) catch (Exception e)
{ {
throw BUNDLE.errorBeginTx(txID, e); throw BUNDLE.errorBeginTx(txID, e).setHandler(frameHandler);
} }
} }
@ -689,7 +689,7 @@ public final class StompConnection implements RemotingConnection
} }
catch (Exception e) catch (Exception e)
{ {
throw BUNDLE.errorCommitTx(txID, e); throw BUNDLE.errorCommitTx(txID, e).setHandler(frameHandler);
} }
} }
@ -705,7 +705,7 @@ public final class StompConnection implements RemotingConnection
} }
catch (Exception e) catch (Exception e)
{ {
throw BUNDLE.errorAbortTx(txID, e); throw BUNDLE.errorAbortTx(txID, e).setHandler(frameHandler);
} }
} }
@ -740,7 +740,7 @@ public final class StompConnection implements RemotingConnection
{ {
if (destination == null) if (destination == null)
{ {
throw BUNDLE.noDestination(); throw BUNDLE.noDestination().setHandler(frameHandler);
} }
subscriptionID = "subscription/" + destination; subscriptionID = "subscription/" + destination;
} }
@ -755,7 +755,7 @@ public final class StompConnection implements RemotingConnection
} }
catch (Exception e) catch (Exception e)
{ {
throw BUNDLE.errorCreatSubscription(subscriptionID, e); throw BUNDLE.errorCreatSubscription(subscriptionID, e).setHandler(frameHandler);
} }
} }
@ -771,7 +771,7 @@ public final class StompConnection implements RemotingConnection
} }
catch (Exception e) catch (Exception e)
{ {
throw BUNDLE.errorUnsubscrib(subscriptionID, e); throw BUNDLE.errorUnsubscrib(subscriptionID, e).setHandler(frameHandler);
} }
} }
@ -787,7 +787,7 @@ public final class StompConnection implements RemotingConnection
} }
catch (Exception e) catch (Exception e)
{ {
throw BUNDLE.errorAck(messageID, e); throw BUNDLE.errorAck(messageID, e).setHandler(frameHandler);
} }
} }

View File

@ -162,7 +162,7 @@ public class StompDecoder
//max len of EOL (default is 1 for '\n') //max len of EOL (default is 1 for '\n')
protected int eolLen = 1; protected int eolLen = 1;
private final VersionedStompFrameHandler handler; protected final VersionedStompFrameHandler handler;
public StompDecoder(VersionedStompFrameHandler handler) public StompDecoder(VersionedStompFrameHandler handler)
{ {
@ -417,7 +417,7 @@ public class StompDecoder
} }
else if (workingBuffer[offset] == CR) else if (workingBuffer[offset] == CR)
{ {
if (nextChar) throw BUNDLE.invalidTwoCRs(); if (nextChar) throw BUNDLE.invalidTwoCRs().setHandler(handler);
nextChar = true; nextChar = true;
} }
else else
@ -429,7 +429,7 @@ public class StompDecoder
if (nextChar) if (nextChar)
{ {
throw BUNDLE.badCRs(); throw BUNDLE.badCRs().setHandler(handler);
} }
if (data < 4 + offset) if (data < 4 + offset)
@ -621,7 +621,7 @@ public class StompDecoder
if (workingBuffer[pos - 1] != NEW_LINE) if (workingBuffer[pos - 1] != NEW_LINE)
{ {
//give a signal to try other versions //give a signal to try other versions
ActiveMQStompException error = BUNDLE.notValidNewLine(workingBuffer[pos - 1]); ActiveMQStompException error = BUNDLE.notValidNewLine(workingBuffer[pos - 1]).setHandler(handler);
error.setCode(ActiveMQStompException.INVALID_EOL_V10); error.setCode(ActiveMQStompException.INVALID_EOL_V10);
error.setBody(BUNDLE.unexpectedNewLine(workingBuffer[pos - 1])); error.setBody(BUNDLE.unexpectedNewLine(workingBuffer[pos - 1]));
throw error; throw error;
@ -632,7 +632,7 @@ public class StompDecoder
public void throwInvalid() throws ActiveMQStompException public void throwInvalid() throws ActiveMQStompException
{ {
ActiveMQStompException error = BUNDLE.invalidCommand(this.dumpByteArray(workingBuffer)); ActiveMQStompException error = BUNDLE.invalidCommand(this.dumpByteArray(workingBuffer)).setHandler(handler);
error.setCode(ActiveMQStompException.INVALID_COMMAND); error.setCode(ActiveMQStompException.INVALID_COMMAND);
error.setBody(BUNDLE.invalidFrame(this.dumpByteArray(workingBuffer))); error.setBody(BUNDLE.invalidFrame(this.dumpByteArray(workingBuffer)));
throw error; throw error;

View File

@ -364,7 +364,8 @@ class StompProtocolManager implements ProtocolManager<StompFrameInterceptor>, No
ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, errorMessage); ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, errorMessage);
ActiveMQStompException e = new ActiveMQStompException("Error sending reply", ActiveMQStompException e = new ActiveMQStompException("Error sending reply",
ActiveMQExceptionType.createException(errorCode, errorMessage)); ActiveMQExceptionType.createException(errorCode, errorMessage))
.setHandler(connection.getFrameHandler());
StompFrame error = e.getFrame(); StompFrame error = e.getFrame();
send(connection, error); send(connection, error);
@ -426,7 +427,7 @@ class StompProtocolManager implements ProtocolManager<StompFrameInterceptor>, No
StompSession session = getTransactedSession(connection, txID); StompSession session = getTransactedSession(connection, txID);
if (session == null) if (session == null)
{ {
throw new ActiveMQStompException("No transaction started: " + txID); throw new ActiveMQStompException(connection, "No transaction started: " + txID);
} }
transactedSessions.remove(txID); transactedSessions.remove(txID);
session.getSession().commit(); session.getSession().commit();
@ -437,7 +438,7 @@ class StompProtocolManager implements ProtocolManager<StompFrameInterceptor>, No
StompSession session = getTransactedSession(connection, txID); StompSession session = getTransactedSession(connection, txID);
if (session == null) if (session == null)
{ {
throw new ActiveMQStompException("No transaction started: " + txID); throw new ActiveMQStompException(connection, "No transaction started: " + txID);
} }
transactedSessions.remove(txID); transactedSessions.remove(txID);
session.getSession().rollback(false); session.getSession().rollback(false);
@ -452,7 +453,7 @@ class StompProtocolManager implements ProtocolManager<StompFrameInterceptor>, No
stompSession.setNoLocal(noLocal); stompSession.setNoLocal(noLocal);
if (stompSession.containsSubscription(subscriptionID)) if (stompSession.containsSubscription(subscriptionID))
{ {
throw new ActiveMQStompException("There already is a subscription for: " + subscriptionID + throw new ActiveMQStompException(connection, "There already is a subscription for: " + subscriptionID +
". Either use unique subscription IDs or do not create multiple subscriptions for the same destination"); ". Either use unique subscription IDs or do not create multiple subscriptions for the same destination");
} }
long consumerID = server.getStorageManager().generateID(); long consumerID = server.getStorageManager().generateID();
@ -473,7 +474,7 @@ class StompProtocolManager implements ProtocolManager<StompFrameInterceptor>, No
boolean unsubscribed = stompSession.unsubscribe(subscriptionID, durableSubscriberName); boolean unsubscribed = stompSession.unsubscribe(subscriptionID, durableSubscriberName);
if (!unsubscribed) if (!unsubscribed)
{ {
throw new ActiveMQStompException("Cannot unsubscribe as no subscription exists for id: " + subscriptionID); throw new ActiveMQStompException(connection, "Cannot unsubscribe as no subscription exists for id: " + subscriptionID);
} }
} }

View File

@ -223,7 +223,7 @@ public class StompSession implements SessionCallback
if (pair == null) if (pair == null)
{ {
throw BUNDLE.failToAckMissingID(id); throw BUNDLE.failToAckMissingID(id).setHandler(connection.getFrameHandler());
} }
long consumerID = pair.getA(); long consumerID = pair.getA();
@ -235,7 +235,7 @@ public class StompSession implements SessionCallback
{ {
if (!sub.getID().equals(subscriptionID)) if (!sub.getID().equals(subscriptionID))
{ {
throw BUNDLE.subscriptionIDMismatch(subscriptionID, sub.getID()); throw BUNDLE.subscriptionIDMismatch(subscriptionID, sub.getID()).setHandler(connection.getFrameHandler());
} }
} }

View File

@ -158,7 +158,7 @@ public abstract class VersionedStompFrameHandler
public StompFrame onUnknown(String command) public StompFrame onUnknown(String command)
{ {
ActiveMQStompException error = BUNDLE.unknownCommand(command); ActiveMQStompException error = BUNDLE.unknownCommand(command).setHandler(this);
StompFrame response = error.getFrame(); StompFrame response = error.getFrame();
return response; return response;
} }
@ -178,7 +178,7 @@ public abstract class VersionedStompFrameHandler
String txID = request.getHeader(Stomp.Headers.TRANSACTION); String txID = request.getHeader(Stomp.Headers.TRANSACTION);
if (txID == null) if (txID == null)
{ {
ActiveMQStompException error = BUNDLE.needTxIDHeader(); ActiveMQStompException error = BUNDLE.needTxIDHeader().setHandler(this);
response = error.getFrame(); response = error.getFrame();
return response; return response;
} }
@ -230,7 +230,7 @@ public abstract class VersionedStompFrameHandler
} }
catch (Exception e) catch (Exception e)
{ {
ActiveMQStompException error = BUNDLE.errorHandleSend(e); ActiveMQStompException error = BUNDLE.errorHandleSend(e).setHandler(this);
response = error.getFrame(); response = error.getFrame();
} }
@ -248,7 +248,7 @@ public abstract class VersionedStompFrameHandler
String txID = frame.getHeader(Stomp.Headers.TRANSACTION); String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
if (txID == null) if (txID == null)
{ {
ActiveMQStompException error = BUNDLE.beginTxNoID(); ActiveMQStompException error = BUNDLE.beginTxNoID().setHandler(this);
response = error.getFrame(); response = error.getFrame();
} }
else else
@ -272,7 +272,7 @@ public abstract class VersionedStompFrameHandler
if (txID == null) if (txID == null)
{ {
ActiveMQStompException error = BUNDLE.abortTxNoID(); ActiveMQStompException error = BUNDLE.abortTxNoID().setHandler(this);
response = error.getFrame(); response = error.getFrame();
return response; return response;
} }
@ -391,7 +391,7 @@ public abstract class VersionedStompFrameHandler
/** /**
* this method is called when a newer version of handler is created. It should * this method is called when a newer version of handler is created. It should
* take over the state of the decoder of the existingHandler so that * take over the state of the decoder of the existingHandler so that
* the decoding can be continued. For V10 handler it's never get called. * the decoding can be continued. For V10 handler it's never called.
* *
* @param existingHandler * @param existingHandler
*/ */

View File

@ -102,7 +102,7 @@ public class StompFrameHandlerV10 extends VersionedStompFrameHandler implements
{ {
if (destination == null) if (destination == null)
{ {
ActiveMQStompException error = BUNDLE.needIDorDestination(); ActiveMQStompException error = BUNDLE.needIDorDestination().setHandler(this);
response = error.getFrame(); response = error.getFrame();
return response; return response;
} }

View File

@ -124,7 +124,7 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
String[] params = heartBeatHeader.split(","); String[] params = heartBeatHeader.split(",");
if (params.length != 2) if (params.length != 2)
{ {
throw new ActiveMQStompException("Incorrect heartbeat header " + heartBeatHeader); throw new ActiveMQStompException(connection, "Incorrect heartbeat header " + heartBeatHeader);
} }
//client ping //client ping
@ -171,7 +171,7 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
} }
else else
{ {
response = BUNDLE.needSubscriptionID().getFrame(); response = BUNDLE.needSubscriptionID().setHandler(this).getFrame();
return response; return response;
} }
@ -202,7 +202,7 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
if (subscriptionID == null) if (subscriptionID == null)
{ {
response = BUNDLE.needSubscriptionID().getFrame(); response = BUNDLE.needSubscriptionID().setHandler(this).getFrame();
return response; return response;
} }
@ -475,7 +475,7 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
} }
else if (workingBuffer[offset] == CR) else if (workingBuffer[offset] == CR)
{ {
if (nextChar) throw BUNDLE.invalidTwoCRs(); if (nextChar) throw BUNDLE.invalidTwoCRs().setHandler(handler);
nextChar = true; nextChar = true;
} }
else else
@ -488,7 +488,7 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
if (nextChar) if (nextChar)
{ {
throw BUNDLE.badCRs(); throw BUNDLE.badCRs().setHandler(handler);
} }
//if some EOLs have been processed, drop those bytes before parsing command //if some EOLs have been processed, drop those bytes before parsing command

View File

@ -78,8 +78,7 @@ public class StompFrameHandlerV12 extends StompFrameHandlerV11 implements FrameE
if (messageID == null) if (messageID == null)
{ {
ActiveMQStompException error = BUNDLE.noIDInAck(); ActiveMQStompException error = BUNDLE.noIDInAck().setHandler(connection.getFrameHandler());
error.setHandler(connection.getFrameHandler());
return error.getFrame(); return error.getFrame();
} }
@ -214,7 +213,7 @@ public class StompFrameHandlerV12 extends StompFrameHandlerV11 implements FrameE
{ {
if (nextEOLChar) if (nextEOLChar)
{ {
throw BUNDLE.invalidTwoCRs(); throw BUNDLE.invalidTwoCRs().setHandler(handler);
} }
nextEOLChar = true; nextEOLChar = true;
break; break;