diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java index 5c37348463..b2a01f30d9 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java @@ -138,29 +138,29 @@ public class ProtocolConverter { protected void sendToStomp(StompFrame command) throws IOException { transportFilter.sendToStomp(command); } - + protected FrameTranslator findTranslator(String header) { - FrameTranslator translator = frameTranslator; - try { - if (header != null) { - translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER - .newInstance(header); - if (translator instanceof ApplicationContextAware) { - ((ApplicationContextAware)translator).setApplicationContext(applicationContext); - } - } - } catch (Exception ignore) { - // if anything goes wrong use the default translator - } - - return translator; - } + FrameTranslator translator = frameTranslator; + try { + if (header != null) { + translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER + .newInstance(header); + if (translator instanceof ApplicationContextAware) { + ((ApplicationContextAware)translator).setApplicationContext(applicationContext); + } + } + } catch (Exception ignore) { + // if anything goes wrong use the default translator + } + + return translator; + } /** - * Convert a stomp command - * - * @param command - */ + * Convert a stomp command + * + * @param command + */ public void onStompCommand(StompFrame command) throws IOException, JMSException { try { @@ -199,7 +199,7 @@ public class ProtocolConverter { } } } - + protected void handleException(Throwable exception, StompFrame command) throws IOException { // Let the stomp client know about any protocol errors. ByteArrayOutputStream baos = new ByteArrayOutputStream(); @@ -211,10 +211,10 @@ public class ProtocolConverter { headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage()); if (command != null) { - final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); - if (receiptId != null) { - headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId); - } + final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); + if (receiptId != null) { + headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId); + } } StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray()); @@ -331,7 +331,7 @@ public class ProtocolConverter { if (activemqTx == null) { throw new ProtocolException("Invalid transaction id: " + stompTx); } - + for (Iterator iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) { StompSubscription sub = iter.next(); sub.onStompCommit(activemqTx); @@ -343,7 +343,7 @@ public class ProtocolConverter { tx.setType(TransactionInfo.COMMIT_ONE_PHASE); sendToActiveMQ(tx, createResponseHandler(command)); - + } protected void onStompAbort(StompFrame command) throws ProtocolException { @@ -362,9 +362,9 @@ public class ProtocolConverter { for (Iterator iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) { StompSubscription sub = iter.next(); try { - sub.onStompAbort(activemqTx); + sub.onStompAbort(activemqTx); } catch (Exception e) { - throw new ProtocolException("Transaction abort failed", false, e); + throw new ProtocolException("Transaction abort failed", false, e); } } @@ -386,6 +386,11 @@ public class ProtocolConverter { String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION); ActiveMQDestination actualDest = translator.convertDestination(this, destination); + + if (actualDest == null) { + throw new ProtocolException("Invalid Destination."); + } + ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); ConsumerInfo consumerInfo = new ConsumerInfo(id); consumerInfo.setPrefetchSize(1000); @@ -430,9 +435,9 @@ public class ProtocolConverter { if (subscriptionId == null && destination == null) { throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from"); } - + // check if it is a durable subscription - String durable = command.getHeaders().get("activemq.subscriptionName"); + String durable = command.getHeaders().get("activemq.subscriptionName"); if (durable != null) { RemoveSubscriptionInfo info = new RemoveSubscriptionInfo(); info.setClientId(durable); @@ -455,12 +460,12 @@ public class ProtocolConverter { return; } } - + throw new ProtocolException("No subscription matched."); } ConnectionInfo connectionInfo = new ConnectionInfo(); - + protected void onStompConnect(final StompFrame command) throws ProtocolException { if (connected.get()) { @@ -506,14 +511,14 @@ public class ProtocolConverter { final ProducerInfo producerInfo = new ProducerInfo(producerId); sendToActiveMQ(producerInfo, new ResponseHandler() { public void onResponse(ProtocolConverter converter, Response response) throws IOException { - + if (response.isException()) { // If the connection attempt fails we close the socket. Throwable exception = ((ExceptionResponse)response).getException(); handleException(exception, command); getTransportFilter().onException(IOExceptionSupport.create(exception)); } - + connected.set(true); HashMap responseHeaders = new HashMap(); @@ -555,7 +560,7 @@ public class ProtocolConverter { /** * Dispatch a ActiveMQ command - * + * * @param command * @throws IOException */ @@ -593,39 +598,39 @@ public class ProtocolConverter { } public StompFrame convertMessage(ActiveMQMessage message, boolean ignoreTransformation) throws IOException, JMSException { - if (ignoreTransformation == true) { - return frameTranslator.convertMessage(this, message); - } else { - return findTranslator(message.getStringProperty(Stomp.Headers.TRANSFORMATION)).convertMessage(this, message); - } + if (ignoreTransformation == true) { + return frameTranslator.convertMessage(this, message); + } else { + return findTranslator(message.getStringProperty(Stomp.Headers.TRANSFORMATION)).convertMessage(this, message); + } } public StompTransportFilter getTransportFilter() { return transportFilter; } - public ActiveMQDestination createTempQueue(String name) { + public ActiveMQDestination createTempQueue(String name) { ActiveMQDestination rc = tempDestinations.get(name); if( rc == null ) { rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId()); sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null); tempDestinations.put(name, rc); - } + } return rc; - } + } - public ActiveMQDestination createTempTopic(String name) { + public ActiveMQDestination createTempTopic(String name) { ActiveMQDestination rc = tempDestinations.get(name); if( rc == null ) { rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId()); sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null); tempDestinations.put(name, rc); tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name); - } + } return rc; - } + } - public String getCreatedTempDestinationName(ActiveMQDestination destination) { - return tempDestinationAmqToStompMap.get(destination.getQualifiedName()); - } + public String getCreatedTempDestinationName(ActiveMQDestination destination) { + return tempDestinationAmqToStompMap.get(destination.getQualifiedName()); + } }