git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@906088 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2010-02-03 15:36:12 +00:00
parent caabb6cb3d
commit 5ebfbf0387
1 changed files with 55 additions and 50 deletions

View File

@ -138,29 +138,29 @@ public class ProtocolConverter {
protected void sendToStomp(StompFrame command) throws IOException {
transportFilter.sendToStomp(command);
}
protected FrameTranslator findTranslator(String header) {
FrameTranslator translator = frameTranslator;
try {
if (header != null) {
translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER
.newInstance(header);
if (translator instanceof ApplicationContextAware) {
((ApplicationContextAware)translator).setApplicationContext(applicationContext);
}
}
} catch (Exception ignore) {
// if anything goes wrong use the default translator
}
return translator;
}
FrameTranslator translator = frameTranslator;
try {
if (header != null) {
translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER
.newInstance(header);
if (translator instanceof ApplicationContextAware) {
((ApplicationContextAware)translator).setApplicationContext(applicationContext);
}
}
} catch (Exception ignore) {
// if anything goes wrong use the default translator
}
return translator;
}
/**
* Convert a stomp command
*
* @param command
*/
* Convert a stomp command
*
* @param command
*/
public void onStompCommand(StompFrame command) throws IOException, JMSException {
try {
@ -199,7 +199,7 @@ public class ProtocolConverter {
}
}
}
protected void handleException(Throwable exception, StompFrame command) throws IOException {
// Let the stomp client know about any protocol errors.
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@ -211,10 +211,10 @@ public class ProtocolConverter {
headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage());
if (command != null) {
final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
if (receiptId != null) {
headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
}
final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
if (receiptId != null) {
headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
}
}
StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
@ -331,7 +331,7 @@ public class ProtocolConverter {
if (activemqTx == null) {
throw new ProtocolException("Invalid transaction id: " + stompTx);
}
for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
StompSubscription sub = iter.next();
sub.onStompCommit(activemqTx);
@ -343,7 +343,7 @@ public class ProtocolConverter {
tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
sendToActiveMQ(tx, createResponseHandler(command));
}
protected void onStompAbort(StompFrame command) throws ProtocolException {
@ -362,9 +362,9 @@ public class ProtocolConverter {
for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
StompSubscription sub = iter.next();
try {
sub.onStompAbort(activemqTx);
sub.onStompAbort(activemqTx);
} catch (Exception e) {
throw new ProtocolException("Transaction abort failed", false, e);
throw new ProtocolException("Transaction abort failed", false, e);
}
}
@ -386,6 +386,11 @@ public class ProtocolConverter {
String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION);
ActiveMQDestination actualDest = translator.convertDestination(this, destination);
if (actualDest == null) {
throw new ProtocolException("Invalid Destination.");
}
ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
ConsumerInfo consumerInfo = new ConsumerInfo(id);
consumerInfo.setPrefetchSize(1000);
@ -430,9 +435,9 @@ public class ProtocolConverter {
if (subscriptionId == null && destination == null) {
throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
}
// check if it is a durable subscription
String durable = command.getHeaders().get("activemq.subscriptionName");
String durable = command.getHeaders().get("activemq.subscriptionName");
if (durable != null) {
RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
info.setClientId(durable);
@ -455,12 +460,12 @@ public class ProtocolConverter {
return;
}
}
throw new ProtocolException("No subscription matched.");
}
ConnectionInfo connectionInfo = new ConnectionInfo();
protected void onStompConnect(final StompFrame command) throws ProtocolException {
if (connected.get()) {
@ -506,14 +511,14 @@ public class ProtocolConverter {
final ProducerInfo producerInfo = new ProducerInfo(producerId);
sendToActiveMQ(producerInfo, new ResponseHandler() {
public void onResponse(ProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
// If the connection attempt fails we close the socket.
Throwable exception = ((ExceptionResponse)response).getException();
handleException(exception, command);
getTransportFilter().onException(IOExceptionSupport.create(exception));
}
connected.set(true);
HashMap<String, String> responseHeaders = new HashMap<String, String>();
@ -555,7 +560,7 @@ public class ProtocolConverter {
/**
* Dispatch a ActiveMQ command
*
*
* @param command
* @throws IOException
*/
@ -593,39 +598,39 @@ public class ProtocolConverter {
}
public StompFrame convertMessage(ActiveMQMessage message, boolean ignoreTransformation) throws IOException, JMSException {
if (ignoreTransformation == true) {
return frameTranslator.convertMessage(this, message);
} else {
return findTranslator(message.getStringProperty(Stomp.Headers.TRANSFORMATION)).convertMessage(this, message);
}
if (ignoreTransformation == true) {
return frameTranslator.convertMessage(this, message);
} else {
return findTranslator(message.getStringProperty(Stomp.Headers.TRANSFORMATION)).convertMessage(this, message);
}
}
public StompTransportFilter getTransportFilter() {
return transportFilter;
}
public ActiveMQDestination createTempQueue(String name) {
public ActiveMQDestination createTempQueue(String name) {
ActiveMQDestination rc = tempDestinations.get(name);
if( rc == null ) {
rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId());
sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
tempDestinations.put(name, rc);
}
}
return rc;
}
}
public ActiveMQDestination createTempTopic(String name) {
public ActiveMQDestination createTempTopic(String name) {
ActiveMQDestination rc = tempDestinations.get(name);
if( rc == null ) {
rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId());
sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
tempDestinations.put(name, rc);
tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name);
}
}
return rc;
}
}
public String getCreatedTempDestinationName(ActiveMQDestination destination) {
return tempDestinationAmqToStompMap.get(destination.getQualifiedName());
}
public String getCreatedTempDestinationName(ActiveMQDestination destination) {
return tempDestinationAmqToStompMap.get(destination.getQualifiedName());
}
}