use a temporary queue so replies can be sent to the client when they send to admin topics like ActiveMQ.Agent

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@475289 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-11-15 16:08:46 +00:00
parent ba476b36f0
commit 9260e941f9
1 changed files with 45 additions and 7 deletions

View File

@ -28,10 +28,12 @@ import jabber.client.Iq;
import jabber.client.Message; import jabber.client.Message;
import jabber.client.Presence; import jabber.client.Presence;
import jabber.iq.auth.Query; import jabber.iq.auth.Query;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.command.*; import org.apache.activemq.command.*;
import org.apache.activemq.transport.xmpp.command.Handler; import org.apache.activemq.transport.xmpp.command.Handler;
import org.apache.activemq.transport.xmpp.command.HandlerRegistry; import org.apache.activemq.transport.xmpp.command.HandlerRegistry;
import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.IntSequenceGenerator;
import org.apache.activemq.util.LongSequenceGenerator; import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -74,16 +76,19 @@ public class ProtocolConverter {
private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator();
private final IntSequenceGenerator tempDestinationIdGenerator = new IntSequenceGenerator();
private final Map<Integer, Handler<Response>> resposeHandlers = new ConcurrentHashMap<Integer, Handler<Response>>(); private final Map<Integer, Handler<Response>> resposeHandlers = new ConcurrentHashMap<Integer, Handler<Response>>();
private final Map<ConsumerId, Handler<MessageDispatch>> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, Handler<MessageDispatch>>(); private final Map<ConsumerId, Handler<MessageDispatch>> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, Handler<MessageDispatch>>();
private final Map<String, ConsumerInfo> jidToConsumerMap = new HashMap<String, ConsumerInfo>(); private final Map<String, ConsumerInfo> jidToConsumerMap = new HashMap<String, ConsumerInfo>();
private final Map<String, ConsumerInfo> jidToInboxConsumerMap = new HashMap<String, ConsumerInfo>();
private final Map transactions = new ConcurrentHashMap(); private final Map transactions = new ConcurrentHashMap();
private final Object commnadIdMutex = new Object(); private final Object commnadIdMutex = new Object();
private int lastCommandId; private int lastCommandId;
private final AtomicBoolean connected = new AtomicBoolean(false); private final AtomicBoolean connected = new AtomicBoolean(false);
private ActiveMQTempQueue inboxDestination;
public ProtocolConverter(XmppTransport transport) { public ProtocolConverter(XmppTransport transport) {
this.transport = transport; this.transport = transport;
@ -368,14 +373,32 @@ public class ProtocolConverter {
log.debug("No 'to' attribute specified for presence so not creating a JMS subscription"); log.debug("No 'to' attribute specified for presence so not creating a JMS subscription");
return; return;
} }
subscribe(to, destination, jidToConsumerMap);
// lets subscribe to a personal inbox for replies
// Check if Destination info is of temporary type.
if (inboxDestination == null) {
inboxDestination = new ActiveMQTempQueue(connectionInfo.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
DestinationInfo info = new DestinationInfo();
info.setConnectionId(connectionInfo.getConnectionId());
info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
info.setDestination(inboxDestination);
sendToActiveMQ(info, null);
subscribe(to, inboxDestination, jidToInboxConsumerMap);
}
}
protected void subscribe(final String to, ActiveMQDestination destination, Map<String, ConsumerInfo> consumerMap) {
boolean createConsumer = false; boolean createConsumer = false;
ConsumerInfo consumerInfo = null; ConsumerInfo consumerInfo = null;
synchronized (jidToConsumerMap) { synchronized (consumerMap) {
consumerInfo = jidToConsumerMap.get(to); consumerInfo = consumerMap.get(to);
if (consumerInfo == null) { if (consumerInfo == null) {
consumerInfo = new ConsumerInfo(); consumerInfo = new ConsumerInfo();
jidToConsumerMap.put(to, consumerInfo); consumerMap.put(to, consumerInfo);
ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
consumerInfo.setConsumerId(consumerId); consumerInfo.setConsumerId(consumerId);
@ -429,11 +452,14 @@ public class ProtocolConverter {
if (message instanceof ActiveMQTextMessage) { if (message instanceof ActiveMQTextMessage) {
ActiveMQTextMessage activeMQTextMessage = (ActiveMQTextMessage) message; ActiveMQTextMessage activeMQTextMessage = (ActiveMQTextMessage) message;
Body body = new Body(); Body body = new Body();
body.setValue(activeMQTextMessage.getText()); String text = activeMQTextMessage.getText();
log.info("Setting the body text to be: " + text);
body.setValue(text);
answer.getAny().add(body); answer.getAny().add(body);
} }
else { else {
// TODO support other message types // TODO support other message types
log.warn("Could not convert the message to a complete Jabber message: " + message);
} }
return answer; return answer;
} }
@ -555,6 +581,13 @@ public class ProtocolConverter {
if (idx > 0) { if (idx > 0) {
name = name.substring(0, idx); name = name.substring(0, idx);
} }
System.out.println("#### Creating ActiveMQ destination for: " + name);
// lets support lower-case versions of the agent topic
if (name.equalsIgnoreCase(AdvisorySupport.AGENT_TOPIC)) {
name = AdvisorySupport.AGENT_TOPIC;
}
return new ActiveMQTopic(name); return new ActiveMQTopic(name);
} }
@ -579,7 +612,12 @@ public class ProtocolConverter {
answer.setStringProperty("XMPPLang", message.getLang()); answer.setStringProperty("XMPPLang", message.getLang());
answer.setStringProperty("XMPPTo", message.getTo()); answer.setStringProperty("XMPPTo", message.getTo());
answer.setJMSType(message.getType()); answer.setJMSType(message.getType());
answer.setJMSReplyTo(createActiveMQDestination(message.getFrom())); ActiveMQDestination replyTo = createActiveMQDestination(message.getFrom());
if (replyTo == null) {
replyTo = inboxDestination;
}
System.out.println("Setting reply to destination to: " + replyTo);
answer.setJMSReplyTo(replyTo);
} }
protected void onAuth(Auth auth) throws Exception { protected void onAuth(Auth auth) throws Exception {