diff --git a/activemq-xmpp/pom.xml b/activemq-xmpp/pom.xml
index 3147694dae..3fe7e8f36d 100755
--- a/activemq-xmpp/pom.xml
+++ b/activemq-xmpp/pom.xml
@@ -177,16 +177,11 @@
-
maven-surefire-plugin
-
+
diff --git a/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java b/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java
index fdf5503815..4a1aea257c 100644
--- a/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java
+++ b/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java
@@ -353,14 +353,21 @@ public class ProtocolConverter {
item.setNick("broker");
sendPresence(presence, item);
+ /*
item = new org.jabber.protocol.muc_user.Item();
item.setAffiliation("admin");
item.setRole("moderator");
sendPresence(presence, item);
+ */
// lets create a subscription
final String to = presence.getTo();
+ ActiveMQDestination destination = createActiveMQDestination(to);
+ if (destination == null) {
+ log.debug("No 'to' attribute specified for presence so not creating a JMS subscription");
+ return;
+ }
boolean createConsumer = false;
ConsumerInfo consumerInfo = null;
@@ -373,6 +380,7 @@ public class ProtocolConverter {
ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
consumerInfo.setConsumerId(consumerId);
consumerInfo.setPrefetchSize(10);
+ consumerInfo.setNoLocal(true);
createConsumer = true;
}
}
@@ -380,7 +388,6 @@ public class ProtocolConverter {
return;
}
- ActiveMQDestination destination = createActiveMQDestination(to);
consumerInfo.setDestination(destination);
subscriptionsByConsumerId.put(consumerInfo.getConsumerId(), new Handler() {
@@ -508,14 +515,16 @@ public class ProtocolConverter {
activeMQMessage.setTimestamp(System.currentTimeMillis());
addActiveMQMessageHeaders(activeMQMessage, message);
+ /*
MessageDispatch dispatch = new MessageDispatch();
dispatch.setDestination(destination);
dispatch.setMessage(activeMQMessage);
+ */
if (log.isDebugEnabled()) {
log.debug("Sending ActiveMQ message: " + activeMQMessage);
}
- sendToActiveMQ(dispatch, createErrorHandler("send message"));
+ sendToActiveMQ(activeMQMessage, createErrorHandler("send message"));
}
protected Handler createErrorHandler(final String text) {
diff --git a/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransport.java b/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransport.java
index e0d1db9e33..6d3ac5f2bc 100644
--- a/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransport.java
+++ b/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransport.java
@@ -69,7 +69,8 @@ public class XmppTransport extends TcpTransport {
protected OutputStream outputStream;
protected InputStream inputStream;
private ProtocolConverter converter;
- private String from;
+ private String from = "localhost";
+ private String brokerId = "broker-id-1";
public XmppTransport(WireFormat wireFormat, Socket socket) throws IOException {
super(wireFormat, socket);
@@ -94,24 +95,10 @@ public class XmppTransport extends TcpTransport {
if (command instanceof BrokerInfo) {
BrokerInfo brokerInfo = (BrokerInfo) command;
- String id = brokerInfo.getBrokerId().toString();
+ brokerId = brokerInfo.getBrokerId().toString();
from = brokerInfo.getBrokerName();
try {
- writeOpenStream(id, from);
-
- // now lets write the features
- Features features = new Features();
-
- // TODO support TLS
- //features.getAny().add(new Starttls());
-
- Mechanisms mechanisms = new Mechanisms();
-
- // TODO support SASL
- //mechanisms.getMechanism().add("DIGEST-MD5");
- //mechanisms.getMechanism().add("PLAIN");
- features.getAny().add(mechanisms);
- marshall(features);
+ writeOpenStream(brokerId, from);
}
catch (XMLStreamException e) {
throw IOExceptionSupport.create(e);
@@ -139,9 +126,14 @@ public class XmppTransport extends TcpTransport {
* Marshalls the given POJO to the client
*/
public void marshall(Object command) throws IOException {
+ if (isStopped() || isStopping()) {
+ log.warn("Not marshalling command as shutting down: " + command);
+ return;
+ }
try {
marshaller.marshal(command, xmlWriter);
xmlWriter.flush();
+ outputStream.flush();
}
catch (JAXBException e) {
throw IOExceptionSupport.create(e);
@@ -193,7 +185,8 @@ public class XmppTransport extends TcpTransport {
if (event.getEventType() == XMLEvent.END_ELEMENT) {
break;
}
- else if (event.getEventType() == XMLEvent.END_ELEMENT || event.getEventType() == XMLEvent.END_DOCUMENT) {
+ else
+ if (event.getEventType() == XMLEvent.END_ELEMENT || event.getEventType() == XMLEvent.END_DOCUMENT) {
break;
}
else {
@@ -250,16 +243,30 @@ public class XmppTransport extends TcpTransport {
@Override
protected void initializeStreams() throws Exception {
// TODO it would be preferable to use class discovery here!
- context = JAXBContext.newInstance("jabber.client" + ":jabber.server"
- + ":jabber.iq._private" + ":jabber.iq.auth" + ":jabber.iq.gateway" + ":jabber.iq.last" + ":jabber.iq.oob"
- + ":jabber.iq.pass" + ":jabber.iq.roster" + ":jabber.iq.time" + ":jabber.iq.version"
- + ":org.jabber.etherx.streams" + ":org.jabber.protocol.activity" + ":org.jabber.protocol.address"
+ context = JAXBContext.newInstance("jabber.client"
+ /*
+ + ":jabber.server"
+ + ":jabber.iq.gateway"
+ + ":jabber.iq.last"
+ + ":jabber.iq.oob"
+ + ":jabber.iq.pass"
+ + ":jabber.iq.time"
+ + ":jabber.iq.version"
+ + ":org.jabber.protocol.activity" + ":org.jabber.protocol.address"
+ ":org.jabber.protocol.amp" + ":org.jabber.protocol.amp_errors"
+ + ":org.jabber.protocol.muc_admin"
+ + ":org.jabber.protocol.muc_unique"
+ */
+ + ":jabber.iq._private"
+ + ":jabber.iq.auth"
+ + ":jabber.iq.roster"
+ + ":org.jabber.etherx.streams"
+ ":org.jabber.protocol.disco_info" + ":org.jabber.protocol.disco_items"
- + ":org.jabber.protocol.muc" + ":org.jabber.protocol.muc_admin"
- + ":org.jabber.protocol.muc_unique" + ":org.jabber.protocol.muc_user"
+ + ":org.jabber.protocol.muc"
+ + ":org.jabber.protocol.muc_user"
+ ":ietf.params.xml.ns.xmpp_sasl" + ":ietf.params.xml.ns.xmpp_stanzas"
- + ":ietf.params.xml.ns.xmpp_streams" + ":ietf.params.xml.ns.xmpp_tls");
+ + ":ietf.params.xml.ns.xmpp_streams" + ":ietf.params.xml.ns.xmpp_tls"
+ );
inputStream = new TcpBufferedInputStream(socket.getInputStream(), 8 * 1024);
outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), 16 * 1024);
@@ -270,6 +277,7 @@ public class XmppTransport extends TcpTransport {
}
protected void writeOpenStream(String id, String from) throws IOException, XMLStreamException {
+ log.debug("Sending initial stream element");
XMLOutputFactory factory = XMLOutputFactory.newInstance();
//factory.setProperty(XMLOutputFactory.IS_REPAIRING_NAMESPACES, true);
xmlWriter = factory.createXMLStreamWriter(outputStream);
@@ -286,7 +294,23 @@ public class XmppTransport extends TcpTransport {
}
xmlWriter.writeAttribute("to", to);
xmlWriter.writeAttribute("from", from);
- xmlWriter.writeCharacters("\n");
+
+
+ // now lets write the features
+ Features features = new Features();
+
+ // TODO support TLS
+ //features.getAny().add(new Starttls());
+
+ Mechanisms mechanisms = new Mechanisms();
+
+ // TODO support SASL
+ //mechanisms.getMechanism().add("DIGEST-MD5");
+ //mechanisms.getMechanism().add("PLAIN");
+ features.getAny().add(mechanisms);
+ marshall(features);
+
+ log.debug("Initial stream element sent!");
}
}
diff --git a/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppTest.java b/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppTest.java
index e409611140..d2616b87a8 100644
--- a/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppTest.java
+++ b/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppTest.java
@@ -18,40 +18,25 @@
package org.apache.activemq.transport.xmpp;
import junit.framework.TestCase;
+import junit.textui.TestRunner;
import org.jivesoftware.smack.Chat;
-import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.XMPPConnection;
+import org.jivesoftware.smack.XMPPException;
/**
* @version $Revision$
*/
public class XmppTest extends TestCase {
+ protected static boolean block = false;
+
private XmppBroker broker = new XmppBroker();
- private boolean block = false;
public static void main(String[] args) {
- XmppTest test = new XmppTest();
- test.block = true;
- try {
- test.setUp();
- test.testConnect();
- }
- catch (Exception e) {
- System.out.println("Caught: " + e);
- e.printStackTrace();
- }
- finally {
- try {
- test.tearDown();
- }
- catch (Exception e) {
- System.out.println("Caught: " + e);
- e.printStackTrace();
- }
- }
-
+ block = true;
+ TestRunner.run(XmppTest.class);
}
+
public void testConnect() throws Exception {
//ConnectionConfiguration config = new ConnectionConfiguration("localhost", 61222);
//config.setDebuggerEnabled(true);
@@ -69,8 +54,13 @@ public class XmppTest extends TestCase {
System.out.println("Sent all messages!");
}
catch (XMPPException e) {
- System.out.println("Caught: " + e);
- e.printStackTrace();
+ if (block) {
+ System.out.println("Caught: " + e);
+ e.printStackTrace();
+ }
+ else {
+ throw e;
+ }
}
if (block) {
Thread.sleep(20000);