mirror of https://github.com/apache/activemq.git
managed to get the XMPP transport to subscribe to topics and receive messages when tested via the Spark Jabber client
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@467606 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7b19dd4cec
commit
ddde7643fd
|
@ -45,6 +45,7 @@ import javax.jms.JMSException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.PrintWriter;
|
import java.io.PrintWriter;
|
||||||
import java.io.StringWriter;
|
import java.io.StringWriter;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
@ -76,6 +77,8 @@ public class ProtocolConverter {
|
||||||
|
|
||||||
private final Map<Integer, Handler<Response>> resposeHandlers = new ConcurrentHashMap<Integer, Handler<Response>>();
|
private final Map<Integer, Handler<Response>> resposeHandlers = new ConcurrentHashMap<Integer, Handler<Response>>();
|
||||||
private final Map<ConsumerId, Handler<MessageDispatch>> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, Handler<MessageDispatch>>();
|
private final Map<ConsumerId, Handler<MessageDispatch>> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, Handler<MessageDispatch>>();
|
||||||
|
private final Map<String, ConsumerInfo> jidToConsumerMap = new HashMap<String, ConsumerInfo>();
|
||||||
|
|
||||||
private final Map transactions = new ConcurrentHashMap();
|
private final Map transactions = new ConcurrentHashMap();
|
||||||
|
|
||||||
private final Object commnadIdMutex = new Object();
|
private final Object commnadIdMutex = new Object();
|
||||||
|
@ -142,6 +145,9 @@ public class ProtocolConverter {
|
||||||
if (handler != null) {
|
if (handler != null) {
|
||||||
handler.handle(response);
|
handler.handle(response);
|
||||||
}
|
}
|
||||||
|
else {
|
||||||
|
log.warn("No handler for response: " + response);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else if (command.isMessageDispatch()) {
|
else if (command.isMessageDispatch()) {
|
||||||
MessageDispatch md = (MessageDispatch) command;
|
MessageDispatch md = (MessageDispatch) command;
|
||||||
|
@ -149,6 +155,9 @@ public class ProtocolConverter {
|
||||||
if (handler != null) {
|
if (handler != null) {
|
||||||
handler.handle(md);
|
handler.handle(md);
|
||||||
}
|
}
|
||||||
|
else {
|
||||||
|
log.warn("No handler for message: " + md);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -160,53 +169,7 @@ public class ProtocolConverter {
|
||||||
Object any = iq.getAny();
|
Object any = iq.getAny();
|
||||||
|
|
||||||
if (any instanceof Query) {
|
if (any instanceof Query) {
|
||||||
Query query = (Query) any;
|
onAuthQuery(any, iq);
|
||||||
if (log.isDebugEnabled()) {
|
|
||||||
log.debug("Iq Auth Query " + debugString(iq) + " resource: " + query.getResource() + " username: " + query.getUsername());
|
|
||||||
}
|
|
||||||
if (query.getPassword() == null) {
|
|
||||||
Iq result = createResult(iq);
|
|
||||||
Query required = new Query();
|
|
||||||
required.setPassword("");
|
|
||||||
required.setUsername("");
|
|
||||||
result.setAny(required);
|
|
||||||
transport.marshall(result);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
//connectionInfo.setClientId(query.getResource());
|
|
||||||
connectionInfo.setUserName(query.getUsername());
|
|
||||||
connectionInfo.setPassword(query.getPassword());
|
|
||||||
|
|
||||||
// TODO support digest?
|
|
||||||
|
|
||||||
if (connectionInfo.getClientId() == null) {
|
|
||||||
connectionInfo.setClientId(clientIdGenerator.generateId());
|
|
||||||
}
|
|
||||||
|
|
||||||
sendToActiveMQ(connectionInfo, new Handler<Response>() {
|
|
||||||
public void handle(Response response) throws Exception {
|
|
||||||
|
|
||||||
Iq result = createResult(iq);
|
|
||||||
|
|
||||||
if (response instanceof ExceptionResponse) {
|
|
||||||
ExceptionResponse exceptionResponse = (ExceptionResponse) response;
|
|
||||||
jabber.client.Error error = new jabber.client.Error();
|
|
||||||
result.setError(error);
|
|
||||||
|
|
||||||
StringWriter buffer = new StringWriter();
|
|
||||||
exceptionResponse.getException().printStackTrace(new PrintWriter(buffer));
|
|
||||||
error.setInternalServerError(buffer.toString());
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
connected.set(true);
|
|
||||||
}
|
|
||||||
transport.marshall(result);
|
|
||||||
|
|
||||||
sendToActiveMQ(sessionInfo, null);
|
|
||||||
sendToActiveMQ(producerInfo, null);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
}
|
}
|
||||||
else if (any instanceof jabber.iq._private.Query) {
|
else if (any instanceof jabber.iq._private.Query) {
|
||||||
|
@ -255,6 +218,60 @@ public class ProtocolConverter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void onAuthQuery(Object any, final Iq iq) throws IOException {
|
||||||
|
Query query = (Query) any;
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("Iq Auth Query " + debugString(iq) + " resource: " + query.getResource() + " username: " + query.getUsername());
|
||||||
|
}
|
||||||
|
if (query.getPassword() == null) {
|
||||||
|
Iq result = createResult(iq);
|
||||||
|
Query required = new Query();
|
||||||
|
required.setPassword("");
|
||||||
|
required.setUsername("");
|
||||||
|
result.setAny(required);
|
||||||
|
transport.marshall(result);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
//connectionInfo.setClientId(query.getResource());
|
||||||
|
connectionInfo.setUserName(query.getUsername());
|
||||||
|
connectionInfo.setPassword(query.getPassword());
|
||||||
|
|
||||||
|
// TODO support digest?
|
||||||
|
|
||||||
|
if (connectionInfo.getClientId() == null) {
|
||||||
|
connectionInfo.setClientId(clientIdGenerator.generateId());
|
||||||
|
}
|
||||||
|
|
||||||
|
sendToActiveMQ(connectionInfo, new Handler<Response>() {
|
||||||
|
public void handle(Response response) throws Exception {
|
||||||
|
|
||||||
|
Iq result = createResult(iq);
|
||||||
|
|
||||||
|
if (response instanceof ExceptionResponse) {
|
||||||
|
ExceptionResponse exceptionResponse = (ExceptionResponse) response;
|
||||||
|
Throwable exception = exceptionResponse.getException();
|
||||||
|
|
||||||
|
log.warn("Failed to create connection: " + exception, exception);
|
||||||
|
|
||||||
|
Error error = new Error();
|
||||||
|
result.setError(error);
|
||||||
|
|
||||||
|
StringWriter buffer = new StringWriter();
|
||||||
|
exception.printStackTrace(new PrintWriter(buffer));
|
||||||
|
error.setInternalServerError(buffer.toString());
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
connected.set(true);
|
||||||
|
}
|
||||||
|
transport.marshall(result);
|
||||||
|
|
||||||
|
sendToActiveMQ(sessionInfo, createErrorHandler("create sesssion"));
|
||||||
|
sendToActiveMQ(producerInfo, createErrorHandler("create producer"));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
protected String debugString(Iq iq) {
|
protected String debugString(Iq iq) {
|
||||||
return " to: " + iq.getTo() + " type: " + iq.getType() + " from: " + iq.getFrom() + " id: " + iq.getId();
|
return " to: " + iq.getTo() + " type: " + iq.getType() + " from: " + iq.getFrom() + " id: " + iq.getId();
|
||||||
}
|
}
|
||||||
|
@ -325,20 +342,101 @@ public class ProtocolConverter {
|
||||||
transport.marshall(result);
|
transport.marshall(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void onPresence(Presence presence) throws IOException {
|
protected void onPresence(Presence presence) throws IOException, JMSException {
|
||||||
if (log.isDebugEnabled()) {
|
if (log.isDebugEnabled()) {
|
||||||
log.debug("Presence: " + presence.getFrom() + " id: " + presence.getId() + " to: " + presence.getTo() + " type: " + presence.getType()
|
log.debug("Presence: " + presence.getFrom() + " id: " + presence.getId() + " to: " + presence.getTo() + " type: " + presence.getType()
|
||||||
+ " showOrStatusOrPriority: " + presence.getShowOrStatusOrPriority() + " any: " + presence.getAny());
|
+ " showOrStatusOrPriority: " + presence.getShowOrStatusOrPriority() + " any: " + presence.getAny());
|
||||||
}
|
}
|
||||||
|
org.jabber.protocol.muc_user.Item item = new org.jabber.protocol.muc_user.Item();
|
||||||
|
item.setAffiliation("owner");
|
||||||
|
item.setRole("moderator");
|
||||||
|
item.setNick("broker");
|
||||||
|
sendPresence(presence, item);
|
||||||
|
|
||||||
|
item = new org.jabber.protocol.muc_user.Item();
|
||||||
|
item.setAffiliation("admin");
|
||||||
|
item.setRole("moderator");
|
||||||
|
sendPresence(presence, item);
|
||||||
|
|
||||||
|
// lets create a subscription
|
||||||
|
final String to = presence.getTo();
|
||||||
|
|
||||||
|
|
||||||
|
boolean createConsumer = false;
|
||||||
|
ConsumerInfo consumerInfo = null;
|
||||||
|
synchronized (jidToConsumerMap) {
|
||||||
|
consumerInfo = jidToConsumerMap.get(to);
|
||||||
|
if (consumerInfo == null) {
|
||||||
|
consumerInfo = new ConsumerInfo();
|
||||||
|
jidToConsumerMap.put(to, consumerInfo);
|
||||||
|
|
||||||
|
ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
|
||||||
|
consumerInfo.setConsumerId(consumerId);
|
||||||
|
consumerInfo.setPrefetchSize(10);
|
||||||
|
createConsumer = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!createConsumer) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
ActiveMQDestination destination = createActiveMQDestination(to);
|
||||||
|
consumerInfo.setDestination(destination);
|
||||||
|
|
||||||
|
subscriptionsByConsumerId.put(consumerInfo.getConsumerId(), new Handler<MessageDispatch>() {
|
||||||
|
public void handle(MessageDispatch messageDispatch) throws Exception {
|
||||||
|
// processing the inbound message
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("Receiving inbound: " + messageDispatch.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
// lets send back an ACK
|
||||||
|
MessageAck ack = new MessageAck(messageDispatch, MessageAck.STANDARD_ACK_TYPE, 1);
|
||||||
|
sendToActiveMQ(ack, createErrorHandler("Ack of message: " + messageDispatch.getMessage().getMessageId()));
|
||||||
|
|
||||||
|
Message message = createXmppMessage(to, messageDispatch);
|
||||||
|
if (message != null) {
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("Sending message to XMPP client from: " + message.getFrom() + " to: " + message.getTo() + " type: " + message.getType() + " with body: " + message.getAny());
|
||||||
|
}
|
||||||
|
transport.marshall(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
sendToActiveMQ(consumerInfo, createErrorHandler("subscribe to destination: " + destination));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Message createXmppMessage(String to, MessageDispatch messageDispatch) throws JMSException {
|
||||||
|
Message answer = new Message();
|
||||||
|
answer.setType("groupchat");
|
||||||
|
String from = to;
|
||||||
|
int idx= from.indexOf('/');
|
||||||
|
if (idx > 0) {
|
||||||
|
from = from.substring(0, idx) + "/broker";
|
||||||
|
}
|
||||||
|
answer.setFrom(from);
|
||||||
|
answer.setTo(to);
|
||||||
|
|
||||||
|
org.apache.activemq.command.Message message = messageDispatch.getMessage();
|
||||||
|
//answer.setType(message.getType());
|
||||||
|
if (message instanceof ActiveMQTextMessage) {
|
||||||
|
ActiveMQTextMessage activeMQTextMessage = (ActiveMQTextMessage) message;
|
||||||
|
Body body = new Body();
|
||||||
|
body.setValue(activeMQTextMessage.getText());
|
||||||
|
answer.getAny().add(body);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
// TODO support other message types
|
||||||
|
}
|
||||||
|
return answer;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void sendPresence(Presence presence, org.jabber.protocol.muc_user.Item item) throws IOException {
|
||||||
Presence answer = new Presence();
|
Presence answer = new Presence();
|
||||||
answer.setFrom(presence.getTo());
|
answer.setFrom(presence.getTo());
|
||||||
answer.setType(presence.getType());
|
answer.setType(presence.getType());
|
||||||
answer.setTo(presence.getFrom());
|
answer.setTo(presence.getFrom());
|
||||||
X x = new X();
|
X x = new X();
|
||||||
org.jabber.protocol.muc_user.Item item = new org.jabber.protocol.muc_user.Item();
|
|
||||||
item.setAffiliation("owner");
|
|
||||||
item.setRole("moderator");
|
|
||||||
item.setNick("broker");
|
|
||||||
x.getDeclineOrDestroyOrInvite().add(item);
|
x.getDeclineOrDestroyOrInvite().add(item);
|
||||||
answer.getShowOrStatusOrPriority().add(x);
|
answer.getShowOrStatusOrPriority().add(x);
|
||||||
transport.marshall(answer);
|
transport.marshall(answer);
|
||||||
|
@ -400,20 +498,42 @@ public class ProtocolConverter {
|
||||||
log.debug("Message from: " + message.getFrom() + " to: " + message.getTo() + " subjectOrBodyOrThread: " + message.getSubjectOrBodyOrThread());
|
log.debug("Message from: " + message.getFrom() + " to: " + message.getTo() + " subjectOrBodyOrThread: " + message.getSubjectOrBodyOrThread());
|
||||||
}
|
}
|
||||||
|
|
||||||
ActiveMQMessage activeMQMessage = createActiveMQMessage(message);
|
final ActiveMQMessage activeMQMessage = createActiveMQMessage(message);
|
||||||
|
|
||||||
ActiveMQDestination destination = createActiveMQDestination(message.getTo());
|
ActiveMQDestination destination = createActiveMQDestination(message.getTo());
|
||||||
|
|
||||||
activeMQMessage.setMessageId(new MessageId(producerInfo, messageIdGenerator.getNextSequenceId()));
|
activeMQMessage.setMessageId(new MessageId(producerInfo, messageIdGenerator.getNextSequenceId()));
|
||||||
activeMQMessage.setDestination(destination);
|
activeMQMessage.setDestination(destination);
|
||||||
|
activeMQMessage.setProducerId(producerId);
|
||||||
|
activeMQMessage.setTimestamp(System.currentTimeMillis());
|
||||||
addActiveMQMessageHeaders(activeMQMessage, message);
|
addActiveMQMessageHeaders(activeMQMessage, message);
|
||||||
|
|
||||||
MessageDispatch dispatch = new MessageDispatch();
|
MessageDispatch dispatch = new MessageDispatch();
|
||||||
dispatch.setDestination(destination);
|
dispatch.setDestination(destination);
|
||||||
dispatch.setMessage(activeMQMessage);
|
dispatch.setMessage(activeMQMessage);
|
||||||
sendToActiveMQ(dispatch, null);
|
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("Sending ActiveMQ message: " + activeMQMessage);
|
||||||
|
}
|
||||||
|
sendToActiveMQ(dispatch, createErrorHandler("send message"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected Handler<Response> createErrorHandler(final String text) {
|
||||||
|
return new Handler<Response>() {
|
||||||
|
public void handle(Response event) throws Exception {
|
||||||
|
if (event instanceof ExceptionResponse) {
|
||||||
|
ExceptionResponse exceptionResponse = (ExceptionResponse) event;
|
||||||
|
Throwable exception = exceptionResponse.getException();
|
||||||
|
log.error("Failed to " + text + ". Reason: " + exception, exception);
|
||||||
|
}
|
||||||
|
else if (log.isDebugEnabled()) {
|
||||||
|
log.debug("Completed " + text);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Converts the Jabber destination name into a destination in ActiveMQ
|
* Converts the Jabber destination name into a destination in ActiveMQ
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -101,16 +101,17 @@ public class XmppTransport extends TcpTransport {
|
||||||
|
|
||||||
// now lets write the features
|
// now lets write the features
|
||||||
Features features = new Features();
|
Features features = new Features();
|
||||||
|
|
||||||
|
// TODO support TLS
|
||||||
//features.getAny().add(new Starttls());
|
//features.getAny().add(new Starttls());
|
||||||
|
|
||||||
Mechanisms mechanisms = new Mechanisms();
|
Mechanisms mechanisms = new Mechanisms();
|
||||||
|
|
||||||
|
// TODO support SASL
|
||||||
//mechanisms.getMechanism().add("DIGEST-MD5");
|
//mechanisms.getMechanism().add("DIGEST-MD5");
|
||||||
//mechanisms.getMechanism().add("PLAIN");
|
//mechanisms.getMechanism().add("PLAIN");
|
||||||
features.getAny().add(mechanisms);
|
features.getAny().add(mechanisms);
|
||||||
marshall(features);
|
marshall(features);
|
||||||
/*
|
|
||||||
xmlWriter.flush();
|
|
||||||
outputStream.flush();
|
|
||||||
*/
|
|
||||||
}
|
}
|
||||||
catch (XMLStreamException e) {
|
catch (XMLStreamException e) {
|
||||||
throw IOExceptionSupport.create(e);
|
throw IOExceptionSupport.create(e);
|
||||||
|
|
|
@ -28,8 +28,30 @@ import org.jivesoftware.smack.XMPPConnection;
|
||||||
public class XmppTest extends TestCase {
|
public class XmppTest extends TestCase {
|
||||||
|
|
||||||
private XmppBroker broker = new XmppBroker();
|
private XmppBroker broker = new XmppBroker();
|
||||||
private boolean block = true;
|
private boolean block = false;
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
XmppTest test = new XmppTest();
|
||||||
|
test.block = true;
|
||||||
|
try {
|
||||||
|
test.setUp();
|
||||||
|
test.testConnect();
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
System.out.println("Caught: " + e);
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
try {
|
||||||
|
test.tearDown();
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
System.out.println("Caught: " + e);
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
public void testConnect() throws Exception {
|
public void testConnect() throws Exception {
|
||||||
//ConnectionConfiguration config = new ConnectionConfiguration("localhost", 61222);
|
//ConnectionConfiguration config = new ConnectionConfiguration("localhost", 61222);
|
||||||
//config.setDebuggerEnabled(true);
|
//config.setDebuggerEnabled(true);
|
||||||
|
|
Loading…
Reference in New Issue