diff --git a/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java b/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java index 4a1aea257c..82f7ee0764 100644 --- a/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java +++ b/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java @@ -28,10 +28,12 @@ import jabber.client.Iq; import jabber.client.Message; import jabber.client.Presence; import jabber.iq.auth.Query; +import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.command.*; import org.apache.activemq.transport.xmpp.command.Handler; import org.apache.activemq.transport.xmpp.command.HandlerRegistry; import org.apache.activemq.util.IdGenerator; +import org.apache.activemq.util.IntSequenceGenerator; import org.apache.activemq.util.LongSequenceGenerator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -74,16 +76,19 @@ public class ProtocolConverter { private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator(); + private final IntSequenceGenerator tempDestinationIdGenerator = new IntSequenceGenerator(); private final Map> resposeHandlers = new ConcurrentHashMap>(); private final Map> subscriptionsByConsumerId = new ConcurrentHashMap>(); private final Map jidToConsumerMap = new HashMap(); + private final Map jidToInboxConsumerMap = new HashMap(); private final Map transactions = new ConcurrentHashMap(); private final Object commnadIdMutex = new Object(); private int lastCommandId; private final AtomicBoolean connected = new AtomicBoolean(false); + private ActiveMQTempQueue inboxDestination; public ProtocolConverter(XmppTransport transport) { this.transport = transport; @@ -368,14 +373,32 @@ public class ProtocolConverter { log.debug("No 'to' attribute specified for presence so not creating a JMS subscription"); return; } + subscribe(to, destination, jidToConsumerMap); + // lets subscribe to a personal inbox for replies + + // Check if Destination info is of temporary type. + if (inboxDestination == null) { + inboxDestination = new ActiveMQTempQueue(connectionInfo.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId()); + + DestinationInfo info = new DestinationInfo(); + info.setConnectionId(connectionInfo.getConnectionId()); + info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE); + info.setDestination(inboxDestination); + sendToActiveMQ(info, null); + + subscribe(to, inboxDestination, jidToInboxConsumerMap); + } + } + + protected void subscribe(final String to, ActiveMQDestination destination, Map consumerMap) { boolean createConsumer = false; ConsumerInfo consumerInfo = null; - synchronized (jidToConsumerMap) { - consumerInfo = jidToConsumerMap.get(to); + synchronized (consumerMap) { + consumerInfo = consumerMap.get(to); if (consumerInfo == null) { consumerInfo = new ConsumerInfo(); - jidToConsumerMap.put(to, consumerInfo); + consumerMap.put(to, consumerInfo); ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); consumerInfo.setConsumerId(consumerId); @@ -404,7 +427,7 @@ public class ProtocolConverter { Message message = createXmppMessage(to, messageDispatch); if (message != null) { if (log.isDebugEnabled()) { - log.debug("Sending message to XMPP client from: " + message.getFrom() + " to: " + message.getTo() + " type: " + message.getType() + " with body: " + message.getAny()); + log.debug("Sending message to XMPP client from: " + message.getFrom() + " to: " + message.getTo() + " type: " + message.getType() + " with body: " + message.getAny()); } transport.marshall(message); } @@ -417,7 +440,7 @@ public class ProtocolConverter { Message answer = new Message(); answer.setType("groupchat"); String from = to; - int idx= from.indexOf('/'); + int idx = from.indexOf('/'); if (idx > 0) { from = from.substring(0, idx) + "/broker"; } @@ -429,11 +452,14 @@ public class ProtocolConverter { if (message instanceof ActiveMQTextMessage) { ActiveMQTextMessage activeMQTextMessage = (ActiveMQTextMessage) message; Body body = new Body(); - body.setValue(activeMQTextMessage.getText()); + String text = activeMQTextMessage.getText(); + log.info("Setting the body text to be: " + text); + body.setValue(text); answer.getAny().add(body); } else { // TODO support other message types + log.warn("Could not convert the message to a complete Jabber message: " + message); } return answer; } @@ -555,6 +581,13 @@ public class ProtocolConverter { if (idx > 0) { name = name.substring(0, idx); } + + System.out.println("#### Creating ActiveMQ destination for: " + name); + + // lets support lower-case versions of the agent topic + if (name.equalsIgnoreCase(AdvisorySupport.AGENT_TOPIC)) { + name = AdvisorySupport.AGENT_TOPIC; + } return new ActiveMQTopic(name); } @@ -579,7 +612,12 @@ public class ProtocolConverter { answer.setStringProperty("XMPPLang", message.getLang()); answer.setStringProperty("XMPPTo", message.getTo()); answer.setJMSType(message.getType()); - answer.setJMSReplyTo(createActiveMQDestination(message.getFrom())); + ActiveMQDestination replyTo = createActiveMQDestination(message.getFrom()); + if (replyTo == null) { + replyTo = inboxDestination; + } + System.out.println("Setting reply to destination to: " + replyTo); + answer.setJMSReplyTo(replyTo); } protected void onAuth(Auth auth) throws Exception {