Porting over tests in the 4.1 branch and updated the UsageManager so that if a Limit is set, then

it is used instead of being % based off the parent.



git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@517222 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2007-03-12 14:01:01 +00:00
parent c0af1f5116
commit 7de0e81995
6 changed files with 645 additions and 7 deletions

View File

@ -265,6 +265,9 @@
<!-- The NIO implemenation is not working properly on OS X.. -->
<exclude>**/nio/**</exclude>
<exclude>**/AMQDeadlockTest3.*</exclude>
<exclude>**/ProducerFlowControlTest.*</exclude>
</excludes>
</configuration>
</plugin>

View File

@ -102,7 +102,7 @@ public class Queue implements Destination, Task {
TaskRunnerFactory taskFactory, Store tmpStore) throws Exception {
this.destination = destination;
this.usageManager = new UsageManager(memoryManager,destination.toString());
this.usageManager.setLimit(Long.MAX_VALUE);
this.usageManager.setUsagePortion(1.0f);
this.store = store;
if(destination.isTemporary()){
this.messages=new VMPendingMessageCursor();

View File

@ -76,7 +76,7 @@ public class Topic implements Destination {
this.destination = destination;
this.store = store; //this could be NULL! (If an advsiory)
this.usageManager = new UsageManager(memoryManager,destination.toString());
this.usageManager.setLimit(Long.MAX_VALUE);
this.usageManager.setUsagePortion(1.0f);
// Let the store know what usage manager we are using so that he can flush messages to disk
// when usage gets high.

View File

@ -194,7 +194,8 @@ public class UsageManager implements Service{
}
/**
* Sets the memory limit in bytes.
* Sets the memory limit in bytes. Setting the limit in bytes will set the usagePortion to 0 since
* the UsageManager is not going to be portion based off the parent.
*
* When set using XBean, you can use values such as: "20 mb", "1024 kb", or "1 gb"
*
@ -204,17 +205,49 @@ public class UsageManager implements Service{
if(percentUsageMinDelta < 0 ) {
throw new IllegalArgumentException("percentUsageMinDelta must be greater or equal to 0");
}
synchronized(usageMutex){
this.limit=limit;
this.usagePortion=0;
}
onLimitChange();
}
private void onLimitChange() {
// We may need to calculate the limit
if( usagePortion > 0 && parent!=null ) {
synchronized(usageMutex){
limit = (long)(parent.getLimit()*usagePortion);
}
}
// Reset the percent currently being used.
int percentUsage;
synchronized(usageMutex){
this.limit=parent!=null?(long)(parent.limit*usagePortion):limit;
percentUsage=caclPercentUsage();
}
setPercentUsage(percentUsage);
// Let the children know that the limit has changed. They may need to set
// their limits based on ours.
for (UsageManager child:children) {
child.setLimit(limit);
child.onLimitChange();
}
}
}
public float getUsagePortion() {
synchronized(usageMutex){
return usagePortion;
}
}
public void setUsagePortion(float usagePortion) {
synchronized(usageMutex){
this.usagePortion = usagePortion;
}
onLimitChange();
}
/*
* Sets the minimum number of percentage points the usage has to change before a UsageListener
* event is fired by the manager.
@ -370,4 +403,5 @@ public class UsageManager implements Service{
}
}
}

View File

@ -0,0 +1,438 @@
package org.apache.activemq;
import java.net.URI;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
public class AMQDeadlockTest3 extends TestCase {
private static final String URL1 = "tcp://localhost:61616";
private static final String URL2 = "tcp://localhost:61617";
private static final String QUEUE1_NAME = "test.queue.1";
private static final String QUEUE2_NAME = "test.queue.2";
private static final int MAX_CONSUMERS = 1;
private static final int MAX_PRODUCERS = 1;
private static final int NUM_MESSAGE_TO_SEND = 10;
private AtomicInteger messageCount = new AtomicInteger();
private CountDownLatch doneLatch;
public void setUp() throws Exception {
}
public void tearDown() throws Exception {
}
// This should fail with incubator-activemq-fuse-4.1.0.5
public void testQueueLimitsWithOneBrokerSameConnection() throws Exception {
BrokerService brokerService1 = null;
ActiveMQConnectionFactory acf = null;
PooledConnectionFactory pcf = null;
DefaultMessageListenerContainer container1 = null;
try {
brokerService1 = createBrokerService("broker1", URL1, null);
brokerService1.start();
acf = createConnectionFactory(URL1);
pcf = new PooledConnectionFactory(acf);
// Only listen on the first queue.. let the 2nd queue fill up.
doneLatch = new CountDownLatch(NUM_MESSAGE_TO_SEND);
container1 = createDefaultMessageListenerContainer(acf, new TestMessageListener1(500), QUEUE1_NAME);
container1.afterPropertiesSet();
Thread.sleep(2000);
final ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < MAX_PRODUCERS; i++) {
executor.submit(new PooledProducerTask(pcf, QUEUE2_NAME));
Thread.sleep(1000);
executor.submit(new PooledProducerTask(pcf, QUEUE1_NAME));
}
// Wait for all message to arrive.
assertTrue(doneLatch.await(20, TimeUnit.SECONDS));
executor.shutdownNow();
Assert.assertEquals(NUM_MESSAGE_TO_SEND, messageCount.get());
} finally {
container1.stop();
container1.destroy();
container1 = null;
brokerService1.stop();
brokerService1 = null;
}
}
// This should fail with incubator-activemq-fuse-4.1.0.5
public void testQueueLimitsWithTwoBrokerProduceandConsumeonDifferentBrokersWithOneConnectionForProducing()
throws Exception {
BrokerService brokerService1 = null;
BrokerService brokerService2 = null;
ActiveMQConnectionFactory acf1 = null;
ActiveMQConnectionFactory acf2 = null;
PooledConnectionFactory pcf = null;
DefaultMessageListenerContainer container1 = null;
try {
brokerService1 = createBrokerService("broker1", URL1, URL2);
brokerService1.start();
brokerService2 = createBrokerService("broker2", URL2, URL1);
brokerService2.start();
acf1 = createConnectionFactory(URL1);
acf2 = createConnectionFactory(URL2);
pcf = new PooledConnectionFactory(acf1);
Thread.sleep(1000);
doneLatch = new CountDownLatch(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND);
container1 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(500), QUEUE1_NAME);
container1.afterPropertiesSet();
final ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < MAX_PRODUCERS; i++) {
executor.submit(new PooledProducerTask(pcf, QUEUE2_NAME));
Thread.sleep(1000);
executor.submit(new PooledProducerTask(pcf, QUEUE1_NAME));
}
assertTrue(doneLatch.await(20, TimeUnit.SECONDS));
executor.shutdownNow();
Assert.assertEquals(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND,
messageCount.get());
} finally {
container1.stop();
container1.destroy();
container1 = null;
brokerService1.stop();
brokerService1 = null;
brokerService2.stop();
brokerService2 = null;
}
}
// This should fail with incubator-activemq-fuse-4.1.0.5
public void testQueueLimitsWithTwoBrokerProduceandConsumeonDifferentBrokersWithSeperateConnectionsForProducing()
throws Exception {
BrokerService brokerService1 = null;
BrokerService brokerService2 = null;
ActiveMQConnectionFactory acf1 = null;
ActiveMQConnectionFactory acf2 = null;
DefaultMessageListenerContainer container1 = null;
DefaultMessageListenerContainer container2 = null;
try {
brokerService1 = createBrokerService("broker1", URL1, URL2);
brokerService1.start();
brokerService2 = createBrokerService("broker2", URL2, URL1);
brokerService2.start();
acf1 = createConnectionFactory(URL1);
acf2 = createConnectionFactory(URL2);
Thread.sleep(1000);
doneLatch = new CountDownLatch(NUM_MESSAGE_TO_SEND*MAX_PRODUCERS);
container1 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(500), QUEUE1_NAME);
container1.afterPropertiesSet();
container2 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(30000), QUEUE2_NAME);
container2.afterPropertiesSet();
final ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < MAX_PRODUCERS; i++) {
executor.submit(new NonPooledProducerTask(acf1, QUEUE2_NAME));
Thread.sleep(1000);
executor.submit(new NonPooledProducerTask(acf1, QUEUE1_NAME));
}
assertTrue(doneLatch.await(20, TimeUnit.SECONDS));
executor.shutdownNow();
Assert.assertEquals(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND, messageCount.get());
} finally {
container1.stop();
container1.destroy();
container1 = null;
container2.stop();
container2.destroy();
container2 = null;
brokerService1.stop();
brokerService1 = null;
brokerService2.stop();
brokerService2 = null;
}
}
private BrokerService createBrokerService(final String brokerName,
final String uri1, final String uri2) throws Exception {
final BrokerService brokerService = new BrokerService();
brokerService.setBrokerName(brokerName);
brokerService.setPersistent(false);
brokerService.setUseJmx(true);
final UsageManager memoryManager = new UsageManager();
memoryManager.setLimit(5000000);
brokerService.setMemoryManager(memoryManager);
final ArrayList policyEntries = new ArrayList();
final PolicyEntry entry = new PolicyEntry();
entry.setQueue(">");
// entry.setQueue(QUEUE1_NAME);
entry.setMemoryLimit(1000);
policyEntries.add(entry);
final PolicyMap policyMap = new PolicyMap();
policyMap.setPolicyEntries(policyEntries);
brokerService.setDestinationPolicy(policyMap);
final TransportConnector tConnector = new TransportConnector();
tConnector.setUri(new URI(uri1));
tConnector.setBrokerName(brokerName);
tConnector.setName(brokerName + ".transportConnector");
brokerService.addConnector(tConnector);
if (uri2 != null) {
final NetworkConnector nc = new DiscoveryNetworkConnector(new URI("static:" + uri2));
nc.setBridgeTempDestinations(true);
nc.setBrokerName(brokerName);
nc.setName(brokerName + ".nc");
brokerService.addNetworkConnector(nc);
}
return brokerService;
}
public DefaultMessageListenerContainer createDefaultMessageListenerContainer(
final ConnectionFactory acf, final MessageListener listener,
final String queue) {
final DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(acf);
container.setDestinationName(queue);
container.setMessageListener(listener);
container.setSessionTransacted(false);
container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
container.setConcurrentConsumers(MAX_CONSUMERS);
return container;
}
public ActiveMQConnectionFactory createConnectionFactory(final String url) {
final ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(url);
acf.setCopyMessageOnSend(false);
acf.setUseAsyncSend(false);
acf.setDispatchAsync(true);
acf.setUseCompression(false);
acf.setOptimizeAcknowledge(false);
acf.setOptimizedMessageDispatch(true);
acf.setUseSyncSend(true);
return acf;
}
private class TestMessageListener1 implements MessageListener {
private final long waitTime;
public TestMessageListener1(long waitTime) {
this.waitTime = waitTime;
}
public void onMessage(Message msg) {
try {
System.out.println("Listener1 Consumed message "+ msg.getIntProperty("count"));
messageCount.incrementAndGet();
doneLatch.countDown();
Thread.sleep(waitTime);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
private class PooledProducerTask implements Runnable {
private final String queueName;
private final PooledConnectionFactory pcf;
public PooledProducerTask(final PooledConnectionFactory pcf,
final String queueName) {
this.pcf = pcf;
this.queueName = queueName;
}
public void run() {
try {
final JmsTemplate jmsTemplate = new JmsTemplate(pcf);
jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
jmsTemplate.setExplicitQosEnabled(true);
jmsTemplate.setMessageIdEnabled(false);
jmsTemplate.setMessageTimestampEnabled(false);
jmsTemplate.afterPropertiesSet();
final byte[] bytes = new byte[2048];
final Random r = new Random();
r.nextBytes(bytes);
Thread.sleep(2000);
final AtomicInteger count = new AtomicInteger();
for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
jmsTemplate.send(queueName, new MessageCreator() {
public Message createMessage(Session session)
throws JMSException {
final BytesMessage message = session.createBytesMessage();
message.writeBytes(bytes);
message.setIntProperty("count", count.incrementAndGet());
message.setStringProperty("producer", "pooled");
return message;
}
});
System.out.println("PooledProducer sent message: "+ count.get());
// Thread.sleep(1000);
}
} catch (final Throwable e) {
System.err.println("Producer 1 is exiting.");
e.printStackTrace();
}
}
}
private class NonPooledProducerTask implements Runnable {
private final String queueName;
private final ConnectionFactory cf;
public NonPooledProducerTask(final ConnectionFactory cf,
final String queueName) {
this.cf = cf;
this.queueName = queueName;
}
public void run() {
try {
final JmsTemplate jmsTemplate = new JmsTemplate(cf);
jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
jmsTemplate.setExplicitQosEnabled(true);
jmsTemplate.setMessageIdEnabled(false);
jmsTemplate.setMessageTimestampEnabled(false);
jmsTemplate.afterPropertiesSet();
final byte[] bytes = new byte[2048];
final Random r = new Random();
r.nextBytes(bytes);
Thread.sleep(2000);
final AtomicInteger count = new AtomicInteger();
for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
jmsTemplate.send(queueName, new MessageCreator() {
public Message createMessage(Session session)
throws JMSException {
final BytesMessage message = session
.createBytesMessage();
message.writeBytes(bytes);
message.setIntProperty("count", count
.incrementAndGet());
message.setStringProperty("producer", "non-pooled");
return message;
}
});
System.out.println("Non-PooledProducer sent message: " + count.get());
// Thread.sleep(1000);
}
} catch (final Throwable e) {
System.err.println("Producer 1 is exiting.");
e.printStackTrace();
}
}
}
}

View File

@ -0,0 +1,163 @@
package org.apache.activemq;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.transport.tcp.TcpTransport;
public class ProducerFlowControlTest extends JmsTestSupport {
ActiveMQQueue queueA = new ActiveMQQueue("QUEUE.A");
ActiveMQQueue queueB = new ActiveMQQueue("QUEUE.B");
private TransportConnector connector;
private ActiveMQConnection connection;
public void test2ndPubisherWithSyncSendConnectionThatIsBlocked() throws Exception {
ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory();
factory.setUseSyncSend(true);
connection = (ActiveMQConnection) factory.createConnection();
connections.add(connection);
connection.start();
// Test sending to Queue A
// 1st send should not block.
fillQueue(queueA);
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queueB);
// Test sending to Queue B it should block.
// Since even though the it's queue limits have not been reached, the connection
// is blocked.
CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
assertTrue( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) );
TextMessage msg = (TextMessage) consumer.receive();
assertEquals("Message 1", msg.getText());
msg.acknowledge();
pubishDoneToQeueuB = asyncSendTo(queueB, "Message 2");
assertTrue( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) );
msg = (TextMessage) consumer.receive();
assertEquals("Message 2", msg.getText());
msg.acknowledge();
}
public void test2ndPubisherWithStandardConnectionThatIsBlocked() throws Exception {
ConnectionFactory factory = createConnectionFactory();
connection = (ActiveMQConnection) factory.createConnection();
connections.add(connection);
connection.start();
// Test sending to Queue A
// 1st send should not block.
fillQueue(queueA);
// Test sending to Queue B it should block.
// Since even though the it's queue limits have not been reached, the connection
// is blocked.
CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
assertFalse( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) );
}
private void fillQueue(final ActiveMQQueue queue) throws JMSException, InterruptedException {
final AtomicBoolean done = new AtomicBoolean(true);
final AtomicBoolean keepGoing = new AtomicBoolean(true);
// Starts an async thread that every time it publishes it sets the done flag to false.
// Once the send starts to block it will not reset the done flag anymore.
new Thread("Fill thread.") {
public void run() {
Session session=null;
try {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
while( keepGoing.get() ) {
done.set(false);
producer.send(session.createTextMessage("Hello World"));
}
} catch (JMSException e) {
} finally {
safeClose(session);
}
}
}.start();
while( true ) {
Thread.sleep(1000);
// the producer is blocked once the done flag stays true.
if( done.get() )
break;
done.set(true);
}
keepGoing.set(false);
}
private CountDownLatch asyncSendTo(final ActiveMQQueue queue, final String message) throws JMSException {
final CountDownLatch done = new CountDownLatch(1);
new Thread("Send thread.") {
public void run() {
Session session=null;
try {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.send(session.createTextMessage(message));
done.countDown();
} catch (JMSException e) {
} finally {
safeClose(session);
}
}
}.start();
return done;
}
protected BrokerService createBroker() throws Exception {
BrokerService service = new BrokerService();
service.setPersistent(false);
service.setUseJmx(false);
// Setup a destination policy where it takes only 1 message at a time.
PolicyMap policyMap = new PolicyMap();
PolicyEntry policy = new PolicyEntry();
policy.setMemoryLimit(1);
policy.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy());
policyMap.setDefaultEntry(policy);
service.setDestinationPolicy(policyMap);
connector = service.addConnector("tcp://localhost:0");
return service;
}
protected void tearDown() throws Exception {
TcpTransport t = (TcpTransport) connection.getTransport().narrow(TcpTransport.class);
t.getTransportListener().onException(new IOException("Disposed."));
connection.getTransport().stop();
super.tearDown();
}
protected ConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory(connector.getConnectUri());
}
}