From 746253c8effac154c8846407d0fa14f69a0db166 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Sun, 21 Feb 2010 07:20:47 +0000 Subject: [PATCH] Applied patch for https://issues.apache.org/activemq/browse/AMQ-2412 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@912310 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/xmpp/ProtocolConverter.java | 116 +++++---- .../transport/xmpp/XmppTransport.java | 60 +++-- activemq-xmpp/src/main/resources/activity.xsd | 9 +- activemq-xmpp/src/main/resources/address.xsd | 9 +- .../src/main/resources/amp-errors.xsd | 9 +- .../src/main/resources/amp-feature.xsd | 9 +- activemq-xmpp/src/main/resources/amp.xsd | 15 +- activemq-xmpp/src/main/resources/bind.xsd | 32 +++ activemq-xmpp/src/main/resources/dialback.xsd | 7 - .../src/main/resources/disco-info.xsd | 9 +- .../src/main/resources/disco-items.xsd | 9 +- activemq-xmpp/src/main/resources/iq-auth.xsd | 17 +- .../src/main/resources/iq-gateway.xsd | 9 +- activemq-xmpp/src/main/resources/iq-last.xsd | 9 +- activemq-xmpp/src/main/resources/iq-oob.xsd | 9 +- activemq-xmpp/src/main/resources/iq-pass.xsd | 9 +- .../src/main/resources/iq-private.xsd | 9 +- activemq-xmpp/src/main/resources/iq-time.xsd | 13 +- .../src/main/resources/iq-version.xsd | 9 +- .../src/main/resources/jabber-client.xsd | 21 +- .../src/main/resources/jabber-server.xsd | 10 +- .../src/main/resources/muc-admin.xsd | 9 +- .../src/main/resources/muc-unique.xsd | 7 - activemq-xmpp/src/main/resources/muc-user.xsd | 14 +- activemq-xmpp/src/main/resources/muc.xsd | 9 +- activemq-xmpp/src/main/resources/roster.xsd | 7 - .../src/main/resources/rosternotes.xsd | 9 +- activemq-xmpp/src/main/resources/rosterx.xsd | 13 +- activemq-xmpp/src/main/resources/session.xsd | 7 - .../src/main/resources/stanzaerror.xsd | 10 +- .../src/main/resources/streamerror.xsd | 10 +- activemq-xmpp/src/main/resources/streams.xsd | 198 +++++++------- .../activemq/transport/xmpp/XmppTest.java | 241 +++++++++++++++++- 33 files changed, 519 insertions(+), 414 deletions(-) create mode 100644 activemq-xmpp/src/main/resources/bind.xsd diff --git a/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java b/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java index e906d93019..e8451e47ab 100644 --- a/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java +++ b/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/ProtocolConverter.java @@ -22,7 +22,10 @@ import java.io.StringWriter; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.FutureTask; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.JMSException; @@ -96,7 +99,7 @@ public class ProtocolConverter { private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); private final IntSequenceGenerator tempDestinationIdGenerator = new IntSequenceGenerator(); - private final Map> resposeHandlers = new ConcurrentHashMap>(); + private final Map> responseHandlers = new ConcurrentHashMap>(); private final Map> subscriptionsByConsumerId = new ConcurrentHashMap>(); private final Map jidToConsumerMap = new HashMap(); private final Map jidToInboxConsumerMap = new HashMap(); @@ -106,6 +109,9 @@ public class ProtocolConverter { private final AtomicBoolean connected = new AtomicBoolean(false); private ActiveMQTempQueue inboxDestination; + //to avoid calling into sendToActiveMq from a handler + private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1); + public ProtocolConverter(XmppTransport transport) { this.transport = transport; initialiseRegistry(); @@ -159,10 +165,10 @@ public class ProtocolConverter { } } - public void onActiveMQCommad(Command command) throws Exception { + public void onActiveMQCommand(Command command) throws Exception { if (command.isResponse()) { Response response = (Response)command; - Handler handler = resposeHandlers.remove(new Integer(response.getCorrelationId())); + Handler handler = responseHandlers.remove(new Integer(response.getCorrelationId())); if (handler != null) { handler.handle(response); } else { @@ -230,7 +236,7 @@ public class ProtocolConverter { } } - protected void onAuthQuery(Object any, final Iq iq) throws IOException { + protected void onAuthQuery(Object any, final Iq iq) throws IOException, JMSException { Query query = (Query)any; if (LOG.isDebugEnabled()) { LOG.debug("Iq Auth Query " + debugString(iq) + " resource: " + query.getResource() + " username: " + query.getUsername()); @@ -281,10 +287,38 @@ public class ProtocolConverter { sendToActiveMQ(producerInfo, createErrorHandler("create producer")); } }); + + // create a destination for this client + final String to = query.getUsername(); + createDestination(to); + } + + public void createDestination(String to) throws IOException, JMSException { + ActiveMQDestination destination = createActiveMQDestination(to); + if (destination == null) { + LOG.debug("Unable to create destination for " + to); + return; + } + subscribe(to, destination, jidToConsumerMap); + + // lets subscribe to a personal inbox for replies + + // Check if Destination info is of temporary type. + if (inboxDestination == null) { + inboxDestination = new ActiveMQTempQueue(connectionInfo.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId()); + + DestinationInfo info = new DestinationInfo(); + info.setConnectionId(connectionInfo.getConnectionId()); + info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE); + info.setDestination(inboxDestination); + sendToActiveMQ(info, null); + + subscribe(to, inboxDestination, jidToInboxConsumerMap); + } } protected String debugString(Iq iq) { - return " to: " + iq.getTo() + " type: " + iq.getType() + " from: " + iq.getFrom() + " id: " + iq.getId(); + return "to: " + iq.getTo() + " type: " + iq.getType() + " from: " + iq.getFrom() + " id: " + iq.getId(); } protected void onDiscoItems(Iq iq, org.jabber.protocol.disco_items.Query query) throws IOException { @@ -367,30 +401,13 @@ public class ProtocolConverter { * sendPresence(presence, item); */ - // lets create a subscription - final String to = presence.getTo(); - - ActiveMQDestination destination = createActiveMQDestination(to); - if (destination == null) { - LOG.debug("No 'to' attribute specified for presence so not creating a JMS subscription"); - return; - } - subscribe(to, destination, jidToConsumerMap); - - // lets subscribe to a personal inbox for replies - - // Check if Destination info is of temporary type. - if (inboxDestination == null) { - inboxDestination = new ActiveMQTempQueue(connectionInfo.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId()); - - DestinationInfo info = new DestinationInfo(); - info.setConnectionId(connectionInfo.getConnectionId()); - info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE); - info.setDestination(inboxDestination); - sendToActiveMQ(info, null); - - subscribe(to, inboxDestination, jidToInboxConsumerMap); + // lets create a subscription for the room, Jabber clients would use + // "room/nickname", so we need to strip off the nickname + String to = presence.getTo(); + if ( to != null ) { + to = to.substring(0, to.indexOf("/")); } + createDestination(to); } protected void subscribe(final String to, ActiveMQDestination destination, Map consumerMap) { @@ -416,15 +433,23 @@ public class ProtocolConverter { consumerInfo.setDestination(destination); subscriptionsByConsumerId.put(consumerInfo.getConsumerId(), new Handler() { - public void handle(MessageDispatch messageDispatch) throws Exception { + public void handle(final MessageDispatch messageDispatch) throws Exception { // processing the inbound message if (LOG.isDebugEnabled()) { LOG.debug("Receiving inbound: " + messageDispatch.getMessage()); } // lets send back an ACK - MessageAck ack = new MessageAck(messageDispatch, MessageAck.STANDARD_ACK_TYPE, 1); - sendToActiveMQ(ack, createErrorHandler("Ack of message: " + messageDispatch.getMessage().getMessageId())); + final MessageAck ack = new MessageAck(messageDispatch, MessageAck.STANDARD_ACK_TYPE, 1); + + FutureTask task = new FutureTask(new Callable() { + public Void call() { + sendToActiveMQ(ack, createErrorHandler("Ack of message: " + messageDispatch.getMessage().getMessageId())); + return null; + } + }); + + scheduledThreadPoolExecutor.submit(task); Message message = createXmppMessage(to, messageDispatch); if (message != null) { @@ -438,18 +463,26 @@ public class ProtocolConverter { sendToActiveMQ(consumerInfo, createErrorHandler("subscribe to destination: " + destination)); } - protected Message createXmppMessage(String to, MessageDispatch messageDispatch) throws JMSException { + protected Message createXmppMessage(String to, MessageDispatch messageDispatch) throws IOException, JMSException { + + org.apache.activemq.command.Message message = messageDispatch.getMessage(); + Message answer = new Message(); - answer.setType("groupchat"); - String from = to; - int idx = from.indexOf('/'); - if (idx > 0) { - from = from.substring(0, idx) + "/broker"; + String from = (String)message.getProperty("XMPPFrom"); + if ( from == null ) { + from = to; + int idx = from.indexOf('/'); + if (idx > 0) { + from = from.substring(0, idx) + "/broker"; + } + answer.setType("groupchat"); + } else { + answer.setType("chat"); } + LOG.debug("Sending message from " + from + " and to " + to); answer.setFrom(from); answer.setTo(to); - org.apache.activemq.command.Message message = messageDispatch.getMessage(); // answer.setType(message.getType()); if (message instanceof ActiveMQTextMessage) { ActiveMQTextMessage activeMQTextMessage = (ActiveMQTextMessage)message; @@ -515,7 +548,7 @@ public class ProtocolConverter { command.setCommandId(generateCommandId()); if (handler != null) { command.setResponseRequired(true); - resposeHandlers.put(command.getCommandId(), handler); + responseHandlers.put(command.getCommandId(), handler); } transport.getTransportListener().onCommand(command); } @@ -578,9 +611,6 @@ public class ProtocolConverter { if (idx > 0) { name = name.substring(0, idx); } - - System.out.println("#### Creating ActiveMQ destination for: " + name); - // lets support lower-case versions of the agent topic if (name.equalsIgnoreCase(AdvisorySupport.AGENT_TOPIC)) { name = AdvisorySupport.AGENT_TOPIC; @@ -613,7 +643,7 @@ public class ProtocolConverter { if (replyTo == null) { replyTo = inboxDestination; } - System.out.println("Setting reply to destination to: " + replyTo); + LOG.info("Setting reply to destination to: " + replyTo); answer.setJMSReplyTo(replyTo); } diff --git a/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransport.java b/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransport.java index b7cb7013cd..7c98669893 100644 --- a/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransport.java +++ b/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransport.java @@ -64,7 +64,7 @@ public class XmppTransport extends TcpTransport { protected OutputStream outputStream; protected InputStream inputStream; - private JAXBContext context; + private static JAXBContext context; private XMLEventReader xmlReader; private Unmarshaller unmarshaller; private Marshaller marshaller; @@ -85,6 +85,7 @@ public class XmppTransport extends TcpTransport { } private void init() { + LOG.debug("Creating new instance of XmppTransport"); converter = new ProtocolConverter(this); } @@ -105,7 +106,7 @@ public class XmppTransport extends TcpTransport { } } else { try { - converter.onActiveMQCommad(command); + converter.onActiveMQCommand(command); } catch (IOException e) { throw e; } catch (Exception e) { @@ -170,6 +171,7 @@ public class XmppTransport extends TcpTransport { // unmarshal a new object Object object = unmarshaller.unmarshal(xmlReader); if (object != null) { + LOG.debug("Unmarshalled new incoming event - " + object.getClass().getName()); converter.onXmppCommand(object); } } else { @@ -218,19 +220,38 @@ public class XmppTransport extends TcpTransport { @Override protected void initializeStreams() throws Exception { // TODO it would be preferable to use class discovery here! - context = JAXBContext.newInstance("jabber.client" - /* - * + ":jabber.server" + ":jabber.iq.gateway" + ":jabber.iq.last" + - * ":jabber.iq.oob" + ":jabber.iq.pass" + ":jabber.iq.time" + - * ":jabber.iq.version" + ":org.jabber.protocol.activity" + - * ":org.jabber.protocol.address" + ":org.jabber.protocol.amp" + - * ":org.jabber.protocol.amp_errors" + ":org.jabber.protocol.muc_admin" + - * ":org.jabber.protocol.muc_unique" - */ - + ":jabber.iq._private" + ":jabber.iq.auth" + ":jabber.iq.roster" + ":org.jabber.etherx.streams" + ":org.jabber.protocol.disco_info" + ":org.jabber.protocol.disco_items" - + ":org.jabber.protocol.muc" + ":org.jabber.protocol.muc_user" + ":ietf.params.xml.ns.xmpp_sasl" + ":ietf.params.xml.ns.xmpp_stanzas" - + ":ietf.params.xml.ns.xmpp_streams" + ":ietf.params.xml.ns.xmpp_tls"); - + if ( context == null ) { + context = JAXBContext.newInstance( + "jabber.server:" + + "jabber.server.dialback:" + + "jabber.client:" + + "jabber.iq._private:" + + "jabber.iq.auth:" + + "jabber.iq.gateway:" + + "jabber.iq.version:" + + "jabber.iq.roster:" + + "jabber.iq.pass:" + + "jabber.iq.last:" + + "jabber.iq.oob:" + + "jabber.iq.time:" + + "storage.rosternotes:" + + "ietf.params.xml.ns.xmpp_streams:" + + "ietf.params.xml.ns.xmpp_sasl:" + + "ietf.params.xml.ns.xmpp_stanzas:" + + "ietf.params.xml.ns.xmpp_bind:" + + "ietf.params.xml.ns.xmpp_tls:" + + "org.jabber.protocol.muc:" + + "org.jabber.protocol.rosterx:" + + "org.jabber.protocol.disco_info:" + + "org.jabber.protocol.disco_items:" + + "org.jabber.protocol.activity:" + + "org.jabber.protocol.amp_errors:" + + "org.jabber.protocol.amp:" + + "org.jabber.protocol.address:" + + "org.jabber.protocol.muc_user:" + + "org.jabber.protocol.muc_admin:" + + "org.jabber.etherx.streams"); + } inputStream = new TcpBufferedInputStream(socket.getInputStream(), 8 * 1024); outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), 16 * 1024); @@ -241,11 +262,10 @@ public class XmppTransport extends TcpTransport { protected void writeOpenStream(String id, String from) throws IOException, XMLStreamException { LOG.debug("Sending initial stream element"); + XMLOutputFactory factory = XMLOutputFactory.newInstance(); // factory.setProperty(XMLOutputFactory.IS_REPAIRING_NAMESPACES, true); xmlWriter = factory.createXMLStreamWriter(outputStream); - - // write the dummy start tag xmlWriter.writeStartDocument(); xmlWriter.writeStartElement("stream", "stream", "http://etherx.jabber.org/streams"); xmlWriter.writeDefaultNamespace("jabber:client"); @@ -264,12 +284,14 @@ public class XmppTransport extends TcpTransport { // TODO support TLS // features.getAny().add(new Starttls()); - Mechanisms mechanisms = new Mechanisms(); + //Mechanisms mechanisms = new Mechanisms(); // TODO support SASL // mechanisms.getMechanism().add("DIGEST-MD5"); // mechanisms.getMechanism().add("PLAIN"); - features.getAny().add(mechanisms); + //features.getAny().add(mechanisms); + features.getAny().add(new ietf.params.xml.ns.xmpp_bind.ObjectFactory().createBind()); + features.getAny().add(new ietf.params.xml.ns.xmpp_session.ObjectFactory().createSession("")); marshall(features); LOG.debug("Initial stream element sent!"); diff --git a/activemq-xmpp/src/main/resources/activity.xsd b/activemq-xmpp/src/main/resources/activity.xsd index be791a5039..3d8cd88ebb 100644 --- a/activemq-xmpp/src/main/resources/activity.xsd +++ b/activemq-xmpp/src/main/resources/activity.xsd @@ -1,11 +1,4 @@ - The protocol documented by this schema is defined in - JEP-0108: http://www.jabber.org/jeps/jep-0108.html + XEP-0108: http://www.xmpp.org/extensions/xep-0108.html diff --git a/activemq-xmpp/src/main/resources/address.xsd b/activemq-xmpp/src/main/resources/address.xsd index d1da5e00a6..20c699d3f5 100644 --- a/activemq-xmpp/src/main/resources/address.xsd +++ b/activemq-xmpp/src/main/resources/address.xsd @@ -1,11 +1,4 @@ - The protocol documented by this schema is defined in - JEP-0033: http://www.jabber.org/jeps/jep-0033.html + XEP-0033: http://www.xmpp.org/extensions/xep-0033.html diff --git a/activemq-xmpp/src/main/resources/amp-errors.xsd b/activemq-xmpp/src/main/resources/amp-errors.xsd index f63d96a942..a680131bbc 100644 --- a/activemq-xmpp/src/main/resources/amp-errors.xsd +++ b/activemq-xmpp/src/main/resources/amp-errors.xsd @@ -1,11 +1,4 @@ - The protocol documented by this schema is defined in - JEP-0079: http://www.jabber.org/jeps/jep-0079.html + XEP-0079: http://www.xmpp.org/extensions/xep-0079.html diff --git a/activemq-xmpp/src/main/resources/amp-feature.xsd b/activemq-xmpp/src/main/resources/amp-feature.xsd index 93e66a5b31..7e0721fb1d 100644 --- a/activemq-xmpp/src/main/resources/amp-feature.xsd +++ b/activemq-xmpp/src/main/resources/amp-feature.xsd @@ -1,11 +1,4 @@ - The protocol documented by this schema is defined in - JEP-0079: http://www.jabber.org/jeps/jep-0079.html + XEP-0079: http://www.xmpp.org/extensions/xep-0079.html diff --git a/activemq-xmpp/src/main/resources/amp.xsd b/activemq-xmpp/src/main/resources/amp.xsd index c2b1a75b87..390d7f62db 100644 --- a/activemq-xmpp/src/main/resources/amp.xsd +++ b/activemq-xmpp/src/main/resources/amp.xsd @@ -1,11 +1,4 @@ - The protocol documented by this schema is defined in - JEP-0079: http://www.jabber.org/jeps/jep-0079.html + XEP-0079: http://www.xmpp.org/extensions/xep-0079.html @@ -25,10 +18,10 @@ - + - - + + diff --git a/activemq-xmpp/src/main/resources/bind.xsd b/activemq-xmpp/src/main/resources/bind.xsd new file mode 100644 index 0000000000..5a843bb6c7 --- /dev/null +++ b/activemq-xmpp/src/main/resources/bind.xsd @@ -0,0 +1,32 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/activemq-xmpp/src/main/resources/dialback.xsd b/activemq-xmpp/src/main/resources/dialback.xsd index 3e64f60333..3a873dfe3c 100755 --- a/activemq-xmpp/src/main/resources/dialback.xsd +++ b/activemq-xmpp/src/main/resources/dialback.xsd @@ -1,11 +1,4 @@ - - The protocol documented by this schema is defined in - JEP-0030: http://www.jabber.org/jeps/jep-0030.html + XEP-0030: http://www.xmpp.org/extensions/xep-0030.html diff --git a/activemq-xmpp/src/main/resources/disco-items.xsd b/activemq-xmpp/src/main/resources/disco-items.xsd index 715714499a..cc3ce68c51 100644 --- a/activemq-xmpp/src/main/resources/disco-items.xsd +++ b/activemq-xmpp/src/main/resources/disco-items.xsd @@ -1,11 +1,4 @@ - The protocol documented by this schema is defined in - JEP-0030: http://www.jabber.org/jeps/jep-0030.html + XEP-0030: http://www.xmpp.org/extensions/xep-0030.html diff --git a/activemq-xmpp/src/main/resources/iq-auth.xsd b/activemq-xmpp/src/main/resources/iq-auth.xsd index 4b22b60e2a..7db2bee617 100644 --- a/activemq-xmpp/src/main/resources/iq-auth.xsd +++ b/activemq-xmpp/src/main/resources/iq-auth.xsd @@ -1,11 +1,4 @@ - - The protocol documented by this schema is defined in - JEP-0078: http://www.jabber.org/jeps/jep-0078.html + NOTE WELL: Non-SASL Authentication via the jabber:iq:auth + protocol has been superseded by SASL Authentication as + defined in RFC 3920, and is now obsolete. + + For historical purposes, the protocol documented by this + schema is defined in XEP-0078: + + http://www.xmpp.org/extensions/xep-0078.html diff --git a/activemq-xmpp/src/main/resources/iq-gateway.xsd b/activemq-xmpp/src/main/resources/iq-gateway.xsd index c85b5ebb1c..c8fae05684 100644 --- a/activemq-xmpp/src/main/resources/iq-gateway.xsd +++ b/activemq-xmpp/src/main/resources/iq-gateway.xsd @@ -1,11 +1,4 @@ - The protocol documented by this schema is defined in - JEP-0100: http://www.jabber.org/jeps/jep-0100.html + XEP-0100: http://www.xmpp.org/extensions/xep-0100.html diff --git a/activemq-xmpp/src/main/resources/iq-last.xsd b/activemq-xmpp/src/main/resources/iq-last.xsd index 3785b0b47b..f7e5dee792 100644 --- a/activemq-xmpp/src/main/resources/iq-last.xsd +++ b/activemq-xmpp/src/main/resources/iq-last.xsd @@ -1,11 +1,4 @@ - The protocol documented by this schema is defined in - JEP-0012: http://www.jabber.org/jeps/jep-0012.html + XEP-0012: http://www.xmpp.org/extensions/xep-0012.html diff --git a/activemq-xmpp/src/main/resources/iq-oob.xsd b/activemq-xmpp/src/main/resources/iq-oob.xsd index 81b20f8421..6f2b3cafb5 100644 --- a/activemq-xmpp/src/main/resources/iq-oob.xsd +++ b/activemq-xmpp/src/main/resources/iq-oob.xsd @@ -1,11 +1,4 @@ - The protocol documented by this schema is defined in - JEP-0066: http://www.jabber.org/jeps/jep-0066.html + XEP-0066: http://www.xmpp.org/extensions/xep-0066.html diff --git a/activemq-xmpp/src/main/resources/iq-pass.xsd b/activemq-xmpp/src/main/resources/iq-pass.xsd index e37248e07a..0cee57115d 100644 --- a/activemq-xmpp/src/main/resources/iq-pass.xsd +++ b/activemq-xmpp/src/main/resources/iq-pass.xsd @@ -1,11 +1,4 @@ - The protocol documented by this schema is defined in - JEP-0003: http://www.jabber.org/jeps/jep-0003.html + XEP-0003: http://www.xmpp.org/extensions/xep-0003.html diff --git a/activemq-xmpp/src/main/resources/iq-private.xsd b/activemq-xmpp/src/main/resources/iq-private.xsd index c308ca6594..0b849d8a96 100644 --- a/activemq-xmpp/src/main/resources/iq-private.xsd +++ b/activemq-xmpp/src/main/resources/iq-private.xsd @@ -1,11 +1,4 @@ - The protocol documented by this schema is defined in - JEP-0049: http://www.jabber.org/jeps/jep-0049.html + XEP-0049: http://www.xmpp.org/extensions/xep-0049.html diff --git a/activemq-xmpp/src/main/resources/iq-time.xsd b/activemq-xmpp/src/main/resources/iq-time.xsd index 4eff8a4a9f..161da3d87e 100644 --- a/activemq-xmpp/src/main/resources/iq-time.xsd +++ b/activemq-xmpp/src/main/resources/iq-time.xsd @@ -1,11 +1,4 @@ - The protocol documented by this schema is defined in - JEP-0090: http://www.jabber.org/jeps/jep-0090.html + XEP-0090: http://www.xmpp.org/extensions/xep-0090.html + + NOTE: This protocol has been deprecated in favor of the + Entity Time protocol specified in XEP-0202: + http://www.xmpp.org/extensions/xep-0202.html diff --git a/activemq-xmpp/src/main/resources/iq-version.xsd b/activemq-xmpp/src/main/resources/iq-version.xsd index 68403a1843..239bdf4fce 100644 --- a/activemq-xmpp/src/main/resources/iq-version.xsd +++ b/activemq-xmpp/src/main/resources/iq-version.xsd @@ -1,11 +1,4 @@ - The protocol documented by this schema is defined in - JEP-0092: http://www.jabber.org/jeps/jep-0092.html + XEP-0092: http://www.xmpp.org/extensions/xep-0092.html diff --git a/activemq-xmpp/src/main/resources/jabber-client.xsd b/activemq-xmpp/src/main/resources/jabber-client.xsd index 6e2b4392e6..69642bca8b 100755 --- a/activemq-xmpp/src/main/resources/jabber-client.xsd +++ b/activemq-xmpp/src/main/resources/jabber-client.xsd @@ -1,11 +1,4 @@ - + + @@ -74,18 +70,7 @@ - - - - - - - - - - diff --git a/activemq-xmpp/src/main/resources/jabber-server.xsd b/activemq-xmpp/src/main/resources/jabber-server.xsd index 035bdae658..bebe343878 100755 --- a/activemq-xmpp/src/main/resources/jabber-server.xsd +++ b/activemq-xmpp/src/main/resources/jabber-server.xsd @@ -1,11 +1,4 @@ - + + diff --git a/activemq-xmpp/src/main/resources/muc-admin.xsd b/activemq-xmpp/src/main/resources/muc-admin.xsd index 32131e5af1..12e23dcb53 100644 --- a/activemq-xmpp/src/main/resources/muc-admin.xsd +++ b/activemq-xmpp/src/main/resources/muc-admin.xsd @@ -1,11 +1,4 @@ - The protocol documented by this schema is defined in - JEP-0045: http://www.jabber.org/jeps/jep-0045.html + XEP-0045: http://www.xmpp.org/extensions/xep-0045.html diff --git a/activemq-xmpp/src/main/resources/muc-unique.xsd b/activemq-xmpp/src/main/resources/muc-unique.xsd index 0c9396b8cf..121d7cf553 100644 --- a/activemq-xmpp/src/main/resources/muc-unique.xsd +++ b/activemq-xmpp/src/main/resources/muc-unique.xsd @@ -1,11 +1,4 @@ - - The protocol documented by this schema is defined in - JEP-0045: http://www.jabber.org/jeps/jep-0045.html + XEP-0045: http://www.xmpp.org/extensions/xep-0045.html @@ -28,7 +21,7 @@ - + @@ -110,7 +103,8 @@ - + + diff --git a/activemq-xmpp/src/main/resources/muc.xsd b/activemq-xmpp/src/main/resources/muc.xsd index 4823515ee2..7e52b8ed9f 100644 --- a/activemq-xmpp/src/main/resources/muc.xsd +++ b/activemq-xmpp/src/main/resources/muc.xsd @@ -1,11 +1,4 @@ - The protocol documented by this schema is defined in - JEP-0045: http://www.jabber.org/jeps/jep-0045.html + XEP-0045: http://www.xmpp.org/extensions/xep-0045.html diff --git a/activemq-xmpp/src/main/resources/roster.xsd b/activemq-xmpp/src/main/resources/roster.xsd index d448055b54..bf85745551 100755 --- a/activemq-xmpp/src/main/resources/roster.xsd +++ b/activemq-xmpp/src/main/resources/roster.xsd @@ -1,11 +1,4 @@ - - The protocol documented by this schema is defined in - JEP-0045: http://www.jabber.org/jeps/jep-0045.html + XEP-0145: http://www.xmpp.org/extensions/xep-0145.html diff --git a/activemq-xmpp/src/main/resources/rosterx.xsd b/activemq-xmpp/src/main/resources/rosterx.xsd index 8bd9e5ddcc..fe7b468d8e 100644 --- a/activemq-xmpp/src/main/resources/rosterx.xsd +++ b/activemq-xmpp/src/main/resources/rosterx.xsd @@ -1,11 +1,4 @@ - The protocol documented by this schema is defined in - JEP-0144: http://www.jabber.org/jeps/jep-0144.html + XEP-0144: http://www.xmpp.org/extensions/xep-0144.html @@ -33,9 +26,9 @@ - + - + diff --git a/activemq-xmpp/src/main/resources/session.xsd b/activemq-xmpp/src/main/resources/session.xsd index ac399ac1c3..30b147ad4d 100755 --- a/activemq-xmpp/src/main/resources/session.xsd +++ b/activemq-xmpp/src/main/resources/session.xsd @@ -1,11 +1,4 @@ - - + + diff --git a/activemq-xmpp/src/main/resources/streamerror.xsd b/activemq-xmpp/src/main/resources/streamerror.xsd index 7728364a74..47b8031e64 100755 --- a/activemq-xmpp/src/main/resources/streamerror.xsd +++ b/activemq-xmpp/src/main/resources/streamerror.xsd @@ -1,11 +1,4 @@ - + + diff --git a/activemq-xmpp/src/main/resources/streams.xsd b/activemq-xmpp/src/main/resources/streams.xsd index 5f69ace137..51ec6fa66d 100755 --- a/activemq-xmpp/src/main/resources/streams.xsd +++ b/activemq-xmpp/src/main/resources/streams.xsd @@ -1,125 +1,103 @@ - + xmlns:xs='http://www.w3.org/2001/XMLSchema' + xmlns:jaxb='http://java.sun.com/xml/ns/jaxb' + targetNamespace='http://etherx.jabber.org/streams' + xmlns='http://etherx.jabber.org/streams' + elementFormDefault='unqualified' + jaxb:version="2.0"> - - - - - - + - - - - - + - - - - - - - + - - - - - - + - - - - - - - - - - - - - - - - - - - - + + + + + + + - - - - - - - - - - - - - + + + + - - - + + + diff --git a/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppTest.java b/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppTest.java index 7cf02181ae..cdcf1bc74b 100644 --- a/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppTest.java +++ b/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppTest.java @@ -19,11 +19,17 @@ package org.apache.activemq.transport.xmpp; import junit.framework.TestCase; import junit.textui.TestRunner; import org.jivesoftware.smack.Chat; +import org.jivesoftware.smack.ChatManager; +import org.jivesoftware.smack.ChatManagerListener; import org.jivesoftware.smack.ConnectionConfiguration; import org.jivesoftware.smack.MessageListener; +import org.jivesoftware.smack.PacketListener; import org.jivesoftware.smack.XMPPConnection; import org.jivesoftware.smack.XMPPException; +import org.jivesoftware.smack.filter.PacketFilter; import org.jivesoftware.smack.packet.Message; +import org.jivesoftware.smack.packet.Packet; +import org.jivesoftware.smackx.muc.MultiUserChat; /** * @version $Revision$ @@ -32,7 +38,9 @@ public class XmppTest extends TestCase { protected static boolean block; - private XmppBroker broker = new XmppBroker(); + private final XmppBroker broker = new XmppBroker(); + + private final long sleepTime = 5000; public static void main(String[] args) { block = true; @@ -49,12 +57,12 @@ public class XmppTest extends TestCase { XMPPConnection con = new XMPPConnection(config); con.connect(); con.login("amq-user", "amq-pwd"); - Chat chat = con.getChatManager().createChat("test@localhost", - new MessageListener() { - public void processMessage(Chat chat, Message message) { - // - } - }); + ChatManager chatManager = con.getChatManager(); + Chat chat = chatManager.createChat("test@localhost", new MessageListener() { + public void processMessage(Chat chat, Message message) { + System.out.println("Got XMPP message from chat " + chat.getParticipant() + " message - " + message.getBody()); + } + }); for (int i = 0; i < 10; i++) { System.out.println("Sending message: " + i); chat.sendMessage("Hello from Message: " + i); @@ -77,6 +85,225 @@ public class XmppTest extends TestCase { System.out.println("Done!"); } + + + public void testChat() throws Exception { + ConnectionConfiguration config = new ConnectionConfiguration("localhost", 61222); + //config.setDebuggerEnabled(true); + + XMPPConnection consumerCon = new XMPPConnection(config); + consumerCon.connect(); + consumerCon.login("consumer", "consumer"); + consumerCon.addPacketListener(new XmppLogger("CONSUMER INBOUND"), new PacketFilter() { + public boolean accept(Packet packet) { + return true; + } + }); + consumerCon.addPacketWriterListener(new XmppLogger("CONSUMER OUTBOUND"), new PacketFilter() { + public boolean accept(Packet packet) { + return true; + } + }); + final ConsumerMessageListener listener = new ConsumerMessageListener(); + + consumerCon.getChatManager().addChatListener(new ChatManagerListener() { + public void chatCreated(Chat chat, boolean createdLocally) { + chat.addMessageListener(listener); + } + }); + + + XMPPConnection producerCon = new XMPPConnection(config); + producerCon.connect(); + producerCon.login("producer", "producer"); + producerCon.addPacketListener(new XmppLogger("PRODUCER INBOUND"), new PacketFilter() { + public boolean accept(Packet packet) { + return true; + } + }); + producerCon.addPacketWriterListener(new XmppLogger("PRODUCER OUTBOUND"), new PacketFilter() { + public boolean accept(Packet packet) { + return true; + } + }); + + Chat chat = producerCon.getChatManager().createChat("consumer", new MessageListener() { + public void processMessage(Chat chat, Message message) { + System.out.println("Got XMPP message from chat " + chat.getParticipant() + " message - " + message.getBody()); + } + }); + + for (int i = 0; i < 10; i++) { + System.out.println("Sending message: " + i); + Message message = new Message("consumer"); + message.setType(Message.Type.chat); + message.setBody("Hello from producer, message # " + i); + chat.sendMessage(message); + } + System.out.println("Sent all messages!"); + + Thread.sleep(sleepTime); + System.out.println("Consumer received - " + listener.getMessageCount()); + assertEquals(10, listener.getMessageCount()); + } + + + + public void testMultiUserChat() throws Exception { + System.out.println("\n\n\n\n\n\n"); + ConnectionConfiguration config = new ConnectionConfiguration("localhost", 61222); + //config.setDebuggerEnabled(true); + // + XMPPConnection consumerCon = new XMPPConnection(config); + consumerCon.connect(); + consumerCon.login("consumer", "consumer"); + MultiUserChat consumerMuc = new MultiUserChat(consumerCon, "muc-test"); + consumerMuc.join("consumer"); + + ConsumerMUCMessageListener listener = new ConsumerMUCMessageListener(); + consumerMuc.addMessageListener(listener); + + XMPPConnection producerCon = new XMPPConnection(config); + producerCon.connect(); + producerCon.login("producer", "producer"); + MultiUserChat producerMuc = new MultiUserChat(producerCon, "muc-test"); + producerMuc.join("producer"); + + for (int i = 0; i < 10; i++) { + System.out.println("Sending message: " + i); + Message message = producerMuc.createMessage(); + message.setBody("Hello from producer, message # " + i); + producerMuc.sendMessage(message); + } + System.out.println("Sent all messages!"); + + Thread.sleep(sleepTime); + System.out.println("Consumer received - " + listener.getMessageCount()); + assertEquals(10, listener.getMessageCount()); + } + + public void addLoggingListeners(String name, XMPPConnection connection) { + connection.addPacketListener(new XmppLogger(name + " INBOUND"), new PacketFilter() { + public boolean accept(Packet packet) { + return true; + } + }); + connection.addPacketWriterListener(new XmppLogger(name + " OUTBOUND"), new PacketFilter() { + public boolean accept(Packet packet) { + return true; + } + }); + } + + public void testTwoConnections() throws Exception { + System.out.println("\n\n\n\n\n\n"); + ConnectionConfiguration config = new ConnectionConfiguration("localhost", 61222); + //config.setDebuggerEnabled(true); + + //create the consumer first... + XMPPConnection consumerCon = new XMPPConnection(config); + consumerCon.connect(); + addLoggingListeners("CONSUMER", consumerCon); + consumerCon.login("consumer", "consumer"); + + final ConsumerMessageListener listener1 = new ConsumerMessageListener(); + consumerCon.getChatManager().addChatListener(new ChatManagerListener() { + public void chatCreated(Chat chat, boolean createdLocally) { + chat.addMessageListener(listener1); + } + }); + + //now create the producer + XMPPConnection producerCon = new XMPPConnection(config); + System.out.println("Connecting producer and consumer"); + producerCon.connect(); + addLoggingListeners("PRODUCER", producerCon); + producerCon.login("producer", "producer"); + + //create the chat and send some messages + Chat chat = producerCon.getChatManager().createChat("consumer", new MessageListener() { + public void processMessage(Chat chat, Message message) { + System.out.println("Got XMPP message from chat " + chat.getParticipant() + " message - " + message.getBody()); + } + }); + + for (int i = 0; i < 10; i++) { + System.out.println("Sending message: " + i); + Message message = new Message("consumer"); + message.setType(Message.Type.chat); + message.setBody("Hello from producer, message # " + i); + chat.sendMessage(message); + } + + //make sure the consumer has time to receive all the messages... + Thread.sleep(sleepTime); + + //create an identical 2nd consumer + XMPPConnection lastguyCon = new XMPPConnection(config); + lastguyCon.connect(); + addLoggingListeners("LASTGUY", consumerCon); + lastguyCon.login("consumer", "consumer"); + final ConsumerMessageListener listener2 = new ConsumerMessageListener(); + lastguyCon.getChatManager().addChatListener(new ChatManagerListener() { + public void chatCreated(Chat chat, boolean createdLocally) { + chat.addMessageListener(listener2); + } + }); + + for (int i = 0; i < 10; i++) { + System.out.println("Sending message: " + i); + Message message = new Message("consumer"); + message.setType(Message.Type.chat); + message.setBody("Hello from producer, message # " + i); + chat.sendMessage(message); + } + + System.out.println("Sent all messages!"); + Thread.sleep(sleepTime); + System.out.println("Consumer received - " + listener1.getMessageCount()); + assertEquals(20, listener1.getMessageCount()); + System.out.println("Consumer received - " + listener2.getMessageCount()); + assertEquals(10, listener2.getMessageCount()); + } + + class XmppLogger implements PacketListener { + + private final String direction; + + public XmppLogger(String direction) { + this.direction = direction; + } + + public void processPacket(Packet packet) { + System.out.println(direction + " : " + packet.toXML()); + } + } + + class ConsumerMUCMessageListener implements PacketListener { + private int messageCount=0; + + public void processPacket(Packet packet) { + if ( packet instanceof Message) { + System.out.println("Received message number : " + (messageCount++)); + } + } + public int getMessageCount() { + return messageCount; + } + } + + class ConsumerMessageListener implements MessageListener { + private int messageCount=0; + + public void processMessage(Chat chat, Message message) { + System.out.println("Received message number : " + (messageCount++)); + } + + public int getMessageCount() { + return messageCount; + } + } + @Override protected void setUp() throws Exception { broker.start();