mirror of https://github.com/apache/activemq.git
fixed up the test case so its working now along with fixed a schoolboy error in sending messages over XMPP so that works nicely now. FWIW you can now connect via a Jabber client to the broker and interchange messages with the example JMS programs and the web console!
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@468026 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
04015a5e3d
commit
83573a052c
|
@ -177,16 +177,11 @@
|
||||||
</configuration>
|
</configuration>
|
||||||
</plugin>
|
</plugin>
|
||||||
|
|
||||||
<!-- Configure which tests are included/excuded -->
|
|
||||||
<plugin>
|
<plugin>
|
||||||
<artifactId>maven-surefire-plugin</artifactId>
|
<artifactId>maven-surefire-plugin</artifactId>
|
||||||
<configuration>
|
<configuration>
|
||||||
<!--
|
|
||||||
<includes>
|
|
||||||
<include>**/*Test.*</include>
|
|
||||||
</includes>
|
|
||||||
-->
|
|
||||||
<excludes>
|
<excludes>
|
||||||
|
<!--<exclude>**/XmppTest.*</exclude>-->
|
||||||
</excludes>
|
</excludes>
|
||||||
</configuration>
|
</configuration>
|
||||||
</plugin>
|
</plugin>
|
||||||
|
|
|
@ -353,14 +353,21 @@ public class ProtocolConverter {
|
||||||
item.setNick("broker");
|
item.setNick("broker");
|
||||||
sendPresence(presence, item);
|
sendPresence(presence, item);
|
||||||
|
|
||||||
|
/*
|
||||||
item = new org.jabber.protocol.muc_user.Item();
|
item = new org.jabber.protocol.muc_user.Item();
|
||||||
item.setAffiliation("admin");
|
item.setAffiliation("admin");
|
||||||
item.setRole("moderator");
|
item.setRole("moderator");
|
||||||
sendPresence(presence, item);
|
sendPresence(presence, item);
|
||||||
|
*/
|
||||||
|
|
||||||
// lets create a subscription
|
// lets create a subscription
|
||||||
final String to = presence.getTo();
|
final String to = presence.getTo();
|
||||||
|
|
||||||
|
ActiveMQDestination destination = createActiveMQDestination(to);
|
||||||
|
if (destination == null) {
|
||||||
|
log.debug("No 'to' attribute specified for presence so not creating a JMS subscription");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
boolean createConsumer = false;
|
boolean createConsumer = false;
|
||||||
ConsumerInfo consumerInfo = null;
|
ConsumerInfo consumerInfo = null;
|
||||||
|
@ -373,6 +380,7 @@ public class ProtocolConverter {
|
||||||
ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
|
ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
|
||||||
consumerInfo.setConsumerId(consumerId);
|
consumerInfo.setConsumerId(consumerId);
|
||||||
consumerInfo.setPrefetchSize(10);
|
consumerInfo.setPrefetchSize(10);
|
||||||
|
consumerInfo.setNoLocal(true);
|
||||||
createConsumer = true;
|
createConsumer = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -380,7 +388,6 @@ public class ProtocolConverter {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
ActiveMQDestination destination = createActiveMQDestination(to);
|
|
||||||
consumerInfo.setDestination(destination);
|
consumerInfo.setDestination(destination);
|
||||||
|
|
||||||
subscriptionsByConsumerId.put(consumerInfo.getConsumerId(), new Handler<MessageDispatch>() {
|
subscriptionsByConsumerId.put(consumerInfo.getConsumerId(), new Handler<MessageDispatch>() {
|
||||||
|
@ -508,14 +515,16 @@ public class ProtocolConverter {
|
||||||
activeMQMessage.setTimestamp(System.currentTimeMillis());
|
activeMQMessage.setTimestamp(System.currentTimeMillis());
|
||||||
addActiveMQMessageHeaders(activeMQMessage, message);
|
addActiveMQMessageHeaders(activeMQMessage, message);
|
||||||
|
|
||||||
|
/*
|
||||||
MessageDispatch dispatch = new MessageDispatch();
|
MessageDispatch dispatch = new MessageDispatch();
|
||||||
dispatch.setDestination(destination);
|
dispatch.setDestination(destination);
|
||||||
dispatch.setMessage(activeMQMessage);
|
dispatch.setMessage(activeMQMessage);
|
||||||
|
*/
|
||||||
|
|
||||||
if (log.isDebugEnabled()) {
|
if (log.isDebugEnabled()) {
|
||||||
log.debug("Sending ActiveMQ message: " + activeMQMessage);
|
log.debug("Sending ActiveMQ message: " + activeMQMessage);
|
||||||
}
|
}
|
||||||
sendToActiveMQ(dispatch, createErrorHandler("send message"));
|
sendToActiveMQ(activeMQMessage, createErrorHandler("send message"));
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Handler<Response> createErrorHandler(final String text) {
|
protected Handler<Response> createErrorHandler(final String text) {
|
||||||
|
|
|
@ -69,7 +69,8 @@ public class XmppTransport extends TcpTransport {
|
||||||
protected OutputStream outputStream;
|
protected OutputStream outputStream;
|
||||||
protected InputStream inputStream;
|
protected InputStream inputStream;
|
||||||
private ProtocolConverter converter;
|
private ProtocolConverter converter;
|
||||||
private String from;
|
private String from = "localhost";
|
||||||
|
private String brokerId = "broker-id-1";
|
||||||
|
|
||||||
public XmppTransport(WireFormat wireFormat, Socket socket) throws IOException {
|
public XmppTransport(WireFormat wireFormat, Socket socket) throws IOException {
|
||||||
super(wireFormat, socket);
|
super(wireFormat, socket);
|
||||||
|
@ -94,24 +95,10 @@ public class XmppTransport extends TcpTransport {
|
||||||
if (command instanceof BrokerInfo) {
|
if (command instanceof BrokerInfo) {
|
||||||
BrokerInfo brokerInfo = (BrokerInfo) command;
|
BrokerInfo brokerInfo = (BrokerInfo) command;
|
||||||
|
|
||||||
String id = brokerInfo.getBrokerId().toString();
|
brokerId = brokerInfo.getBrokerId().toString();
|
||||||
from = brokerInfo.getBrokerName();
|
from = brokerInfo.getBrokerName();
|
||||||
try {
|
try {
|
||||||
writeOpenStream(id, from);
|
writeOpenStream(brokerId, from);
|
||||||
|
|
||||||
// now lets write the features
|
|
||||||
Features features = new Features();
|
|
||||||
|
|
||||||
// TODO support TLS
|
|
||||||
//features.getAny().add(new Starttls());
|
|
||||||
|
|
||||||
Mechanisms mechanisms = new Mechanisms();
|
|
||||||
|
|
||||||
// TODO support SASL
|
|
||||||
//mechanisms.getMechanism().add("DIGEST-MD5");
|
|
||||||
//mechanisms.getMechanism().add("PLAIN");
|
|
||||||
features.getAny().add(mechanisms);
|
|
||||||
marshall(features);
|
|
||||||
}
|
}
|
||||||
catch (XMLStreamException e) {
|
catch (XMLStreamException e) {
|
||||||
throw IOExceptionSupport.create(e);
|
throw IOExceptionSupport.create(e);
|
||||||
|
@ -139,9 +126,14 @@ public class XmppTransport extends TcpTransport {
|
||||||
* Marshalls the given POJO to the client
|
* Marshalls the given POJO to the client
|
||||||
*/
|
*/
|
||||||
public void marshall(Object command) throws IOException {
|
public void marshall(Object command) throws IOException {
|
||||||
|
if (isStopped() || isStopping()) {
|
||||||
|
log.warn("Not marshalling command as shutting down: " + command);
|
||||||
|
return;
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
marshaller.marshal(command, xmlWriter);
|
marshaller.marshal(command, xmlWriter);
|
||||||
xmlWriter.flush();
|
xmlWriter.flush();
|
||||||
|
outputStream.flush();
|
||||||
}
|
}
|
||||||
catch (JAXBException e) {
|
catch (JAXBException e) {
|
||||||
throw IOExceptionSupport.create(e);
|
throw IOExceptionSupport.create(e);
|
||||||
|
@ -193,7 +185,8 @@ public class XmppTransport extends TcpTransport {
|
||||||
if (event.getEventType() == XMLEvent.END_ELEMENT) {
|
if (event.getEventType() == XMLEvent.END_ELEMENT) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
else if (event.getEventType() == XMLEvent.END_ELEMENT || event.getEventType() == XMLEvent.END_DOCUMENT) {
|
else
|
||||||
|
if (event.getEventType() == XMLEvent.END_ELEMENT || event.getEventType() == XMLEvent.END_DOCUMENT) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
@ -250,16 +243,30 @@ public class XmppTransport extends TcpTransport {
|
||||||
@Override
|
@Override
|
||||||
protected void initializeStreams() throws Exception {
|
protected void initializeStreams() throws Exception {
|
||||||
// TODO it would be preferable to use class discovery here!
|
// TODO it would be preferable to use class discovery here!
|
||||||
context = JAXBContext.newInstance("jabber.client" + ":jabber.server"
|
context = JAXBContext.newInstance("jabber.client"
|
||||||
+ ":jabber.iq._private" + ":jabber.iq.auth" + ":jabber.iq.gateway" + ":jabber.iq.last" + ":jabber.iq.oob"
|
/*
|
||||||
+ ":jabber.iq.pass" + ":jabber.iq.roster" + ":jabber.iq.time" + ":jabber.iq.version"
|
+ ":jabber.server"
|
||||||
+ ":org.jabber.etherx.streams" + ":org.jabber.protocol.activity" + ":org.jabber.protocol.address"
|
+ ":jabber.iq.gateway"
|
||||||
|
+ ":jabber.iq.last"
|
||||||
|
+ ":jabber.iq.oob"
|
||||||
|
+ ":jabber.iq.pass"
|
||||||
|
+ ":jabber.iq.time"
|
||||||
|
+ ":jabber.iq.version"
|
||||||
|
+ ":org.jabber.protocol.activity" + ":org.jabber.protocol.address"
|
||||||
+ ":org.jabber.protocol.amp" + ":org.jabber.protocol.amp_errors"
|
+ ":org.jabber.protocol.amp" + ":org.jabber.protocol.amp_errors"
|
||||||
|
+ ":org.jabber.protocol.muc_admin"
|
||||||
|
+ ":org.jabber.protocol.muc_unique"
|
||||||
|
*/
|
||||||
|
+ ":jabber.iq._private"
|
||||||
|
+ ":jabber.iq.auth"
|
||||||
|
+ ":jabber.iq.roster"
|
||||||
|
+ ":org.jabber.etherx.streams"
|
||||||
+ ":org.jabber.protocol.disco_info" + ":org.jabber.protocol.disco_items"
|
+ ":org.jabber.protocol.disco_info" + ":org.jabber.protocol.disco_items"
|
||||||
+ ":org.jabber.protocol.muc" + ":org.jabber.protocol.muc_admin"
|
+ ":org.jabber.protocol.muc"
|
||||||
+ ":org.jabber.protocol.muc_unique" + ":org.jabber.protocol.muc_user"
|
+ ":org.jabber.protocol.muc_user"
|
||||||
+ ":ietf.params.xml.ns.xmpp_sasl" + ":ietf.params.xml.ns.xmpp_stanzas"
|
+ ":ietf.params.xml.ns.xmpp_sasl" + ":ietf.params.xml.ns.xmpp_stanzas"
|
||||||
+ ":ietf.params.xml.ns.xmpp_streams" + ":ietf.params.xml.ns.xmpp_tls");
|
+ ":ietf.params.xml.ns.xmpp_streams" + ":ietf.params.xml.ns.xmpp_tls"
|
||||||
|
);
|
||||||
|
|
||||||
inputStream = new TcpBufferedInputStream(socket.getInputStream(), 8 * 1024);
|
inputStream = new TcpBufferedInputStream(socket.getInputStream(), 8 * 1024);
|
||||||
outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), 16 * 1024);
|
outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), 16 * 1024);
|
||||||
|
@ -270,6 +277,7 @@ public class XmppTransport extends TcpTransport {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void writeOpenStream(String id, String from) throws IOException, XMLStreamException {
|
protected void writeOpenStream(String id, String from) throws IOException, XMLStreamException {
|
||||||
|
log.debug("Sending initial stream element");
|
||||||
XMLOutputFactory factory = XMLOutputFactory.newInstance();
|
XMLOutputFactory factory = XMLOutputFactory.newInstance();
|
||||||
//factory.setProperty(XMLOutputFactory.IS_REPAIRING_NAMESPACES, true);
|
//factory.setProperty(XMLOutputFactory.IS_REPAIRING_NAMESPACES, true);
|
||||||
xmlWriter = factory.createXMLStreamWriter(outputStream);
|
xmlWriter = factory.createXMLStreamWriter(outputStream);
|
||||||
|
@ -286,7 +294,23 @@ public class XmppTransport extends TcpTransport {
|
||||||
}
|
}
|
||||||
xmlWriter.writeAttribute("to", to);
|
xmlWriter.writeAttribute("to", to);
|
||||||
xmlWriter.writeAttribute("from", from);
|
xmlWriter.writeAttribute("from", from);
|
||||||
xmlWriter.writeCharacters("\n");
|
|
||||||
|
|
||||||
|
// now lets write the features
|
||||||
|
Features features = new Features();
|
||||||
|
|
||||||
|
// TODO support TLS
|
||||||
|
//features.getAny().add(new Starttls());
|
||||||
|
|
||||||
|
Mechanisms mechanisms = new Mechanisms();
|
||||||
|
|
||||||
|
// TODO support SASL
|
||||||
|
//mechanisms.getMechanism().add("DIGEST-MD5");
|
||||||
|
//mechanisms.getMechanism().add("PLAIN");
|
||||||
|
features.getAny().add(mechanisms);
|
||||||
|
marshall(features);
|
||||||
|
|
||||||
|
log.debug("Initial stream element sent!");
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,40 +18,25 @@
|
||||||
package org.apache.activemq.transport.xmpp;
|
package org.apache.activemq.transport.xmpp;
|
||||||
|
|
||||||
import junit.framework.TestCase;
|
import junit.framework.TestCase;
|
||||||
|
import junit.textui.TestRunner;
|
||||||
import org.jivesoftware.smack.Chat;
|
import org.jivesoftware.smack.Chat;
|
||||||
import org.jivesoftware.smack.XMPPException;
|
|
||||||
import org.jivesoftware.smack.XMPPConnection;
|
import org.jivesoftware.smack.XMPPConnection;
|
||||||
|
import org.jivesoftware.smack.XMPPException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @version $Revision$
|
* @version $Revision$
|
||||||
*/
|
*/
|
||||||
public class XmppTest extends TestCase {
|
public class XmppTest extends TestCase {
|
||||||
|
|
||||||
|
protected static boolean block = false;
|
||||||
|
|
||||||
private XmppBroker broker = new XmppBroker();
|
private XmppBroker broker = new XmppBroker();
|
||||||
private boolean block = false;
|
|
||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
XmppTest test = new XmppTest();
|
block = true;
|
||||||
test.block = true;
|
TestRunner.run(XmppTest.class);
|
||||||
try {
|
|
||||||
test.setUp();
|
|
||||||
test.testConnect();
|
|
||||||
}
|
|
||||||
catch (Exception e) {
|
|
||||||
System.out.println("Caught: " + e);
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
finally {
|
|
||||||
try {
|
|
||||||
test.tearDown();
|
|
||||||
}
|
|
||||||
catch (Exception e) {
|
|
||||||
System.out.println("Caught: " + e);
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
|
||||||
public void testConnect() throws Exception {
|
public void testConnect() throws Exception {
|
||||||
//ConnectionConfiguration config = new ConnectionConfiguration("localhost", 61222);
|
//ConnectionConfiguration config = new ConnectionConfiguration("localhost", 61222);
|
||||||
//config.setDebuggerEnabled(true);
|
//config.setDebuggerEnabled(true);
|
||||||
|
@ -69,9 +54,14 @@ public class XmppTest extends TestCase {
|
||||||
System.out.println("Sent all messages!");
|
System.out.println("Sent all messages!");
|
||||||
}
|
}
|
||||||
catch (XMPPException e) {
|
catch (XMPPException e) {
|
||||||
|
if (block) {
|
||||||
System.out.println("Caught: " + e);
|
System.out.println("Caught: " + e);
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
|
else {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
if (block) {
|
if (block) {
|
||||||
Thread.sleep(20000);
|
Thread.sleep(20000);
|
||||||
System.out.println("Press any key to quit!: ");
|
System.out.println("Press any key to quit!: ");
|
||||||
|
|
Loading…
Reference in New Issue