AMQ-6828 Improvement to python AMQP example.

Migrate example python code to proton reactor API.
This commit is contained in:
James Sears 2017-10-06 15:10:55 +01:00
parent b3f41cb44b
commit cc6cb74c74
8 changed files with 183 additions and 112 deletions

View File

@ -1,6 +1,6 @@
## Overview
This is an example of how use the Java JMS api with ActiveMQ via the AMQP protocol.
This is an example of how to use the Java JMS api with ActiveMQ via the AMQP protocol.
## Prereqs

View File

@ -0,0 +1,20 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
url = 'amqp://admin:admin@127.0.0.1:5672/queue://q'

View File

@ -0,0 +1,22 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from proton import Message
messages = [Message(subject='s%d' % i, body='b%d' % i) for i in range(10)]

View File

@ -1,54 +0,0 @@
#!/usr/bin/env python
"""
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
import sys
import time
from proton import *
user = os.getenv('ACTIVEMQ_USER') or 'admin'
password = os.getenv('ACTIVEMQ_PASSWORD') or 'password'
host = os.getenv('ACTIVEMQ_HOST') or '127.0.0.1'
port = int(os.getenv('ACTIVEMQ_PORT') or 5672)
destination = sys.argv[1:2] or ['topic://event']
destination = destination[0]
msg = Message()
mng = Messenger()
mng.password=password
mng.start()
mng.subscribe("amqp://%s@%s:%d/%s"%(user, host, port, destination))
count = 0
start = time.time()
while True:
mng.recv(10)
while mng.incoming:
mng.get(msg)
if msg.body=="SHUTDOWN":
diff = time.time() - start
print 'Received %d frames in %f seconds' % (count, diff)
exit(0)
else:
if count==0:
start = time.time()
count+=1
if count % 1000 == 0:
print 'Received %d messages.' % (count)
mng.stop()

View File

@ -1,57 +0,0 @@
#!/usr/bin/env python
"""
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
import sys
import time
from proton import *
user = os.getenv('ACTIVEMQ_USER') or 'admin'
password = os.getenv('ACTIVEMQ_PASSWORD') or 'password'
host = os.getenv('ACTIVEMQ_HOST') or '127.0.0.1'
port = int(os.getenv('ACTIVEMQ_PORT') or 5672)
destination = sys.argv[1:2] or ['topic://event']
destination = destination[0]
address = "amqp://%s@%s:%d/%s"%(user, host, port, destination)
msg = Message()
mng = Messenger()
mng.password=password
mng.start()
messages = 10000
msg.address = address
msg.body = unicode('Hello World from Python')
count = 0
start = time.time()
for _ in xrange(messages):
mng.put(msg)
count += 1
if count % 1000 == 0 :
print("Sent %d messages"%(count))
msg.body = unicode("SHUTDOWN")
mng.put(msg)
mng.send
diff = time.time() - start
print 'Sent %s frames in %f seconds' % (count, diff)
mng.stop()

View File

@ -0,0 +1,21 @@
## Overview
This is an example of how to use the python AMQP [Qpid Proton](https://qpid.apache.org/proton/index.html) reactor API with ActiveMQ.
## Prereqs
- linux
- python 3.5+
- you have successfully installed [python-qpid-proton](https://pypi.python.org/pypi/python-qpid-proton) - including any of its [dependencies](https://github.com/apache/qpid-proton/blob/master/INSTALL.md)
- $PYTHONPATH can search this folder
## Running the Examples
In one terminal window run:
python sender.py
In another terminal window run:
python receiver.py
Use the ActiveMQ admin web page to check Messages Enqueued / Dequeued counts match.
You can control which AMQP server the examples try to connect to and the messages they send by changing the values in config.py

View File

@ -0,0 +1,57 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from __future__ import print_function
from proton.handlers import MessagingHandler
from proton.reactor import Container
import address
class Receiver(MessagingHandler):
def __init__(self, url, messages_to_receive=10):
super(Receiver, self).__init__()
self.url = url
self._messages_to_receive = messages_to_receive
self._messages_actually_received = 0
self._stopping = False
def on_start(self, event):
event.container.create_receiver(self.url)
def on_message(self, event):
if self._stopping:
return
print(event.message)
self._messages_actually_received += 1
if self._messages_actually_received == self._messages_to_receive:
event.connection.close()
self._stopping = True
def on_transport_error(self, event):
raise Exception(event.transport.condition)
if __name__ == "__main__":
try:
Container(Receiver(address.url)).run()
except KeyboardInterrupt:
pass

View File

@ -0,0 +1,62 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from __future__ import print_function
from proton.handlers import MessagingHandler
from proton.reactor import Container
import address
import content
class Sender(MessagingHandler):
def __init__(self, url, messages):
super(Sender, self).__init__()
self.url = url
self._messages = messages
self._message_index = 0
self._sent_count = 0
self._confirmed_count = 0
def on_start(self, event):
event.container.create_sender(self.url)
def on_sendable(self, event):
while event.sender.credit and self._sent_count < len(self._messages):
message = self._messages[self._message_index]
print(message)
event.sender.send(message)
self._message_index += 1
self._sent_count += 1
def on_accepted(self, event):
self._confirmed_count += 1
if self._confirmed_count == len(self._messages):
event.connection.close()
def on_transport_error(self, event):
raise Exception(event.transport.condition)
if __name__ == "__main__":
try:
Container(Sender(address.url, content.messages)).run()
except KeyboardInterrupt:
pass