<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

<html>
<head>
<title>ActiveMQ Paging Example</title>
<link rel="stylesheet" type="text/css" href="../common/common.css" />
<link rel="stylesheet" type="text/css" href="../common/prettify.css" />
<script type="text/javascript" src="../common/prettify.js"></script>
</head>
<body onload="prettyPrint()">
<h1>Paging Example</h1>

<p>This example shows how ActiveMQ avoids running out of memory by paging messages to disk.</p>
<p>A maxSize can be specified per destination via the destination settings in the configuration file (activemq-configuration.xml).</p>
<p>When messages routed to an address exceed the specified maxSize, the server begins writing messages to the file
system; this is called paging. Paging continues until messages have been delivered to consumers and subsequently
acknowledged, which frees up memory. Messages are then read back from the file system (depaged) and routed as normal.</p>
<p>Acknowledgement plays an important role in paging: messages stay on the file system until memory is released,
so it is important to make sure that the client acknowledges its messages.</p>
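<p>The page/depage cycle described above can be illustrated with a small, self-contained toy model. This is only a sketch and not ActiveMQ's implementation: the class <code>PagingSketch</code>, its method names, and the 100 KiB limit are hypothetical, and an in-memory queue stands in for the page files on disk.</p>

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of paging for one address. Hypothetical; not ActiveMQ code. */
public class PagingSketch {
    private final long maxSizeBytes;  // assumed per-address memory limit
    private long bytesInMemory = 0;
    private final Deque<byte[]> memory = new ArrayDeque<>(); // messages held in memory
    private final Deque<byte[]> paged = new ArrayDeque<>();  // stands in for page files

    public PagingSketch(long maxSizeBytes) {
        this.maxSizeBytes = maxSizeBytes;
    }

    /** Route a message: page it if the limit would be exceeded or paging is already active. */
    public void route(byte[] body) {
        if (!paged.isEmpty() || bytesInMemory + body.length > maxSizeBytes) {
            paged.addLast(body); // keeps message order intact
        } else {
            memory.addLast(body);
            bytesInMemory += body.length;
        }
    }

    /** Deliver and acknowledge the oldest in-memory message, then depage what now fits. */
    public byte[] receiveAndAcknowledge() {
        byte[] body = memory.pollFirst();
        if (body == null) {
            return null;
        }
        bytesInMemory -= body.length; // the acknowledgement frees memory
        while (!paged.isEmpty()
                && bytesInMemory + paged.peekFirst().length <= maxSizeBytes) {
            byte[] next = paged.pollFirst();
            memory.addLast(next);
            bytesInMemory += next.length;
        }
        return body;
    }

    public int inMemoryCount() { return memory.size(); }

    public int pagedCount() { return paged.size(); }

    public static void main(String[] args) {
        PagingSketch address = new PagingSketch(100 * 1024); // assumed 100 KiB limit
        for (int i = 0; i < 20; i++) {
            address.route(new byte[10 * 1024]); // 20 messages of 10 KiB each
        }
        System.out.println("in memory: " + address.inMemoryCount()
                + ", paged: " + address.pagedCount());
        address.receiveAndAcknowledge();
        System.out.println("after one ack, in memory: " + address.inMemoryCount()
                + ", paged: " + address.pagedCount());
    }
}
```

<p>With these assumed numbers, ten messages fit in memory and ten are paged; each acknowledgement frees room for one paged message to be read back. If nothing is acknowledged, nothing is depaged, which is why the client must acknowledge its messages.</p>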

<h2>Example step-by-step</h2>
<p><i>To run the example, simply type <code>mvn verify</code> from this directory.</i></p>

<ol>
<li>First we need to get an initial context so we can look up the JMS connection factory and destination objects from JNDI. This initial context will get its properties from the <code>client-jndi.properties</code> file in the directory <code>../common/config</code>.</li>
<pre class="prettyprint">
<code>InitialContext initialContext = getContext();</code>
</pre>

<li>We look up the JMS connection factory object from JNDI.</li>
<pre class="prettyprint">
<code>ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("/ConnectionFactory");</code>
</pre>

<li>We look up the JMS queue object for pagingQueue from JNDI. pagingQueue is configured to hold a very limited number of bytes in memory.</li>
<pre class="prettyprint">
<code>Queue pageQueue = (Queue) initialContext.lookup("/queue/pagingQueue");</code>
</pre>

<li>We look up the JMS queue object for exampleQueue from JNDI.</li>
<pre class="prettyprint">
<code>Queue queue = (Queue) initialContext.lookup("/queue/exampleQueue");</code>
</pre>

<li>We create a JMS connection.</li>
<pre class="prettyprint">
<code>connection = cf.createConnection();</code>
</pre>

<li>We create a JMS session. The session is created as non-transacted. We use client acknowledgement in this example.</li>
<pre class="prettyprint">
<code>Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);</code>
</pre>

<li>Create a JMS message producer for pageQueue.</li>
<pre class="prettyprint"><code>
MessageProducer pageMessageProducer = session.createProducer(pageQueue);
</code></pre>

<li>We don't need persistent messages in order to use paging. (This step is optional.)</li>
<pre class="prettyprint"><code>
pageMessageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
</code></pre>

<li>Create a bytes message containing 10K of arbitrary bytes.</li>
<pre class="prettyprint"><code>
BytesMessage message = session.createBytesMessage();
message.writeBytes(new byte[10 * 1024]);
</code></pre>

<li>Send only 20 messages to the queue. This is already enough to make pagingQueue start paging. See ./paging/config/activemq-queues.xml for the configuration.</li>
<pre class="prettyprint"><code>
for (int i = 0; i &lt; 20; i++)
{
   pageMessageProducer.send(message);
}
</code></pre>

<li>Create a JMS message producer for exampleQueue.</li>
<pre class="prettyprint"><code>
MessageProducer messageProducer = session.createProducer(queue);
</code></pre>

<li>We don't need persistent messages in order to use paging. (This step is optional.)</li>
<pre class="prettyprint"><code>
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
</code></pre>

<li>Send the message 30,000 times, which should exceed the memory limit imposed by the server.</li>
<pre class="prettyprint"><code>
for (int i = 0; i &lt; 30000; i++)
{
   messageProducer.send(message);
}
</code></pre>

<li>If you pause the example here, you will see several files under ./build/data/paging.</li>
<pre class="prettyprint"><code>
// Thread.sleep(30000); // optionally, out of curiosity, sleep here and inspect the created files
</code></pre>

<li>Create a JMS message consumer.</li>
<pre class="prettyprint"><code>
MessageConsumer messageConsumer = session.createConsumer(queue);
</code></pre>

<li>Start the JMS connection. This step activates the consumers so that they can receive messages.</li>
<pre class="prettyprint"><code>
connection.start();
</code></pre>

<li>Receive the messages. It's important to acknowledge messages, as ActiveMQ will not read further messages from paging until the delivered messages have been acknowledged. Here we acknowledge once every 1,000 messages.</li>
<pre class="prettyprint"><code>
for (int i = 0; i &lt; 30000; i++)
{
   message = (BytesMessage)messageConsumer.receive(1000);

   if (i % 1000 == 0)
   {
      System.out.println("Received " + i + " messages");

      message.acknowledge();
   }
}
</code></pre>

<li>Receive the messages from the queue named pageQueue. Create the proper consumer for that.</li>
<pre class="prettyprint"><code>
messageConsumer.close();
messageConsumer = session.createConsumer(pageQueue);

for (int i = 0; i &lt; 20; i++)
{
   message = (BytesMessage)messageConsumer.receive(1000);

   System.out.println("Received message " + i + " from pageQueue");

   message.acknowledge();
}
</code></pre>

<li>And finally, <b>always</b> remember to close your JMS connections and resources after use, in a <code>finally</code> block. Closing a JMS connection will automatically close all of its sessions, consumers, producers and browser objects.</li>
<pre class="prettyprint">
<code>finally
{
   if (initialContext != null)
   {
      initialContext.close();
   }
   if (connection != null)
   {
      connection.close();
   }
}</code>
</pre>
</ol>
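<p>The receive loop for exampleQueue acknowledges only when <code>i % 1000 == 0</code>. The sketch below counts what that means, assuming the standard JMS rule that in <code>CLIENT_ACKNOWLEDGE</code> mode, acknowledging one message also acknowledges every message the session has consumed so far. The class <code>AckBatchSketch</code> and its <code>simulate</code> method are hypothetical and use no JMS classes; they only reproduce the loop's arithmetic.</p>

```java
/** Counts acknowledge() calls for a batched-ack receive loop. Hypothetical helper. */
public class AckBatchSketch {

    /**
     * Simulates receiving {@code total} messages and acknowledging whenever
     * {@code i % batch == 0}. Returns {ackCalls, unacknowledgedAtEnd}.
     */
    public static int[] simulate(int total, int batch) {
        int ackCalls = 0;
        int lastAckedIndex = -1; // highest message index covered by an acknowledge
        for (int i = 0; i < total; i++) {
            if (i % batch == 0) {
                ackCalls++;
                lastAckedIndex = i; // covers messages 0..i (assumed JMS session semantics)
            }
        }
        int unacknowledged = total - 1 - lastAckedIndex;
        return new int[] {ackCalls, unacknowledged};
    }

    public static void main(String[] args) {
        int[] result = simulate(30000, 1000);
        System.out.println("acknowledge() calls: " + result[0]);
        System.out.println("unacknowledged when the loop ends: " + result[1]);
    }
}
```

<p>For 30,000 messages and a batch of 1,000, the loop calls <code>acknowledge()</code> 30 times, and the last call happens at index 29000, so the final 999 messages are still unacknowledged when the loop ends. The later <code>message.acknowledge()</code> calls in the pageQueue loop run on the same session, so under the assumption above they would cover those messages as well.</p>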
</body>
</html>