git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@912310 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2010-02-21 07:20:47 +00:00
parent baf9301a52
commit 746253c8ef
33 changed files with 519 additions and 414 deletions

View File

@ -22,7 +22,10 @@ import java.io.StringWriter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSException;
@ -96,7 +99,7 @@ public class ProtocolConverter {
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
private final IntSequenceGenerator tempDestinationIdGenerator = new IntSequenceGenerator();
private final Map<Integer, Handler<Response>> resposeHandlers = new ConcurrentHashMap<Integer, Handler<Response>>();
private final Map<Integer, Handler<Response>> responseHandlers = new ConcurrentHashMap<Integer, Handler<Response>>();
private final Map<ConsumerId, Handler<MessageDispatch>> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, Handler<MessageDispatch>>();
private final Map<String, ConsumerInfo> jidToConsumerMap = new HashMap<String, ConsumerInfo>();
private final Map<String, ConsumerInfo> jidToInboxConsumerMap = new HashMap<String, ConsumerInfo>();
@ -106,6 +109,9 @@ public class ProtocolConverter {
private final AtomicBoolean connected = new AtomicBoolean(false);
private ActiveMQTempQueue inboxDestination;
//to avoid calling into sendToActiveMq from a handler
private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
public ProtocolConverter(XmppTransport transport) {
this.transport = transport;
initialiseRegistry();
@ -159,10 +165,10 @@ public class ProtocolConverter {
}
}
public void onActiveMQCommad(Command command) throws Exception {
public void onActiveMQCommand(Command command) throws Exception {
if (command.isResponse()) {
Response response = (Response)command;
Handler<Response> handler = resposeHandlers.remove(new Integer(response.getCorrelationId()));
Handler<Response> handler = responseHandlers.remove(new Integer(response.getCorrelationId()));
if (handler != null) {
handler.handle(response);
} else {
@ -230,7 +236,7 @@ public class ProtocolConverter {
}
}
protected void onAuthQuery(Object any, final Iq iq) throws IOException {
protected void onAuthQuery(Object any, final Iq iq) throws IOException, JMSException {
Query query = (Query)any;
if (LOG.isDebugEnabled()) {
LOG.debug("Iq Auth Query " + debugString(iq) + " resource: " + query.getResource() + " username: " + query.getUsername());
@ -281,10 +287,38 @@ public class ProtocolConverter {
sendToActiveMQ(producerInfo, createErrorHandler("create producer"));
}
});
// create a destination for this client
final String to = query.getUsername();
createDestination(to);
}
public void createDestination(String to) throws IOException, JMSException {
ActiveMQDestination destination = createActiveMQDestination(to);
if (destination == null) {
LOG.debug("Unable to create destination for " + to);
return;
}
subscribe(to, destination, jidToConsumerMap);
// lets subscribe to a personal inbox for replies
// Check if Destination info is of temporary type.
if (inboxDestination == null) {
inboxDestination = new ActiveMQTempQueue(connectionInfo.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
DestinationInfo info = new DestinationInfo();
info.setConnectionId(connectionInfo.getConnectionId());
info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
info.setDestination(inboxDestination);
sendToActiveMQ(info, null);
subscribe(to, inboxDestination, jidToInboxConsumerMap);
}
}
protected String debugString(Iq iq) {
return " to: " + iq.getTo() + " type: " + iq.getType() + " from: " + iq.getFrom() + " id: " + iq.getId();
return "to: " + iq.getTo() + " type: " + iq.getType() + " from: " + iq.getFrom() + " id: " + iq.getId();
}
protected void onDiscoItems(Iq iq, org.jabber.protocol.disco_items.Query query) throws IOException {
@ -367,30 +401,13 @@ public class ProtocolConverter {
* sendPresence(presence, item);
*/
// lets create a subscription
final String to = presence.getTo();
ActiveMQDestination destination = createActiveMQDestination(to);
if (destination == null) {
LOG.debug("No 'to' attribute specified for presence so not creating a JMS subscription");
return;
}
subscribe(to, destination, jidToConsumerMap);
// lets subscribe to a personal inbox for replies
// Check if Destination info is of temporary type.
if (inboxDestination == null) {
inboxDestination = new ActiveMQTempQueue(connectionInfo.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
DestinationInfo info = new DestinationInfo();
info.setConnectionId(connectionInfo.getConnectionId());
info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
info.setDestination(inboxDestination);
sendToActiveMQ(info, null);
subscribe(to, inboxDestination, jidToInboxConsumerMap);
// lets create a subscription for the room, Jabber clients would use
// "room/nickname", so we need to strip off the nickname
String to = presence.getTo();
if ( to != null ) {
to = to.substring(0, to.indexOf("/"));
}
createDestination(to);
}
protected void subscribe(final String to, ActiveMQDestination destination, Map<String, ConsumerInfo> consumerMap) {
@ -416,15 +433,23 @@ public class ProtocolConverter {
consumerInfo.setDestination(destination);
subscriptionsByConsumerId.put(consumerInfo.getConsumerId(), new Handler<MessageDispatch>() {
public void handle(MessageDispatch messageDispatch) throws Exception {
public void handle(final MessageDispatch messageDispatch) throws Exception {
// processing the inbound message
if (LOG.isDebugEnabled()) {
LOG.debug("Receiving inbound: " + messageDispatch.getMessage());
}
// lets send back an ACK
MessageAck ack = new MessageAck(messageDispatch, MessageAck.STANDARD_ACK_TYPE, 1);
sendToActiveMQ(ack, createErrorHandler("Ack of message: " + messageDispatch.getMessage().getMessageId()));
final MessageAck ack = new MessageAck(messageDispatch, MessageAck.STANDARD_ACK_TYPE, 1);
FutureTask<Void> task = new FutureTask<Void>(new Callable<Void>() {
public Void call() {
sendToActiveMQ(ack, createErrorHandler("Ack of message: " + messageDispatch.getMessage().getMessageId()));
return null;
}
});
scheduledThreadPoolExecutor.submit(task);
Message message = createXmppMessage(to, messageDispatch);
if (message != null) {
@ -438,18 +463,26 @@ public class ProtocolConverter {
sendToActiveMQ(consumerInfo, createErrorHandler("subscribe to destination: " + destination));
}
protected Message createXmppMessage(String to, MessageDispatch messageDispatch) throws JMSException {
protected Message createXmppMessage(String to, MessageDispatch messageDispatch) throws IOException, JMSException {
org.apache.activemq.command.Message message = messageDispatch.getMessage();
Message answer = new Message();
answer.setType("groupchat");
String from = to;
int idx = from.indexOf('/');
if (idx > 0) {
from = from.substring(0, idx) + "/broker";
String from = (String)message.getProperty("XMPPFrom");
if ( from == null ) {
from = to;
int idx = from.indexOf('/');
if (idx > 0) {
from = from.substring(0, idx) + "/broker";
}
answer.setType("groupchat");
} else {
answer.setType("chat");
}
LOG.debug("Sending message from " + from + " and to " + to);
answer.setFrom(from);
answer.setTo(to);
org.apache.activemq.command.Message message = messageDispatch.getMessage();
// answer.setType(message.getType());
if (message instanceof ActiveMQTextMessage) {
ActiveMQTextMessage activeMQTextMessage = (ActiveMQTextMessage)message;
@ -515,7 +548,7 @@ public class ProtocolConverter {
command.setCommandId(generateCommandId());
if (handler != null) {
command.setResponseRequired(true);
resposeHandlers.put(command.getCommandId(), handler);
responseHandlers.put(command.getCommandId(), handler);
}
transport.getTransportListener().onCommand(command);
}
@ -578,9 +611,6 @@ public class ProtocolConverter {
if (idx > 0) {
name = name.substring(0, idx);
}
System.out.println("#### Creating ActiveMQ destination for: " + name);
// lets support lower-case versions of the agent topic
if (name.equalsIgnoreCase(AdvisorySupport.AGENT_TOPIC)) {
name = AdvisorySupport.AGENT_TOPIC;
@ -613,7 +643,7 @@ public class ProtocolConverter {
if (replyTo == null) {
replyTo = inboxDestination;
}
System.out.println("Setting reply to destination to: " + replyTo);
LOG.info("Setting reply to destination to: " + replyTo);
answer.setJMSReplyTo(replyTo);
}

View File

@ -64,7 +64,7 @@ public class XmppTransport extends TcpTransport {
protected OutputStream outputStream;
protected InputStream inputStream;
private JAXBContext context;
private static JAXBContext context;
private XMLEventReader xmlReader;
private Unmarshaller unmarshaller;
private Marshaller marshaller;
@ -85,6 +85,7 @@ public class XmppTransport extends TcpTransport {
}
private void init() {
LOG.debug("Creating new instance of XmppTransport");
converter = new ProtocolConverter(this);
}
@ -105,7 +106,7 @@ public class XmppTransport extends TcpTransport {
}
} else {
try {
converter.onActiveMQCommad(command);
converter.onActiveMQCommand(command);
} catch (IOException e) {
throw e;
} catch (Exception e) {
@ -170,6 +171,7 @@ public class XmppTransport extends TcpTransport {
// unmarshal a new object
Object object = unmarshaller.unmarshal(xmlReader);
if (object != null) {
LOG.debug("Unmarshalled new incoming event - " + object.getClass().getName());
converter.onXmppCommand(object);
}
} else {
@ -218,19 +220,38 @@ public class XmppTransport extends TcpTransport {
@Override
protected void initializeStreams() throws Exception {
// TODO it would be preferable to use class discovery here!
context = JAXBContext.newInstance("jabber.client"
/*
* + ":jabber.server" + ":jabber.iq.gateway" + ":jabber.iq.last" +
* ":jabber.iq.oob" + ":jabber.iq.pass" + ":jabber.iq.time" +
* ":jabber.iq.version" + ":org.jabber.protocol.activity" +
* ":org.jabber.protocol.address" + ":org.jabber.protocol.amp" +
* ":org.jabber.protocol.amp_errors" + ":org.jabber.protocol.muc_admin" +
* ":org.jabber.protocol.muc_unique"
*/
+ ":jabber.iq._private" + ":jabber.iq.auth" + ":jabber.iq.roster" + ":org.jabber.etherx.streams" + ":org.jabber.protocol.disco_info" + ":org.jabber.protocol.disco_items"
+ ":org.jabber.protocol.muc" + ":org.jabber.protocol.muc_user" + ":ietf.params.xml.ns.xmpp_sasl" + ":ietf.params.xml.ns.xmpp_stanzas"
+ ":ietf.params.xml.ns.xmpp_streams" + ":ietf.params.xml.ns.xmpp_tls");
if ( context == null ) {
context = JAXBContext.newInstance(
"jabber.server:" +
"jabber.server.dialback:" +
"jabber.client:" +
"jabber.iq._private:" +
"jabber.iq.auth:" +
"jabber.iq.gateway:" +
"jabber.iq.version:" +
"jabber.iq.roster:" +
"jabber.iq.pass:" +
"jabber.iq.last:" +
"jabber.iq.oob:" +
"jabber.iq.time:" +
"storage.rosternotes:" +
"ietf.params.xml.ns.xmpp_streams:" +
"ietf.params.xml.ns.xmpp_sasl:" +
"ietf.params.xml.ns.xmpp_stanzas:" +
"ietf.params.xml.ns.xmpp_bind:" +
"ietf.params.xml.ns.xmpp_tls:" +
"org.jabber.protocol.muc:" +
"org.jabber.protocol.rosterx:" +
"org.jabber.protocol.disco_info:" +
"org.jabber.protocol.disco_items:" +
"org.jabber.protocol.activity:" +
"org.jabber.protocol.amp_errors:" +
"org.jabber.protocol.amp:" +
"org.jabber.protocol.address:" +
"org.jabber.protocol.muc_user:" +
"org.jabber.protocol.muc_admin:" +
"org.jabber.etherx.streams");
}
inputStream = new TcpBufferedInputStream(socket.getInputStream(), 8 * 1024);
outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), 16 * 1024);
@ -241,11 +262,10 @@ public class XmppTransport extends TcpTransport {
protected void writeOpenStream(String id, String from) throws IOException, XMLStreamException {
LOG.debug("Sending initial stream element");
XMLOutputFactory factory = XMLOutputFactory.newInstance();
// factory.setProperty(XMLOutputFactory.IS_REPAIRING_NAMESPACES, true);
xmlWriter = factory.createXMLStreamWriter(outputStream);
// write the dummy start tag
xmlWriter.writeStartDocument();
xmlWriter.writeStartElement("stream", "stream", "http://etherx.jabber.org/streams");
xmlWriter.writeDefaultNamespace("jabber:client");
@ -264,12 +284,14 @@ public class XmppTransport extends TcpTransport {
// TODO support TLS
// features.getAny().add(new Starttls());
Mechanisms mechanisms = new Mechanisms();
//Mechanisms mechanisms = new Mechanisms();
// TODO support SASL
// mechanisms.getMechanism().add("DIGEST-MD5");
// mechanisms.getMechanism().add("PLAIN");
features.getAny().add(mechanisms);
//features.getAny().add(mechanisms);
features.getAny().add(new ietf.params.xml.ns.xmpp_bind.ObjectFactory().createBind());
features.getAny().add(new ietf.params.xml.ns.xmpp_session.ObjectFactory().createSession(""));
marshall(features);
LOG.debug("Initial stream element sent!");

View File

@ -1,11 +1,4 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
copyright (c) 1999-2006 The Jabber Software Foundation
http://www.xmpp.org/about/copyright.shtml
This XSD is licensed under the Creative Commons License 2.5
http://creativecommons.org/licenses/by/2.5/
-->
<xs:schema
xmlns:xs='http://www.w3.org/2001/XMLSchema'
@ -16,7 +9,7 @@
<xs:annotation>
<xs:documentation>
The protocol documented by this schema is defined in
JEP-0108: http://www.jabber.org/jeps/jep-0108.html
XEP-0108: http://www.xmpp.org/extensions/xep-0108.html
</xs:documentation>
</xs:annotation>

View File

@ -1,11 +1,4 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
copyright (c) 1999-2006 The Jabber Software Foundation
http://www.xmpp.org/about/copyright.shtml
This XSD is licensed under the Creative Commons License 2.5
http://creativecommons.org/licenses/by/2.5/
-->
<xs:schema
xmlns:xs='http://www.w3.org/2001/XMLSchema'
@ -16,7 +9,7 @@
<xs:annotation>
<xs:documentation>
The protocol documented by this schema is defined in
JEP-0033: http://www.jabber.org/jeps/jep-0033.html
XEP-0033: http://www.xmpp.org/extensions/xep-0033.html
</xs:documentation>
</xs:annotation>

View File

@ -1,11 +1,4 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
copyright (c) 1999-2006 The Jabber Software Foundation
http://www.xmpp.org/about/copyright.shtml
This XSD is licensed under the Creative Commons License 2.5
http://creativecommons.org/licenses/by/2.5/
-->
<xs:schema
xmlns:xs='http://www.w3.org/2001/XMLSchema'
@ -16,7 +9,7 @@
<xs:annotation>
<xs:documentation>
The protocol documented by this schema is defined in
JEP-0079: http://www.jabber.org/jeps/jep-0079.html
XEP-0079: http://www.xmpp.org/extensions/xep-0079.html
</xs:documentation>
</xs:annotation>

View File

@ -1,11 +1,4 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
copyright (c) 1999-2006 The Jabber Software Foundation
http://www.xmpp.org/about/copyright.shtml
This XSD is licensed under the Creative Commons License 2.5
http://creativecommons.org/licenses/by/2.5/
-->
<xs:schema
xmlns:xs='http://www.w3.org/2001/XMLSchema'
@ -16,7 +9,7 @@
<xs:annotation>
<xs:documentation>
The protocol documented by this schema is defined in
JEP-0079: http://www.jabber.org/jeps/jep-0079.html
XEP-0079: http://www.xmpp.org/extensions/xep-0079.html
</xs:documentation>
</xs:annotation>

View File

@ -1,11 +1,4 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
copyright (c) 1999-2006 The Jabber Software Foundation
http://www.xmpp.org/about/copyright.shtml
This XSD is licensed under the Creative Commons License 2.5
http://creativecommons.org/licenses/by/2.5/
-->
<xs:schema
xmlns:xs='http://www.w3.org/2001/XMLSchema'
@ -16,7 +9,7 @@
<xs:annotation>
<xs:documentation>
The protocol documented by this schema is defined in
JEP-0079: http://www.jabber.org/jeps/jep-0079.html
XEP-0079: http://www.xmpp.org/extensions/xep-0079.html
</xs:documentation>
</xs:annotation>
@ -25,10 +18,10 @@
<xs:sequence>
<xs:element ref='rule' minOccurs='1' maxOccurs='unbounded'/>
</xs:sequence>
<xs:attribute name='from' usage='optional' type='xs:string'/>
<xs:attribute name='from' use='optional' type='xs:string'/>
<xs:attribute name='per-hop' use='optional' type='xs:boolean' default='false'/>
<xs:attribute name='status' usage='optional' type='xs:NCName'/>
<xs:attribute name='to' usage='optional' type='xs:string'/>
<xs:attribute name='status' use='optional' type='xs:NCName'/>
<xs:attribute name='to' use='optional' type='xs:string'/>
</xs:complexType>
</xs:element>

View File

@ -0,0 +1,32 @@
<?xml version='1.0' encoding='UTF-8'?>
<xs:schema
xmlns:xs='http://www.w3.org/2001/XMLSchema'
targetNamespace='urn:ietf:params:xml:ns:xmpp-bind'
xmlns='urn:ietf:params:xml:ns:xmpp-bind'
elementFormDefault='qualified'>
<xs:element name='bind'>
<xs:complexType>
<xs:choice minOccurs='0' maxOccurs='1'>
<xs:element name='resource' type='resourceType'/>
<xs:element name='jid' type='fullJIDType'/>
</xs:choice>
</xs:complexType>
</xs:element>
<xs:simpleType name='resourceType'>
<xs:restriction base='xs:string'>
<xs:minLength value='1'/>
<xs:maxLength value='1023'/>
</xs:restriction>
</xs:simpleType>
<xs:simpleType name='fullJIDType'>
<xs:restriction base='xs:string'>
<xs:minLength value='8'/>
<xs:maxLength value='3071'/>
</xs:restriction>
</xs:simpleType>
</xs:schema>

View File

@ -1,11 +1,4 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
copyright (c) 1999-2006 The Jabber Software Foundation
http://www.xmpp.org/about/copyright.shtml
This XSD is licensed under the Creative Commons License 2.5
http://creativecommons.org/licenses/by/2.5/
-->
<xs:schema
xmlns:xs='http://www.w3.org/2001/XMLSchema'

View File

@ -1,11 +1,4 @@
<?xml version='1.0' encoding='UTF-8' ?>
<!--
copyright (c) 1999-2006 The Jabber Software Foundation
http://www.xmpp.org/about/copyright.shtml
This XSD is licensed under the Creative Commons License 2.5
http://creativecommons.org/licenses/by/2.5/
-->
<xs:schema
xmlns:xs='http://www.w3.org/2001/XMLSchema'
@ -16,7 +9,7 @@
<xs:annotation>
<xs:documentation>
The protocol documented by this schema is defined in
JEP-0030: http://www.jabber.org/jeps/jep-0030.html
XEP-0030: http://www.xmpp.org/extensions/xep-0030.html
</xs:documentation>
</xs:annotation>

View File

@ -1,11 +1,4 @@
<?xml version='1.0' encoding='UTF-8' ?>
<!--
copyright (c) 1999-2006 The Jabber Software Foundation
http://www.xmpp.org/about/copyright.shtml
This XSD is licensed under the Creative Commons License 2.5
http://creativecommons.org/licenses/by/2.5/
-->
<xs:schema
xmlns:xs='http://www.w3.org/2001/XMLSchema'
@ -16,7 +9,7 @@
<xs:annotation>
<xs:documentation>
The protocol documented by this schema is defined in
JEP-0030: http://www.jabber.org/jeps/jep-0030.html
XEP-0030: http://www.xmpp.org/extensions/xep-0030.html
</xs:documentation>
</xs:annotation>

View File

@ -1,11 +1,4 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
copyright (c) 1999-2006 The Jabber Software Foundation
http://www.xmpp.org/about/copyright.shtml
This XSD is licensed under the Creative Commons License 2.5
http://creativecommons.org/licenses/by/2.5/
-->
<xs:schema
xmlns:xs='http://www.w3.org/2001/XMLSchema'
@ -15,8 +8,14 @@
<xs:annotation>
<xs:documentation>
The protocol documented by this schema is defined in
JEP-0078: http://www.jabber.org/jeps/jep-0078.html
NOTE WELL: Non-SASL Authentication via the jabber:iq:auth
protocol has been superseded by SASL Authentication as
defined in RFC 3920, and is now obsolete.
For historical purposes, the protocol documented by this
schema is defined in XEP-0078:
http://www.xmpp.org/extensions/xep-0078.html
</xs:documentation>
</xs:annotation>

View File

@ -1,11 +1,4 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
copyright (c) 1999-2006 The Jabber Software Foundation
http://www.xmpp.org/about/copyright.shtml
This XSD is licensed under the Creative Commons License 2.5
http://creativecommons.org/licenses/by/2.5/
-->
<xs:schema
xmlns:xs='http://www.w3.org/2001/XMLSchema'
@ -16,7 +9,7 @@
<xs:annotation>
<xs:documentation>
The protocol documented by this schema is defined in
JEP-0100: http://www.jabber.org/jeps/jep-0100.html
XEP-0100: http://www.xmpp.org/extensions/xep-0100.html
</xs:documentation>
</xs:annotation>

View File

@ -1,11 +1,4 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
copyright (c) 1999-2006 The Jabber Software Foundation
http://www.xmpp.org/about/copyright.shtml
This XSD is licensed under the Creative Commons License 2.5
http://creativecommons.org/licenses/by/2.5/
-->
<xs:schema
xmlns:xs='http://www.w3.org/2001/XMLSchema'
@ -16,7 +9,7 @@
<xs:annotation>
<xs:documentation>
The protocol documented by this schema is defined in
JEP-0012: http://www.jabber.org/jeps/jep-0012.html
XEP-0012: http://www.xmpp.org/extensions/xep-0012.html
</xs:documentation>
</xs:annotation>

View File

@ -1,11 +1,4 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
copyright (c) 1999-2006 The Jabber Software Foundation
http://www.xmpp.org/about/copyright.shtml
This XSD is licensed under the Creative Commons License 2.5
http://creativecommons.org/licenses/by/2.5/
-->
<xs:schema
xmlns:xs='http://www.w3.org/2001/XMLSchema'
@ -16,7 +9,7 @@
<xs:annotation>
<xs:documentation>
The protocol documented by this schema is defined in
JEP-0066: http://www.jabber.org/jeps/jep-0066.html
XEP-0066: http://www.xmpp.org/extensions/xep-0066.html
</xs:documentation>
</xs:annotation>

View File

@ -1,11 +1,4 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
copyright (c) 1999-2006 The Jabber Software Foundation
http://www.xmpp.org/about/copyright.shtml
This XSD is licensed under the Creative Commons License 2.5
http://creativecommons.org/licenses/by/2.5/
-->
<xs:schema
xmlns:xs='http://www.w3.org/2001/XMLSchema'
@ -16,7 +9,7 @@
<xs:annotation>
<xs:documentation>
The protocol documented by this schema is defined in
JEP-0003: http://www.jabber.org/jeps/jep-0003.html
XEP-0003: http://www.xmpp.org/extensions/xep-0003.html
</xs:documentation>
</xs:annotation>

View File

@ -1,11 +1,4 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
copyright (c) 1999-2006 The Jabber Software Foundation
http://www.xmpp.org/about/copyright.shtml
This XSD is licensed under the Creative Commons License 2.5
http://creativecommons.org/licenses/by/2.5/
-->
<xs:schema
xmlns:xs='http://www.w3.org/2001/XMLSchema'
@ -16,7 +9,7 @@
<xs:annotation>
<xs:documentation>
The protocol documented by this schema is defined in
JEP-0049: http://www.jabber.org/jeps/jep-0049.html
XEP-0049: http://www.xmpp.org/extensions/xep-0049.html
</xs:documentation>
</xs:annotation>

View File

@ -1,11 +1,4 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
copyright (c) 1999-2006 The Jabber Software Foundation
http://www.xmpp.org/about/copyright.shtml
This XSD is licensed under the Creative Commons License 2.5
http://creativecommons.org/licenses/by/2.5/
-->
<xs:schema
xmlns:xs='http://www.w3.org/2001/XMLSchema'
@ -16,7 +9,11 @@
<xs:annotation>
<xs:documentation>
The protocol documented by this schema is defined in
JEP-0090: http://www.jabber.org/jeps/jep-0090.html
XEP-0090: http://www.xmpp.org/extensions/xep-0090.html
NOTE: This protocol has been deprecated in favor of the
Entity Time protocol specified in XEP-0202:
http://www.xmpp.org/extensions/xep-0202.html
</xs:documentation>
</xs:annotation>

View File

@ -1,11 +1,4 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
copyright (c) 1999-2006 The Jabber Software Foundation
http://www.xmpp.org/about/copyright.shtml
This XSD is licensed under the Creative Commons License 2.5
http://creativecommons.org/licenses/by/2.5/
-->
<xs:schema
xmlns:xs='http://www.w3.org/2001/XMLSchema'
@ -16,7 +9,7 @@
<xs:annotation>
<xs:documentation>
The protocol documented by this schema is defined in
JEP-0092: http://www.jabber.org/jeps/jep-0092.html
XEP-0092: http://www.xmpp.org/extensions/xep-0092.html
</xs:documentation>
</xs:annotation>

View File

@ -1,11 +1,4 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
copyright (c) 1999-2006 The Jabber Software Foundation
http://www.xmpp.org/about/copyright.shtml
This XSD is licensed under the Creative Commons License 2.5
http://creativecommons.org/licenses/by/2.5/
-->
<xs:schema
xmlns:xs='http://www.w3.org/2001/XMLSchema'
@ -16,6 +9,9 @@
<xs:import namespace='urn:ietf:params:xml:ns:xmpp-stanzas'
schemaLocation='stanzaerror.xsd'/>
<xs:import namespace='http://www.w3.org/XML/1998/namespace'
schemaLocation='xml.xsd'/>
<xs:element name='message'>
<xs:complexType>
<xs:sequence>
@ -74,18 +70,7 @@
</xs:complexType>
</xs:element>
<xs:element name='thread'>
<xs:complexType>
<xs:simpleContent>
<xs:extension base='xs:NMTOKEN'/>
</xs:simpleContent>
</xs:complexType>
</xs:element>
<!--
<xs:element name='thread' type='xs:NMTOKEN'/>
-->
<xs:element name='presence'>
<xs:complexType>

View File

@ -1,11 +1,4 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
copyright (c) 1999-2006 The Jabber Software Foundation
http://www.xmpp.org/about/copyright.shtml
This XSD is licensed under the Creative Commons License 2.5
http://creativecommons.org/licenses/by/2.5/
-->
<xs:schema
xmlns:xs='http://www.w3.org/2001/XMLSchema'
@ -16,6 +9,9 @@
<xs:import namespace='urn:ietf:params:xml:ns:xmpp-stanzas'
schemaLocation='stanzaerror.xsd'/>
<xs:import namespace='http://www.w3.org/XML/1998/namespace'
schemaLocation='xml.xsd'/>
<xs:element name='message'>
<xs:complexType>
<xs:sequence>

View File

@ -1,11 +1,4 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
copyright (c) 1999-2006 The Jabber Software Foundation
http://www.xmpp.org/about/copyright.shtml
This XSD is licensed under the Creative Commons License 2.5
http://creativecommons.org/licenses/by/2.5/
-->
<xs:schema
xmlns:xs='http://www.w3.org/2001/XMLSchema'
@ -16,7 +9,7 @@
<xs:annotation>
<xs:documentation>
The protocol documented by this schema is defined in
JEP-0045: http://www.jabber.org/jeps/jep-0045.html
XEP-0045: http://www.xmpp.org/extensions/xep-0045.html
</xs:documentation>
</xs:annotation>

View File

@ -1,11 +1,4 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
copyright (c) 1999-2006 The Jabber Software Foundation
http://www.xmpp.org/about/copyright.shtml
This XSD is licensed under the Creative Commons License 2.5
http://creativecommons.org/licenses/by/2.5/
-->
<xs:schema
xmlns:xs='http://www.w3.org/2001/XMLSchema'

View File

@ -1,11 +1,4 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
copyright (c) 1999-2006 The Jabber Software Foundation
http://www.xmpp.org/about/copyright.shtml
This XSD is licensed under the Creative Commons License 2.5
http://creativecommons.org/licenses/by/2.5/
-->
<xs:schema
xmlns:xs='http://www.w3.org/2001/XMLSchema'
@ -16,7 +9,7 @@
<xs:annotation>
<xs:documentation>
The protocol documented by this schema is defined in
JEP-0045: http://www.jabber.org/jeps/jep-0045.html
XEP-0045: http://www.xmpp.org/extensions/xep-0045.html
</xs:documentation>
</xs:annotation>
@ -28,7 +21,7 @@
<xs:element ref='invite' minOccurs='0' maxOccurs='unbounded'/>
<xs:element ref='item' minOccurs='0'/>
<xs:element name='password' type='xs:string' minOccurs='0'/>
<xs:element ref='status' minOccurs='0'/>
<xs:element ref='status' minOccurs='0' maxOccurs='unbounded'/>
</xs:choice>
</xs:complexType>
</xs:element>
@ -110,7 +103,8 @@
<xs:attribute name='code' use='required'>
<xs:simpleType>
<xs:restriction base='xs:int'>
<xs:length value='3'/>
<xs:minInclusive value='100'/>
<xs:maxInclusive value='999'/>
</xs:restriction>
</xs:simpleType>
</xs:attribute>

View File

@ -1,11 +1,4 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
copyright (c) 1999-2006 The Jabber Software Foundation
http://www.xmpp.org/about/copyright.shtml
This XSD is licensed under the Creative Commons License 2.5
http://creativecommons.org/licenses/by/2.5/
-->
<xs:schema
xmlns:xs='http://www.w3.org/2001/XMLSchema'
@ -16,7 +9,7 @@
<xs:annotation>
<xs:documentation>
The protocol documented by this schema is defined in
JEP-0045: http://www.jabber.org/jeps/jep-0045.html
XEP-0045: http://www.xmpp.org/extensions/xep-0045.html
</xs:documentation>
</xs:annotation>

View File

@ -1,11 +1,4 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
copyright (c) 1999-2006 The Jabber Software Foundation
http://www.xmpp.org/about/copyright.shtml
This XSD is licensed under the Creative Commons License 2.5
http://creativecommons.org/licenses/by/2.5/
-->
<xs:schema
xmlns:xs='http://www.w3.org/2001/XMLSchema'

View File

@ -1,11 +1,4 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
copyright (c) 1999-2006 The Jabber Software Foundation
http://www.xmpp.org/about/copyright.shtml
This XSD is licensed under the Creative Commons License 2.5
http://creativecommons.org/licenses/by/2.5/
-->
<xs:schema
xmlns:xs='http://www.w3.org/2001/XMLSchema'
@ -16,7 +9,7 @@
<xs:annotation>
<xs:documentation>
The protocol documented by this schema is defined in
JEP-0045: http://www.jabber.org/jeps/jep-0045.html
XEP-0145: http://www.xmpp.org/extensions/xep-0145.html
</xs:documentation>
</xs:annotation>

View File

@ -1,11 +1,4 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
copyright (c) 1999-2006 The Jabber Software Foundation
http://www.xmpp.org/about/copyright.shtml
This XSD is licensed under the Creative Commons License 2.5
http://creativecommons.org/licenses/by/2.5/
-->
<xs:schema
xmlns:xs='http://www.w3.org/2001/XMLSchema'
@ -16,7 +9,7 @@
<xs:annotation>
<xs:documentation>
The protocol documented by this schema is defined in
JEP-0144: http://www.jabber.org/jeps/jep-0144.html
XEP-0144: http://www.xmpp.org/extensions/xep-0144.html
</xs:documentation>
</xs:annotation>
@ -33,9 +26,9 @@
<xs:sequence>
<xs:element name='group' type='xs:string' minOccurs='0' maxOccurs='unbounded'/>
</xs:sequence>
<xs:attribute name='action' use='optional'>
<xs:attribute name='action' use='optional' default='add'>
<xs:simpleType>
<xs:restriction base='xs:NCName' default='add'>
<xs:restriction base='xs:NCName'>
<xs:enumeration value='add'/>
<xs:enumeration value='delete'/>
<xs:enumeration value='modify'/>

View File

@ -1,11 +1,4 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
copyright (c) 1999-2006 The Jabber Software Foundation
http://www.xmpp.org/about/copyright.shtml
This XSD is licensed under the Creative Commons License 2.5
http://creativecommons.org/licenses/by/2.5/
-->
<xs:schema
xmlns:xs='http://www.w3.org/2001/XMLSchema'

View File

@ -1,11 +1,4 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
copyright (c) 1999-2006 The Jabber Software Foundation
http://www.xmpp.org/about/copyright.shtml
This XSD is licensed under the Creative Commons License 2.5
http://creativecommons.org/licenses/by/2.5/
-->
<xs:schema
xmlns:xs='http://www.w3.org/2001/XMLSchema'
@ -13,6 +6,9 @@
xmlns='urn:ietf:params:xml:ns:xmpp-stanzas'
elementFormDefault='qualified'>
<xs:import namespace='http://www.w3.org/XML/1998/namespace'
schemaLocation='xml.xsd'/>
<xs:element name='bad-request' type='empty'/>
<xs:element name='conflict' type='empty'/>
<xs:element name='feature-not-implemented' type='empty'/>

View File

@ -1,11 +1,4 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
copyright (c) 1999-2006 The Jabber Software Foundation
http://www.xmpp.org/about/copyright.shtml
This XSD is licensed under the Creative Commons License 2.5
http://creativecommons.org/licenses/by/2.5/
-->
<xs:schema
xmlns:xs='http://www.w3.org/2001/XMLSchema'
@ -13,6 +6,9 @@
xmlns='urn:ietf:params:xml:ns:xmpp-streams'
elementFormDefault='qualified'>
<xs:import namespace='http://www.w3.org/XML/1998/namespace'
schemaLocation='xml.xsd'/>
<xs:element name='bad-format' type='empty'/>
<xs:element name='bad-namespace-prefix' type='empty'/>
<xs:element name='conflict' type='empty'/>

View File

@ -1,125 +1,103 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
copyright (c) 1999-2006 The Jabber Software Foundation
http://www.xmpp.org/about/copyright.shtml
This XSD is licensed under the Creative Commons License 2.5
http://creativecommons.org/licenses/by/2.5/
-->
<xs:schema
xmlns:xs='http://www.w3.org/2001/XMLSchema'
xmlns:jaxb="http://java.sun.com/xml/ns/jaxb"
targetNamespace='http://etherx.jabber.org/streams'
xmlns='http://etherx.jabber.org/streams'
elementFormDefault='unqualified'
jaxb:version="2.0">
xmlns:xs='http://www.w3.org/2001/XMLSchema'
xmlns:jaxb='http://java.sun.com/xml/ns/jaxb'
targetNamespace='http://etherx.jabber.org/streams'
xmlns='http://etherx.jabber.org/streams'
elementFormDefault='unqualified'
jaxb:version="2.0">
<xs:import namespace='jabber:client'
schemaLocation='jabber-client.xsd'/>
<xs:import namespace='jabber:server'
schemaLocation='jabber-server.xsd'/>
<xs:import namespace='jabber:server:dialback'
schemaLocation='dialback.xsd'/>
<xs:import namespace='urn:ietf:params:xml:ns:xmpp-streams'
schemaLocation='streamerror.xsd'/>
<xs:import namespace='urn:ietf:params:xml:ns:xmpp-tls'
schemaLocation='tls.xsd'/>
<xs:import namespace='urn:ietf:params:xml:ns:xmpp-sasl'
schemaLocation='sasl.xsd'/>
<xs:import namespace='jabber:client'
schemaLocation='jabber-client.xsd'/>
<xs:element name='stream'>
<xs:complexType>
<xs:sequence xmlns:client='jabber:client'
xmlns:server='jabber:server'
xmlns:db='jabber:server:dialback'>
<xs:element ref='features' minOccurs='0' maxOccurs='1'/>
<xs:any jaxb:property="" namespace='urn:ietf:params:xml:ns:xmpp-tls'
minOccurs='0'
maxOccurs='unbounded'>
<xs:import namespace='jabber:server'
schemaLocation='jabber-server.xsd'/>
<xs:annotation>
<xs:appinfo>
<jaxb:property name="tls"/>
</xs:appinfo>
</xs:annotation>
</xs:any>
<xs:any namespace='urn:ietf:params:xml:ns:xmpp-sasl'
minOccurs='0'
maxOccurs='unbounded'>
<xs:import namespace='jabber:server:dialback'
schemaLocation='dialback.xsd'/>
<xs:annotation>
<xs:appinfo>
<jaxb:property name="sasl"/>
</xs:appinfo>
</xs:annotation>
</xs:any>
<xs:import namespace='urn:ietf:params:xml:ns:xmpp-streams'
schemaLocation='streamerror.xsd'/>
<!--
<xs:choice minOccurs='0' maxOccurs='1'>
<xs:choice minOccurs='0' maxOccurs='unbounded'>
<xs:element ref='client:message'/>
<xs:element ref='client:presence'/>
<xs:element ref='client:iq'/>
<xs:import namespace='http://www.w3.org/XML/1998/namespace'
schemaLocation='xml.xsd'/>
<xs:annotation>
<xs:appinfo>
<jaxb:property name="clientMessages"/>
</xs:appinfo>
</xs:annotation>
</xs:choice>
<xs:choice minOccurs='0' maxOccurs='unbounded'>
<xs:element ref='server:message'/>
<xs:element ref='server:presence'/>
<xs:element ref='server:iq'/>
<xs:element ref='db:result'/>
<xs:element ref='db:verify'/>
<xs:element name='stream'>
<xs:complexType>
<xs:sequence xmlns:client='jabber:client'
xmlns:server='jabber:server'
xmlns:db='jabber:server:dialback'>
<xs:element ref='features' minOccurs='0' maxOccurs='1'/>
<xs:any namespace='urn:ietf:params:xml:ns:xmpp-tls'
minOccurs='0'
maxOccurs='unbounded'>
<xs:annotation>
<xs:appinfo>
<jaxb:property name="xmppTls"/>
</xs:appinfo>
</xs:annotation>
</xs:any>
<xs:any namespace='urn:ietf:params:xml:ns:xmpp-sasl'
minOccurs='0'
maxOccurs='unbounded'>
<xs:annotation>
<xs:appinfo>
<jaxb:property name="xmppSasl"/>
</xs:appinfo>
</xs:annotation>
</xs:any>
<xs:choice minOccurs='0' maxOccurs='1'>
<xs:choice minOccurs='0' maxOccurs='unbounded'>
<xs:annotation>
<xs:appinfo>
<jaxb:property name="clientMessages"/>
</xs:appinfo>
</xs:annotation>
<xs:element ref='client:message'/>
<xs:element ref='client:presence'/>
<xs:element ref='client:iq'/>
</xs:choice>
<xs:choice minOccurs='0' maxOccurs='unbounded'>
<xs:annotation>
<xs:appinfo>
<jaxb:property name="serverMessages"/>
</xs:appinfo>
</xs:annotation>
<xs:element ref='server:message'/>
<xs:element ref='server:presence'/>
<xs:element ref='server:iq'/>
<xs:element ref='db:result'/>
<xs:element ref='db:verify'/>
</xs:choice>
</xs:choice>
<xs:element ref='error' minOccurs='0' maxOccurs='1'/>
</xs:sequence>
<xs:attribute name='from' type='xs:string' use='optional'/>
<xs:attribute name='id' type='xs:NMTOKEN' use='optional'/>
<xs:attribute name='to' type='xs:string' use='optional'/>
<xs:attribute name='version' type='xs:decimal' use='optional'/>
<xs:attribute ref='xml:lang' use='optional'/>
</xs:complexType>
</xs:element>
<xs:annotation>
<xs:appinfo>
<jaxb:property name="serverMessages"/>
</xs:appinfo>
</xs:annotation>
</xs:choice>
</xs:choice>
-->
<xs:choice minOccurs='0' maxOccurs='unbounded'>
<xs:element ref='client:message'/>
<xs:element ref='client:presence'/>
<xs:element ref='client:iq'/>
<xs:element ref='server:message'/>
<xs:element ref='server:presence'/>
<xs:element ref='server:iq'/>
<xs:element ref='db:result'/>
<xs:element ref='db:verify'/>
</xs:choice>
<xs:element ref='error' minOccurs='0' maxOccurs='1'/>
</xs:sequence>
<xs:attribute name='from' type='xs:string' use='optional'/>
<xs:attribute name='id' type='xs:NMTOKEN' use='optional'/>
<xs:attribute name='to' type='xs:string' use='optional'/>
<xs:attribute name='version' type='xs:decimal' use='optional'/>
<xs:attribute ref='xml:lang' use='optional'/>
</xs:complexType>
</xs:element>
<xs:element name='features'>
<xs:complexType>
<xs:choice minOccurs='0' maxOccurs='unbounded'>
<xs:any namespace='##other'/>
</xs:choice>
</xs:complexType>
</xs:element>
<xs:element name='features'>
<xs:complexType>
<xs:choice minOccurs='0' maxOccurs='unbounded'>
<xs:any namespace='##other'/>
</xs:choice>
</xs:complexType>
</xs:element>
<xs:element name='error'>
<xs:complexType>
<xs:sequence xmlns:err='urn:ietf:params:xml:ns:xmpp-streams'>
<xs:group ref='err:streamErrorGroup'/>
<xs:element ref='err:text'
<xs:element name='error'>
<xs:complexType>
<xs:sequence xmlns:err='urn:ietf:params:xml:ns:xmpp-streams'>
<xs:group ref='err:streamErrorGroup'/>
<xs:element ref='err:text'
minOccurs='0'
maxOccurs='1'/>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:schema>

View File

@ -19,11 +19,17 @@ package org.apache.activemq.transport.xmpp;
import junit.framework.TestCase;
import junit.textui.TestRunner;
import org.jivesoftware.smack.Chat;
import org.jivesoftware.smack.ChatManager;
import org.jivesoftware.smack.ChatManagerListener;
import org.jivesoftware.smack.ConnectionConfiguration;
import org.jivesoftware.smack.MessageListener;
import org.jivesoftware.smack.PacketListener;
import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.filter.PacketFilter;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smack.packet.Packet;
import org.jivesoftware.smackx.muc.MultiUserChat;
/**
* @version $Revision$
@ -32,7 +38,9 @@ public class XmppTest extends TestCase {
protected static boolean block;
private XmppBroker broker = new XmppBroker();
private final XmppBroker broker = new XmppBroker();
private final long sleepTime = 5000;
public static void main(String[] args) {
block = true;
@ -49,12 +57,12 @@ public class XmppTest extends TestCase {
XMPPConnection con = new XMPPConnection(config);
con.connect();
con.login("amq-user", "amq-pwd");
Chat chat = con.getChatManager().createChat("test@localhost",
new MessageListener() {
public void processMessage(Chat chat, Message message) {
//
}
});
ChatManager chatManager = con.getChatManager();
Chat chat = chatManager.createChat("test@localhost", new MessageListener() {
public void processMessage(Chat chat, Message message) {
System.out.println("Got XMPP message from chat " + chat.getParticipant() + " message - " + message.getBody());
}
});
for (int i = 0; i < 10; i++) {
System.out.println("Sending message: " + i);
chat.sendMessage("Hello from Message: " + i);
@ -77,6 +85,225 @@ public class XmppTest extends TestCase {
System.out.println("Done!");
}
public void testChat() throws Exception {
ConnectionConfiguration config = new ConnectionConfiguration("localhost", 61222);
//config.setDebuggerEnabled(true);
XMPPConnection consumerCon = new XMPPConnection(config);
consumerCon.connect();
consumerCon.login("consumer", "consumer");
consumerCon.addPacketListener(new XmppLogger("CONSUMER INBOUND"), new PacketFilter() {
public boolean accept(Packet packet) {
return true;
}
});
consumerCon.addPacketWriterListener(new XmppLogger("CONSUMER OUTBOUND"), new PacketFilter() {
public boolean accept(Packet packet) {
return true;
}
});
final ConsumerMessageListener listener = new ConsumerMessageListener();
consumerCon.getChatManager().addChatListener(new ChatManagerListener() {
public void chatCreated(Chat chat, boolean createdLocally) {
chat.addMessageListener(listener);
}
});
XMPPConnection producerCon = new XMPPConnection(config);
producerCon.connect();
producerCon.login("producer", "producer");
producerCon.addPacketListener(new XmppLogger("PRODUCER INBOUND"), new PacketFilter() {
public boolean accept(Packet packet) {
return true;
}
});
producerCon.addPacketWriterListener(new XmppLogger("PRODUCER OUTBOUND"), new PacketFilter() {
public boolean accept(Packet packet) {
return true;
}
});
Chat chat = producerCon.getChatManager().createChat("consumer", new MessageListener() {
public void processMessage(Chat chat, Message message) {
System.out.println("Got XMPP message from chat " + chat.getParticipant() + " message - " + message.getBody());
}
});
for (int i = 0; i < 10; i++) {
System.out.println("Sending message: " + i);
Message message = new Message("consumer");
message.setType(Message.Type.chat);
message.setBody("Hello from producer, message # " + i);
chat.sendMessage(message);
}
System.out.println("Sent all messages!");
Thread.sleep(sleepTime);
System.out.println("Consumer received - " + listener.getMessageCount());
assertEquals(10, listener.getMessageCount());
}
public void testMultiUserChat() throws Exception {
System.out.println("\n\n\n\n\n\n");
ConnectionConfiguration config = new ConnectionConfiguration("localhost", 61222);
//config.setDebuggerEnabled(true);
//
XMPPConnection consumerCon = new XMPPConnection(config);
consumerCon.connect();
consumerCon.login("consumer", "consumer");
MultiUserChat consumerMuc = new MultiUserChat(consumerCon, "muc-test");
consumerMuc.join("consumer");
ConsumerMUCMessageListener listener = new ConsumerMUCMessageListener();
consumerMuc.addMessageListener(listener);
XMPPConnection producerCon = new XMPPConnection(config);
producerCon.connect();
producerCon.login("producer", "producer");
MultiUserChat producerMuc = new MultiUserChat(producerCon, "muc-test");
producerMuc.join("producer");
for (int i = 0; i < 10; i++) {
System.out.println("Sending message: " + i);
Message message = producerMuc.createMessage();
message.setBody("Hello from producer, message # " + i);
producerMuc.sendMessage(message);
}
System.out.println("Sent all messages!");
Thread.sleep(sleepTime);
System.out.println("Consumer received - " + listener.getMessageCount());
assertEquals(10, listener.getMessageCount());
}
public void addLoggingListeners(String name, XMPPConnection connection) {
connection.addPacketListener(new XmppLogger(name + " INBOUND"), new PacketFilter() {
public boolean accept(Packet packet) {
return true;
}
});
connection.addPacketWriterListener(new XmppLogger(name + " OUTBOUND"), new PacketFilter() {
public boolean accept(Packet packet) {
return true;
}
});
}
public void testTwoConnections() throws Exception {
System.out.println("\n\n\n\n\n\n");
ConnectionConfiguration config = new ConnectionConfiguration("localhost", 61222);
//config.setDebuggerEnabled(true);
//create the consumer first...
XMPPConnection consumerCon = new XMPPConnection(config);
consumerCon.connect();
addLoggingListeners("CONSUMER", consumerCon);
consumerCon.login("consumer", "consumer");
final ConsumerMessageListener listener1 = new ConsumerMessageListener();
consumerCon.getChatManager().addChatListener(new ChatManagerListener() {
public void chatCreated(Chat chat, boolean createdLocally) {
chat.addMessageListener(listener1);
}
});
//now create the producer
XMPPConnection producerCon = new XMPPConnection(config);
System.out.println("Connecting producer and consumer");
producerCon.connect();
addLoggingListeners("PRODUCER", producerCon);
producerCon.login("producer", "producer");
//create the chat and send some messages
Chat chat = producerCon.getChatManager().createChat("consumer", new MessageListener() {
public void processMessage(Chat chat, Message message) {
System.out.println("Got XMPP message from chat " + chat.getParticipant() + " message - " + message.getBody());
}
});
for (int i = 0; i < 10; i++) {
System.out.println("Sending message: " + i);
Message message = new Message("consumer");
message.setType(Message.Type.chat);
message.setBody("Hello from producer, message # " + i);
chat.sendMessage(message);
}
//make sure the consumer has time to receive all the messages...
Thread.sleep(sleepTime);
//create an identical 2nd consumer
XMPPConnection lastguyCon = new XMPPConnection(config);
lastguyCon.connect();
addLoggingListeners("LASTGUY", consumerCon);
lastguyCon.login("consumer", "consumer");
final ConsumerMessageListener listener2 = new ConsumerMessageListener();
lastguyCon.getChatManager().addChatListener(new ChatManagerListener() {
public void chatCreated(Chat chat, boolean createdLocally) {
chat.addMessageListener(listener2);
}
});
for (int i = 0; i < 10; i++) {
System.out.println("Sending message: " + i);
Message message = new Message("consumer");
message.setType(Message.Type.chat);
message.setBody("Hello from producer, message # " + i);
chat.sendMessage(message);
}
System.out.println("Sent all messages!");
Thread.sleep(sleepTime);
System.out.println("Consumer received - " + listener1.getMessageCount());
assertEquals(20, listener1.getMessageCount());
System.out.println("Consumer received - " + listener2.getMessageCount());
assertEquals(10, listener2.getMessageCount());
}
class XmppLogger implements PacketListener {
private final String direction;
public XmppLogger(String direction) {
this.direction = direction;
}
public void processPacket(Packet packet) {
System.out.println(direction + " : " + packet.toXML());
}
}
class ConsumerMUCMessageListener implements PacketListener {
private int messageCount=0;
public void processPacket(Packet packet) {
if ( packet instanceof Message) {
System.out.println("Received message number : " + (messageCount++));
}
}
public int getMessageCount() {
return messageCount;
}
}
class ConsumerMessageListener implements MessageListener {
private int messageCount=0;
public void processMessage(Chat chat, Message message) {
System.out.println("Received message number : " + (messageCount++));
}
public int getMessageCount() {
return messageCount;
}
}
@Override
protected void setUp() throws Exception {
broker.start();