mirror of https://github.com/apache/activemq.git
AMQ-6828 Improvement to python AMQP example.
Migrate example python code to proton reactor API.
(cherry picked from commit cc6cb74c74
)
This commit is contained in:
parent
c4d460bf60
commit
c3d6fb41e8
|
@ -1,6 +1,6 @@
|
|||
## Overview
|
||||
|
||||
This is an example of how use the Java JMS api with ActiveMQ via the AMQP protocol.
|
||||
This is an example of how to use the Java JMS api with ActiveMQ via the AMQP protocol.
|
||||
|
||||
## Prereqs
|
||||
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
url = 'amqp://admin:admin@127.0.0.1:5672/queue://q'
|
|
@ -0,0 +1,22 @@
|
|||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
from proton import Message
|
||||
|
||||
messages = [Message(subject='s%d' % i, body='b%d' % i) for i in range(10)]
|
|
@ -1,54 +0,0 @@
|
|||
#!/usr/bin/env python
|
||||
"""
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
"""
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
|
||||
from proton import *
|
||||
|
||||
user = os.getenv('ACTIVEMQ_USER') or 'admin'
|
||||
password = os.getenv('ACTIVEMQ_PASSWORD') or 'password'
|
||||
host = os.getenv('ACTIVEMQ_HOST') or '127.0.0.1'
|
||||
port = int(os.getenv('ACTIVEMQ_PORT') or 5672)
|
||||
destination = sys.argv[1:2] or ['topic://event']
|
||||
destination = destination[0]
|
||||
|
||||
msg = Message()
|
||||
mng = Messenger()
|
||||
mng.password=password
|
||||
mng.start()
|
||||
mng.subscribe("amqp://%s@%s:%d/%s"%(user, host, port, destination))
|
||||
|
||||
count = 0
|
||||
start = time.time()
|
||||
while True:
|
||||
mng.recv(10)
|
||||
while mng.incoming:
|
||||
mng.get(msg)
|
||||
if msg.body=="SHUTDOWN":
|
||||
diff = time.time() - start
|
||||
print 'Received %d frames in %f seconds' % (count, diff)
|
||||
exit(0)
|
||||
else:
|
||||
if count==0:
|
||||
start = time.time()
|
||||
count+=1
|
||||
if count % 1000 == 0:
|
||||
print 'Received %d messages.' % (count)
|
||||
|
||||
mng.stop()
|
|
@ -1,57 +0,0 @@
|
|||
#!/usr/bin/env python
|
||||
"""
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
"""
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
|
||||
from proton import *
|
||||
|
||||
user = os.getenv('ACTIVEMQ_USER') or 'admin'
|
||||
password = os.getenv('ACTIVEMQ_PASSWORD') or 'password'
|
||||
host = os.getenv('ACTIVEMQ_HOST') or '127.0.0.1'
|
||||
port = int(os.getenv('ACTIVEMQ_PORT') or 5672)
|
||||
destination = sys.argv[1:2] or ['topic://event']
|
||||
destination = destination[0]
|
||||
address = "amqp://%s@%s:%d/%s"%(user, host, port, destination)
|
||||
|
||||
msg = Message()
|
||||
mng = Messenger()
|
||||
mng.password=password
|
||||
mng.start()
|
||||
|
||||
messages = 10000
|
||||
|
||||
msg.address = address
|
||||
msg.body = unicode('Hello World from Python')
|
||||
|
||||
count = 0
|
||||
start = time.time()
|
||||
for _ in xrange(messages):
|
||||
mng.put(msg)
|
||||
count += 1
|
||||
if count % 1000 == 0 :
|
||||
print("Sent %d messages"%(count))
|
||||
|
||||
msg.body = unicode("SHUTDOWN")
|
||||
mng.put(msg)
|
||||
mng.send
|
||||
|
||||
diff = time.time() - start
|
||||
print 'Sent %s frames in %f seconds' % (count, diff)
|
||||
|
||||
mng.stop()
|
|
@ -0,0 +1,21 @@
|
|||
## Overview
|
||||
This is an example of how to use the python AMQP [Qpid Proton](https://qpid.apache.org/proton/index.html) reactor API with ActiveMQ.
|
||||
|
||||
## Prereqs
|
||||
- linux
|
||||
- python 3.5+
|
||||
- you have successfully installed [python-qpid-proton](https://pypi.python.org/pypi/python-qpid-proton) - including any of its [dependencies](https://github.com/apache/qpid-proton/blob/master/INSTALL.md)
|
||||
- $PYTHONPATH can search this folder
|
||||
|
||||
## Running the Examples
|
||||
In one terminal window run:
|
||||
|
||||
python sender.py
|
||||
|
||||
In another terminal window run:
|
||||
|
||||
python receiver.py
|
||||
|
||||
Use the ActiveMQ admin web page to check Messages Enqueued / Dequeued counts match.
|
||||
|
||||
You can control which AMQP server the examples try to connect to and the messages they send by changing the values in config.py
|
|
@ -0,0 +1,57 @@
|
|||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
from __future__ import print_function
|
||||
|
||||
from proton.handlers import MessagingHandler
|
||||
from proton.reactor import Container
|
||||
|
||||
import address
|
||||
|
||||
|
||||
class Receiver(MessagingHandler):
|
||||
def __init__(self, url, messages_to_receive=10):
|
||||
super(Receiver, self).__init__()
|
||||
self.url = url
|
||||
self._messages_to_receive = messages_to_receive
|
||||
self._messages_actually_received = 0
|
||||
self._stopping = False
|
||||
|
||||
def on_start(self, event):
|
||||
event.container.create_receiver(self.url)
|
||||
|
||||
def on_message(self, event):
|
||||
if self._stopping:
|
||||
return
|
||||
|
||||
print(event.message)
|
||||
self._messages_actually_received += 1
|
||||
if self._messages_actually_received == self._messages_to_receive:
|
||||
event.connection.close()
|
||||
self._stopping = True
|
||||
|
||||
def on_transport_error(self, event):
|
||||
raise Exception(event.transport.condition)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
Container(Receiver(address.url)).run()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
|
@ -0,0 +1,62 @@
|
|||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
from __future__ import print_function
|
||||
|
||||
from proton.handlers import MessagingHandler
|
||||
from proton.reactor import Container
|
||||
|
||||
import address
|
||||
import content
|
||||
|
||||
|
||||
class Sender(MessagingHandler):
|
||||
def __init__(self, url, messages):
|
||||
super(Sender, self).__init__()
|
||||
self.url = url
|
||||
self._messages = messages
|
||||
self._message_index = 0
|
||||
self._sent_count = 0
|
||||
self._confirmed_count = 0
|
||||
|
||||
def on_start(self, event):
|
||||
event.container.create_sender(self.url)
|
||||
|
||||
def on_sendable(self, event):
|
||||
while event.sender.credit and self._sent_count < len(self._messages):
|
||||
message = self._messages[self._message_index]
|
||||
print(message)
|
||||
event.sender.send(message)
|
||||
self._message_index += 1
|
||||
self._sent_count += 1
|
||||
|
||||
def on_accepted(self, event):
|
||||
self._confirmed_count += 1
|
||||
if self._confirmed_count == len(self._messages):
|
||||
event.connection.close()
|
||||
|
||||
def on_transport_error(self, event):
|
||||
raise Exception(event.transport.condition)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
Container(Sender(address.url, content.messages)).run()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
Loading…
Reference in New Issue