mirror of https://github.com/apache/activemq.git
git-svn-id: https://svn.apache.org/repos/asf/activemq/branches/activemq-5.3@912656 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b38dbba8c5
commit
565a97f98c
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.broker.region;
|
package org.apache.activemq.broker.region;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.ConnectionContext;
|
import org.apache.activemq.broker.ConnectionContext;
|
||||||
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
|
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
|
||||||
|
@ -23,6 +24,8 @@ import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.ActiveMQTempDestination;
|
import org.apache.activemq.command.ActiveMQTempDestination;
|
||||||
import org.apache.activemq.store.MessageStore;
|
import org.apache.activemq.store.MessageStore;
|
||||||
import org.apache.activemq.thread.TaskRunnerFactory;
|
import org.apache.activemq.thread.TaskRunnerFactory;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The Queue is a List of MessageEntry objects that are dispatched to matching
|
* The Queue is a List of MessageEntry objects that are dispatched to matching
|
||||||
|
@ -31,6 +34,7 @@ import org.apache.activemq.thread.TaskRunnerFactory;
|
||||||
* @version $Revision: 1.28 $
|
* @version $Revision: 1.28 $
|
||||||
*/
|
*/
|
||||||
public class TempQueue extends Queue{
|
public class TempQueue extends Queue{
|
||||||
|
private static final Log LOG = LogFactory.getLog(TempQueue.class);
|
||||||
private final ActiveMQTempDestination tempDest;
|
private final ActiveMQTempDestination tempDest;
|
||||||
|
|
||||||
|
|
||||||
|
@ -50,6 +54,7 @@ public class TempQueue extends Queue{
|
||||||
this.tempDest = (ActiveMQTempDestination) destination;
|
this.tempDest = (ActiveMQTempDestination) destination;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void initialize() throws Exception {
|
public void initialize() throws Exception {
|
||||||
this.messages=new VMPendingMessageCursor();
|
this.messages=new VMPendingMessageCursor();
|
||||||
this.messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
|
this.messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
|
||||||
|
@ -58,6 +63,7 @@ public class TempQueue extends Queue{
|
||||||
this.taskRunner = taskFactory.createTaskRunner(this, "TempQueue: " + destination.getPhysicalName());
|
this.taskRunner = taskFactory.createTaskRunner(this, "TempQueue: " + destination.getPhysicalName());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
|
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
|
||||||
// Only consumers on the same connection can consume from
|
// Only consumers on the same connection can consume from
|
||||||
// the temporary destination
|
// the temporary destination
|
||||||
|
@ -74,4 +80,14 @@ public class TempQueue extends Queue{
|
||||||
}
|
}
|
||||||
super.addSubscription(context, sub);
|
super.addSubscription(context, sub);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void dispose(ConnectionContext context) throws IOException {
|
||||||
|
try {
|
||||||
|
purge();
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn("Caught an exception purging Queue: " + destination);
|
||||||
|
}
|
||||||
|
super.dispose(context);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -86,7 +86,12 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
||||||
clearIterator(true);
|
clearIterator(true);
|
||||||
recovered = true;
|
recovered = true;
|
||||||
} else {
|
} else {
|
||||||
LOG.error(regionDestination.getActiveMQDestination().getPhysicalName() + " cursor got duplicate: " + message);
|
/*
|
||||||
|
* we should expect to get these - as the message is recorded as it before it goes into
|
||||||
|
* the cache. If subsequently, we pull out that message from the store (before its deleted)
|
||||||
|
* it will be a duplicate - but should be ignored
|
||||||
|
*/
|
||||||
|
//LOG.error(regionDestination.getActiveMQDestination().getPhysicalName() + " cursor got duplicate: " + message);
|
||||||
storeHasMessages = true;
|
storeHasMessages = true;
|
||||||
}
|
}
|
||||||
return recovered;
|
return recovered;
|
||||||
|
|
|
@ -20,7 +20,6 @@ import java.util.ArrayList;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.activemq.broker.ConnectionContext;
|
import org.apache.activemq.broker.ConnectionContext;
|
||||||
import org.apache.activemq.broker.region.Destination;
|
import org.apache.activemq.broker.region.Destination;
|
||||||
import org.apache.activemq.broker.region.MessageReference;
|
import org.apache.activemq.broker.region.MessageReference;
|
||||||
|
@ -33,38 +32,39 @@ import org.apache.activemq.broker.region.QueueMessageReference;
|
||||||
* @version $Revision$
|
* @version $Revision$
|
||||||
*/
|
*/
|
||||||
public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
|
public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
|
||||||
private LinkedList<MessageReference> list = new LinkedList<MessageReference>();
|
private final LinkedList<MessageReference> list = new LinkedList<MessageReference>();
|
||||||
private Iterator<MessageReference> iter;
|
private Iterator<MessageReference> iter;
|
||||||
public VMPendingMessageCursor(){
|
public VMPendingMessageCursor() {
|
||||||
this.useCache=false;
|
this.useCache = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
|
public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination)
|
||||||
List<MessageReference> rc = new ArrayList<MessageReference>();
|
throws Exception {
|
||||||
|
List<MessageReference> rc = new ArrayList<MessageReference>();
|
||||||
for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
|
for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
|
||||||
MessageReference r = iterator.next();
|
MessageReference r = iterator.next();
|
||||||
if( r.getRegionDestination()==destination ) {
|
if (r.getRegionDestination() == destination) {
|
||||||
r.decrementReferenceCount();
|
r.decrementReferenceCount();
|
||||||
rc.add(r);
|
rc.add(r);
|
||||||
iterator.remove();
|
iterator.remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return rc ;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return true if there are no pending messages
|
* @return true if there are no pending messages
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public synchronized boolean isEmpty() {
|
public synchronized boolean isEmpty() {
|
||||||
if (list.isEmpty()) {
|
if (list.isEmpty()) {
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
|
for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
|
||||||
MessageReference node = iterator.next();
|
MessageReference node = iterator.next();
|
||||||
if (node== QueueMessageReference.NULL_MESSAGE){
|
if (node == QueueMessageReference.NULL_MESSAGE) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (!node.isDropped()) {
|
if (!node.isDropped()) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -79,6 +79,7 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
|
||||||
/**
|
/**
|
||||||
* reset the cursor
|
* reset the cursor
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public synchronized void reset() {
|
public synchronized void reset() {
|
||||||
iter = list.listIterator();
|
iter = list.listIterator();
|
||||||
last = null;
|
last = null;
|
||||||
|
@ -89,6 +90,7 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
|
||||||
*
|
*
|
||||||
* @param node
|
* @param node
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public synchronized void addMessageLast(MessageReference node) {
|
public synchronized void addMessageLast(MessageReference node) {
|
||||||
node.incrementReferenceCount();
|
node.incrementReferenceCount();
|
||||||
list.addLast(node);
|
list.addLast(node);
|
||||||
|
@ -100,6 +102,7 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
|
||||||
* @param position
|
* @param position
|
||||||
* @param node
|
* @param node
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public synchronized void addMessageFirst(MessageReference node) {
|
public synchronized void addMessageFirst(MessageReference node) {
|
||||||
node.incrementReferenceCount();
|
node.incrementReferenceCount();
|
||||||
list.addFirst(node);
|
list.addFirst(node);
|
||||||
|
@ -108,6 +111,7 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
|
||||||
/**
|
/**
|
||||||
* @return true if there pending messages to dispatch
|
* @return true if there pending messages to dispatch
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public synchronized boolean hasNext() {
|
public synchronized boolean hasNext() {
|
||||||
return iter.hasNext();
|
return iter.hasNext();
|
||||||
}
|
}
|
||||||
|
@ -115,8 +119,9 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
|
||||||
/**
|
/**
|
||||||
* @return the next pending message
|
* @return the next pending message
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public synchronized MessageReference next() {
|
public synchronized MessageReference next() {
|
||||||
last = (MessageReference)iter.next();
|
last = iter.next();
|
||||||
if (last != null) {
|
if (last != null) {
|
||||||
last.incrementReferenceCount();
|
last.incrementReferenceCount();
|
||||||
}
|
}
|
||||||
|
@ -126,6 +131,7 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
|
||||||
/**
|
/**
|
||||||
* remove the message at the cursor position
|
* remove the message at the cursor position
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public synchronized void remove() {
|
public synchronized void remove() {
|
||||||
if (last != null) {
|
if (last != null) {
|
||||||
last.decrementReferenceCount();
|
last.decrementReferenceCount();
|
||||||
|
@ -136,6 +142,7 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
|
||||||
/**
|
/**
|
||||||
* @return the number of pending messages
|
* @return the number of pending messages
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public synchronized int size() {
|
public synchronized int size() {
|
||||||
return list.size();
|
return list.size();
|
||||||
}
|
}
|
||||||
|
@ -143,10 +150,16 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
|
||||||
/**
|
/**
|
||||||
* clear all pending messages
|
* clear all pending messages
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public synchronized void clear() {
|
public synchronized void clear() {
|
||||||
|
for (Iterator<MessageReference> i = list.iterator(); i.hasNext();) {
|
||||||
|
MessageReference ref = i.next();
|
||||||
|
ref.decrementReferenceCount();
|
||||||
|
}
|
||||||
list.clear();
|
list.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public synchronized void remove(MessageReference node) {
|
public synchronized void remove(MessageReference node) {
|
||||||
for (Iterator<MessageReference> i = list.iterator(); i.hasNext();) {
|
for (Iterator<MessageReference> i = list.iterator(); i.hasNext();) {
|
||||||
MessageReference ref = i.next();
|
MessageReference ref = i.next();
|
||||||
|
@ -164,11 +177,19 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
|
||||||
* @param maxItems
|
* @param maxItems
|
||||||
* @return a list of paged in messages
|
* @return a list of paged in messages
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public LinkedList<MessageReference> pageInList(int maxItems) {
|
public LinkedList<MessageReference> pageInList(int maxItems) {
|
||||||
return list;
|
return list;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public boolean isTransient() {
|
public boolean isTransient() {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void destroy() throws Exception {
|
||||||
|
super.destroy();
|
||||||
|
clear();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,139 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.bugs;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import javax.jms.BytesMessage;
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Queue;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import javax.jms.Topic;
|
||||||
|
import junit.framework.TestCase;
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
|
||||||
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
||||||
|
import org.apache.activemq.util.IOHelper;
|
||||||
|
|
||||||
|
public class AMQ2616Test extends TestCase {
|
||||||
|
private static final int NUMBER = 2000;
|
||||||
|
private BrokerService brokerService;
|
||||||
|
private final ArrayList<Thread> threads = new ArrayList<Thread>();
|
||||||
|
String ACTIVEMQ_BROKER_BIND = "tcp://0.0.0.0:61616";
|
||||||
|
AtomicBoolean shutdown = new AtomicBoolean();
|
||||||
|
|
||||||
|
public void testQueueResourcesReleased() throws Exception{
|
||||||
|
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_BIND);
|
||||||
|
Connection tempConnection = fac.createConnection();
|
||||||
|
tempConnection.start();
|
||||||
|
Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Queue tempQueue = tempSession.createTemporaryQueue();
|
||||||
|
final MessageConsumer tempConsumer = tempSession.createConsumer(tempQueue);
|
||||||
|
|
||||||
|
Connection testConnection = fac.createConnection();
|
||||||
|
long startUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage();
|
||||||
|
Session testSession = testConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageProducer testProducer = testSession.createProducer(tempQueue);
|
||||||
|
byte[] payload = new byte[1024*4];
|
||||||
|
for (int i = 0; i < NUMBER; i++ ) {
|
||||||
|
BytesMessage msg = testSession.createBytesMessage();
|
||||||
|
msg.writeBytes(payload);
|
||||||
|
testProducer.send(msg);
|
||||||
|
}
|
||||||
|
long endUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage();
|
||||||
|
assertFalse(startUsage==endUsage);
|
||||||
|
tempConnection.close();
|
||||||
|
Thread.sleep(1000);
|
||||||
|
endUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage();
|
||||||
|
assertEquals(startUsage,endUsage);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testTopicResourcesReleased() throws Exception{
|
||||||
|
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_BIND);
|
||||||
|
Connection tempConnection = fac.createConnection();
|
||||||
|
tempConnection.start();
|
||||||
|
Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Topic tempTopic = tempSession.createTemporaryTopic();
|
||||||
|
final MessageConsumer tempConsumer = tempSession.createConsumer(tempTopic);
|
||||||
|
|
||||||
|
Connection testConnection = fac.createConnection();
|
||||||
|
long startUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage();
|
||||||
|
Session testSession = testConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageProducer testProducer = testSession.createProducer(tempTopic);
|
||||||
|
byte[] payload = new byte[1024*4];
|
||||||
|
for (int i = 0; i < NUMBER; i++ ) {
|
||||||
|
BytesMessage msg = testSession.createBytesMessage();
|
||||||
|
msg.writeBytes(payload);
|
||||||
|
testProducer.send(msg);
|
||||||
|
}
|
||||||
|
long endUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage();
|
||||||
|
assertFalse(startUsage==endUsage);
|
||||||
|
tempConnection.close();
|
||||||
|
Thread.sleep(1000);
|
||||||
|
endUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage();
|
||||||
|
assertEquals(startUsage,endUsage);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void setUp() throws Exception {
|
||||||
|
// Start an embedded broker up.
|
||||||
|
brokerService = new BrokerService();
|
||||||
|
|
||||||
|
KahaDBPersistenceAdapter adaptor = new KahaDBPersistenceAdapter();
|
||||||
|
adaptor.setEnableJournalDiskSyncs(false);
|
||||||
|
File file = new File("target/AMQ2616Test");
|
||||||
|
IOHelper.mkdirs(file);
|
||||||
|
IOHelper.deleteChildren(file);
|
||||||
|
adaptor.setDirectory(file);
|
||||||
|
brokerService.setPersistenceAdapter(adaptor);
|
||||||
|
|
||||||
|
PolicyMap policyMap = new PolicyMap();
|
||||||
|
PolicyEntry pe = new PolicyEntry();
|
||||||
|
pe.setMemoryLimit(10 * 1024 * 1024);
|
||||||
|
pe.setOptimizedDispatch(true);
|
||||||
|
pe.setProducerFlowControl(false);
|
||||||
|
pe.setExpireMessagesPeriod(1000);
|
||||||
|
pe.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy());
|
||||||
|
policyMap.put(new ActiveMQQueue(">"), pe);
|
||||||
|
brokerService.setDestinationPolicy(policyMap);
|
||||||
|
brokerService.getSystemUsage().getMemoryUsage().setLimit(20 * 1024 * 1024);
|
||||||
|
brokerService.getSystemUsage().getTempUsage().setLimit(200 * 1024 * 1024);
|
||||||
|
brokerService.addConnector(ACTIVEMQ_BROKER_BIND);
|
||||||
|
brokerService.start();
|
||||||
|
new ActiveMQQueue(getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void tearDown() throws Exception {
|
||||||
|
// Stop any running threads.
|
||||||
|
shutdown.set(true);
|
||||||
|
for (Thread t : threads) {
|
||||||
|
t.interrupt();
|
||||||
|
t.join();
|
||||||
|
}
|
||||||
|
brokerService.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,154 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.bugs.amq1974;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
import javax.jms.DeliveryMode;
|
||||||
|
import javax.jms.Destination;
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import javax.jms.TextMessage;
|
||||||
|
import org.apache.activemq.ActiveMQConnection;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.network.DiscoveryNetworkConnector;
|
||||||
|
import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
|
||||||
|
import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent;
|
||||||
|
|
||||||
|
public class TryJmsClient
|
||||||
|
{
|
||||||
|
private final BrokerService broker = new BrokerService();
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
new TryJmsClient().start();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void start() throws Exception {
|
||||||
|
|
||||||
|
broker.setUseJmx(false);
|
||||||
|
broker.setPersistent(true);
|
||||||
|
broker.setBrokerName("TestBroker");
|
||||||
|
broker.getSystemUsage().setSendFailIfNoSpace(true);
|
||||||
|
|
||||||
|
broker.getSystemUsage().getMemoryUsage().setLimit(10 * 1024 * 1024);
|
||||||
|
|
||||||
|
KahaPersistenceAdapter persist = new KahaPersistenceAdapter();
|
||||||
|
persist.setDirectory(new File("/tmp/broker2"));
|
||||||
|
persist.setMaxDataFileLength(20 * 1024 * 1024);
|
||||||
|
broker.setPersistenceAdapter(persist);
|
||||||
|
|
||||||
|
String brokerUrl = "tcp://localhost:4501";
|
||||||
|
broker.addConnector(brokerUrl);
|
||||||
|
|
||||||
|
broker.start();
|
||||||
|
|
||||||
|
addNetworkBroker();
|
||||||
|
|
||||||
|
startUsageMonitor(broker);
|
||||||
|
|
||||||
|
startMessageSend();
|
||||||
|
|
||||||
|
synchronized(this) {
|
||||||
|
this.wait();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startUsageMonitor(final BrokerService brokerService) {
|
||||||
|
new Thread(new Runnable() {
|
||||||
|
public void run() {
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(10000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
|
||||||
|
System.out.println("ActiveMQ memeory " + brokerService.getSystemUsage().getMemoryUsage().getPercentUsage()
|
||||||
|
+ " " + brokerService.getSystemUsage().getMemoryUsage().getUsage());
|
||||||
|
System.out.println("ActiveMQ message store " + brokerService.getSystemUsage().getStoreUsage().getPercentUsage());
|
||||||
|
System.out.println("ActiveMQ temp space " + brokerService.getSystemUsage().getTempUsage().getPercentUsage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addNetworkBroker() throws Exception {
|
||||||
|
|
||||||
|
DiscoveryNetworkConnector dnc = new DiscoveryNetworkConnector();
|
||||||
|
dnc.setNetworkTTL(1);
|
||||||
|
dnc.setBrokerName("TestBroker");
|
||||||
|
dnc.setName("Broker1Connector");
|
||||||
|
dnc.setDynamicOnly(true);
|
||||||
|
|
||||||
|
SimpleDiscoveryAgent discoveryAgent = new SimpleDiscoveryAgent();
|
||||||
|
String remoteUrl = "tcp://localhost:4500";
|
||||||
|
discoveryAgent.setServices(remoteUrl);
|
||||||
|
|
||||||
|
dnc.setDiscoveryAgent(discoveryAgent);
|
||||||
|
|
||||||
|
broker.addNetworkConnector(dnc);
|
||||||
|
dnc.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startMessageSend() {
|
||||||
|
new Thread(new MessageSend()).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
private class MessageSend implements Runnable {
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
String url = "vm://TestBroker";
|
||||||
|
ActiveMQConnection connection = ActiveMQConnection.makeConnection(url);
|
||||||
|
connection.setDispatchAsync(true);
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Destination dest = session.createTopic("TestDestination");
|
||||||
|
|
||||||
|
MessageProducer producer = session.createProducer(dest);
|
||||||
|
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||||
|
|
||||||
|
for(int i = 0; i < 99999999; i++) {
|
||||||
|
TextMessage message = session.createTextMessage("test" + i);
|
||||||
|
|
||||||
|
/*
|
||||||
|
try {
|
||||||
|
Thread.sleep(1);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
try {
|
||||||
|
producer.send(message);
|
||||||
|
} catch (Exception e ) {
|
||||||
|
e.printStackTrace();
|
||||||
|
System.out.println("TOTAL number of messages sent " + i);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (i % 1000 == 0) {
|
||||||
|
System.out.println("sent message " + message.getJMSMessageID());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (JMSException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
} catch (URISyntaxException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,127 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.bugs.amq1974;
|
||||||
|
import java.io.File;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
import javax.jms.Destination;
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageListener;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import org.apache.activemq.ActiveMQConnection;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.network.DiscoveryNetworkConnector;
|
||||||
|
import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
|
||||||
|
import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent;
|
||||||
|
|
||||||
|
public class TryJmsManager {
|
||||||
|
|
||||||
|
private final BrokerService broker = new BrokerService();
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
new TryJmsManager().start();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void start() throws Exception {
|
||||||
|
|
||||||
|
broker.setUseJmx(false);
|
||||||
|
broker.setPersistent(true);
|
||||||
|
broker.setBrokerName("TestBroker");
|
||||||
|
broker.getSystemUsage().setSendFailIfNoSpace(true);
|
||||||
|
|
||||||
|
broker.getSystemUsage().getMemoryUsage().setLimit(10 * 1024 * 1024);
|
||||||
|
|
||||||
|
KahaPersistenceAdapter persist = new KahaPersistenceAdapter();
|
||||||
|
persist.setDirectory(new File("/tmp/broker1"));
|
||||||
|
persist.setMaxDataFileLength(20 * 1024 * 1024);
|
||||||
|
broker.setPersistenceAdapter(persist);
|
||||||
|
|
||||||
|
String brokerUrl = "tcp://localhost:4500";
|
||||||
|
broker.addConnector(brokerUrl);
|
||||||
|
|
||||||
|
broker.start();
|
||||||
|
|
||||||
|
addNetworkBroker();
|
||||||
|
|
||||||
|
startUsageMonitor(broker);
|
||||||
|
|
||||||
|
startMessageConsumer();
|
||||||
|
|
||||||
|
synchronized(this) {
|
||||||
|
this.wait();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startUsageMonitor(final BrokerService brokerService) {
|
||||||
|
new Thread(new Runnable() {
|
||||||
|
public void run() {
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(10000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
|
||||||
|
System.out.println("ActiveMQ memeory " + brokerService.getSystemUsage().getMemoryUsage().getPercentUsage()
|
||||||
|
+ " " + brokerService.getSystemUsage().getMemoryUsage().getUsage());
|
||||||
|
System.out.println("ActiveMQ message store " + brokerService.getSystemUsage().getStoreUsage().getPercentUsage());
|
||||||
|
System.out.println("ActiveMQ temp space " + brokerService.getSystemUsage().getTempUsage().getPercentUsage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addNetworkBroker() throws Exception {
|
||||||
|
DiscoveryNetworkConnector dnc = new DiscoveryNetworkConnector();
|
||||||
|
dnc.setNetworkTTL(1);
|
||||||
|
dnc.setBrokerName("TestBroker");
|
||||||
|
dnc.setName("Broker1Connector");
|
||||||
|
dnc.setDynamicOnly(true);
|
||||||
|
|
||||||
|
SimpleDiscoveryAgent discoveryAgent = new SimpleDiscoveryAgent();
|
||||||
|
String remoteUrl = "tcp://localhost:4501";
|
||||||
|
discoveryAgent.setServices(remoteUrl);
|
||||||
|
|
||||||
|
dnc.setDiscoveryAgent(discoveryAgent);
|
||||||
|
|
||||||
|
broker.addNetworkConnector(dnc);
|
||||||
|
dnc.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startMessageConsumer() throws JMSException, URISyntaxException {
|
||||||
|
String url = "vm://TestBroker";
|
||||||
|
ActiveMQConnection connection = ActiveMQConnection.makeConnection(url);
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Destination dest = session.createTopic("TestDestination");
|
||||||
|
|
||||||
|
MessageConsumer consumer = session.createConsumer(dest);
|
||||||
|
consumer.setMessageListener(new MessageListener() {
|
||||||
|
|
||||||
|
public void onMessage(Message message) {
|
||||||
|
try {
|
||||||
|
System.out.println("got message " + message.getJMSMessageID());
|
||||||
|
} catch (JMSException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
connection.start();
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue