diff --git a/assembly/src/release/examples/amqp/java/readme.md b/assembly/src/release/examples/amqp/java/readme.md index 6e68a4b223..fdd9a307bb 100644 --- a/assembly/src/release/examples/amqp/java/readme.md +++ b/assembly/src/release/examples/amqp/java/readme.md @@ -1,6 +1,6 @@ ## Overview -This is an example of how use the Java JMS api with ActiveMQ via the AMQP protocol. +This is an example of how to use the Java JMS api with ActiveMQ via the AMQP protocol. ## Prereqs diff --git a/assembly/src/release/examples/amqp/python/address.py b/assembly/src/release/examples/amqp/python/address.py new file mode 100644 index 0000000000..327cb01c73 --- /dev/null +++ b/assembly/src/release/examples/amqp/python/address.py @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +url = 'amqp://admin:admin@127.0.0.1:5672/queue://q' diff --git a/assembly/src/release/examples/amqp/python/content.py b/assembly/src/release/examples/amqp/python/content.py new file mode 100644 index 0000000000..636a5e690f --- /dev/null +++ b/assembly/src/release/examples/amqp/python/content.py @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from proton import Message + +messages = [Message(subject='s%d' % i, body='b%d' % i) for i in range(10)] diff --git a/assembly/src/release/examples/amqp/python/listener.py b/assembly/src/release/examples/amqp/python/listener.py deleted file mode 100755 index dd647f3d5c..0000000000 --- a/assembly/src/release/examples/amqp/python/listener.py +++ /dev/null @@ -1,54 +0,0 @@ -#!/usr/bin/env python -""" -Licensed to the Apache Software Foundation (ASF) under one or more -contributor license agreements. See the NOTICE file distributed with -this work for additional information regarding copyright ownership. -The ASF licenses this file to You under the Apache License, Version 2.0 -(the "License"); you may not use this file except in compliance with -the License. You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -import os -import sys -import time - -from proton import * - -user = os.getenv('ACTIVEMQ_USER') or 'admin' -password = os.getenv('ACTIVEMQ_PASSWORD') or 'password' -host = os.getenv('ACTIVEMQ_HOST') or '127.0.0.1' -port = int(os.getenv('ACTIVEMQ_PORT') or 5672) -destination = sys.argv[1:2] or ['topic://event'] -destination = destination[0] - -msg = Message() -mng = Messenger() -mng.password=password -mng.start() -mng.subscribe("amqp://%s@%s:%d/%s"%(user, host, port, destination)) - -count = 0 -start = time.time() -while True: - mng.recv(10) - while mng.incoming: - mng.get(msg) - if msg.body=="SHUTDOWN": - diff = time.time() - start - print 'Received %d frames in %f seconds' % (count, diff) - exit(0) - else: - if count==0: - start = time.time() - count+=1 - if count % 1000 == 0: - print 'Received %d messages.' % (count) - -mng.stop() diff --git a/assembly/src/release/examples/amqp/python/publisher.py b/assembly/src/release/examples/amqp/python/publisher.py deleted file mode 100755 index 23372ea9e3..0000000000 --- a/assembly/src/release/examples/amqp/python/publisher.py +++ /dev/null @@ -1,57 +0,0 @@ -#!/usr/bin/env python -""" -Licensed to the Apache Software Foundation (ASF) under one or more -contributor license agreements. See the NOTICE file distributed with -this work for additional information regarding copyright ownership. -The ASF licenses this file to You under the Apache License, Version 2.0 -(the "License"); you may not use this file except in compliance with -the License. You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -import os -import sys -import time - -from proton import * - -user = os.getenv('ACTIVEMQ_USER') or 'admin' -password = os.getenv('ACTIVEMQ_PASSWORD') or 'password' -host = os.getenv('ACTIVEMQ_HOST') or '127.0.0.1' -port = int(os.getenv('ACTIVEMQ_PORT') or 5672) -destination = sys.argv[1:2] or ['topic://event'] -destination = destination[0] -address = "amqp://%s@%s:%d/%s"%(user, host, port, destination) - -msg = Message() -mng = Messenger() -mng.password=password -mng.start() - -messages = 10000 - -msg.address = address -msg.body = unicode('Hello World from Python') - -count = 0 -start = time.time() -for _ in xrange(messages): - mng.put(msg) - count += 1 - if count % 1000 == 0 : - print("Sent %d messages"%(count)) - -msg.body = unicode("SHUTDOWN") -mng.put(msg) -mng.send - -diff = time.time() - start -print 'Sent %s frames in %f seconds' % (count, diff) - -mng.stop() diff --git a/assembly/src/release/examples/amqp/python/readme.md b/assembly/src/release/examples/amqp/python/readme.md new file mode 100644 index 0000000000..d1e455d8c4 --- /dev/null +++ b/assembly/src/release/examples/amqp/python/readme.md @@ -0,0 +1,21 @@ +## Overview +This is an example of how to use the python AMQP [Qpid Proton](https://qpid.apache.org/proton/index.html) reactor API with ActiveMQ. + +## Prereqs +- linux +- python 3.5+ +- you have successfully installed [python-qpid-proton](https://pypi.python.org/pypi/python-qpid-proton) - including any of its [dependencies](https://github.com/apache/qpid-proton/blob/master/INSTALL.md) +- $PYTHONPATH can search this folder + +## Running the Examples +In one terminal window run: + + python sender.py + +In another terminal window run: + + python receiver.py + +Use the ActiveMQ admin web page to check Messages Enqueued / Dequeued counts match. + +You can control which AMQP server the examples try to connect to and the messages they send by changing the values in config.py diff --git a/assembly/src/release/examples/amqp/python/receiver.py b/assembly/src/release/examples/amqp/python/receiver.py new file mode 100755 index 0000000000..da038a88b6 --- /dev/null +++ b/assembly/src/release/examples/amqp/python/receiver.py @@ -0,0 +1,57 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from __future__ import print_function + +from proton.handlers import MessagingHandler +from proton.reactor import Container + +import address + + +class Receiver(MessagingHandler): + def __init__(self, url, messages_to_receive=10): + super(Receiver, self).__init__() + self.url = url + self._messages_to_receive = messages_to_receive + self._messages_actually_received = 0 + self._stopping = False + + def on_start(self, event): + event.container.create_receiver(self.url) + + def on_message(self, event): + if self._stopping: + return + + print(event.message) + self._messages_actually_received += 1 + if self._messages_actually_received == self._messages_to_receive: + event.connection.close() + self._stopping = True + + def on_transport_error(self, event): + raise Exception(event.transport.condition) + + +if __name__ == "__main__": + try: + Container(Receiver(address.url)).run() + except KeyboardInterrupt: + pass diff --git a/assembly/src/release/examples/amqp/python/sender.py b/assembly/src/release/examples/amqp/python/sender.py new file mode 100755 index 0000000000..7883921885 --- /dev/null +++ b/assembly/src/release/examples/amqp/python/sender.py @@ -0,0 +1,62 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from __future__ import print_function + +from proton.handlers import MessagingHandler +from proton.reactor import Container + +import address +import content + + +class Sender(MessagingHandler): + def __init__(self, url, messages): + super(Sender, self).__init__() + self.url = url + self._messages = messages + self._message_index = 0 + self._sent_count = 0 + self._confirmed_count = 0 + + def on_start(self, event): + event.container.create_sender(self.url) + + def on_sendable(self, event): + while event.sender.credit and self._sent_count < len(self._messages): + message = self._messages[self._message_index] + print(message) + event.sender.send(message) + self._message_index += 1 + self._sent_count += 1 + + def on_accepted(self, event): + self._confirmed_count += 1 + if self._confirmed_count == len(self._messages): + event.connection.close() + + def on_transport_error(self, event): + raise Exception(event.transport.condition) + + +if __name__ == "__main__": + try: + Container(Sender(address.url, content.messages)).run() + except KeyboardInterrupt: + pass