Refactored the testsuite a bit

With these changes the testsuite is not using any LGPL code by default. Anything woud go through extra-tests

These following changes were made on this commit (in summary):
- renamed byteman-tests as extra-tests
- added extra-tests on a new profile called extra-tests
- added all the other tests back to the tests profile
- removed concurrent-tests and moved them all to timing-tests
- removed old tests that were ignored for a long time and were stale
This commit is contained in:
Clebert Suconic 2015-03-06 11:56:17 -05:00
parent b5a04eb79f
commit 3661829e6d
57 changed files with 417 additions and 7630 deletions

View File

@ -436,7 +436,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
}
// We have to perform the server.stop outside of the lock because of backup activation issues.
// See https://bugzilla.redhat.com/show_bug.cgi?id=959616
// And org.apache.activemq.byteman.tests.StartStopDeadlockTest which is validating for this case here
// And org.apache.activemq.extras.tests.StartStopDeadlockTest which is validating for this case here
server.stop();
}

View File

@ -63,7 +63,7 @@
<resteasy.version>3.0.9.Final</resteasy.version>
<skipUnitTests>true</skipUnitTests>
<skipJmsTests>true</skipJmsTests>
<skipBytemanTests>true</skipBytemanTests>
<skipExtraTests>true</skipExtraTests>
<skipIntegrationTests>true</skipIntegrationTests>
<skipJoramTests>true</skipJoramTests>
<skipTimingTests>true</skipTimingTests>
@ -517,7 +517,7 @@
<skipStressTests>true</skipStressTests>
<skipSoakTests>true</skipSoakTests>
<skipPerformanceTests>true</skipPerformanceTests>
<skipBytemanTests>false</skipBytemanTests>
<skipExtraTests>false</skipExtraTests>
</properties>
</profile>
<profile>
@ -552,7 +552,7 @@
<skipJoramTests>false</skipJoramTests>
<skipConcurrentTests>false</skipConcurrentTests>
<skipRestTests>false</skipRestTests>
<skipBytemanTests>false</skipBytemanTests>
<skipExtraTests>false</skipExtraTests>
<skipStyleCheck>false</skipStyleCheck>
</properties>
</profile>

View File

@ -1,101 +0,0 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq.tests</groupId>
<artifactId>activemq-tests-pom</artifactId>
<version>6.0.1-SNAPSHOT</version>
</parent>
<artifactId>concurrent-tests</artifactId>
<packaging>jar</packaging>
<name>ActiveMQ6 concurrent Tests</name>
<properties>
<activemq.basedir>${project.basedir}/../..</activemq.basedir>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core-client</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-server</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.activemq.tests</groupId>
<artifactId>unit-tests</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.activemq.tests</groupId>
<artifactId>integration-tests</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-jms-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-ra</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-bootstrap</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skipTests>${skipConcurrentTests}</skipTests>
<includes>
<include>**/*Test.java</include>
</includes>
<argLine>${activemq-surefire-argline}</argLine>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -14,6 +14,11 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- This test folder contains tests that are not part of the regular testsuite
because they use optional libraries such as LGPL or private ones.
They are optional and will validate extra functionality available through Service Integration
Example: Transaction Manager -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
@ -23,9 +28,9 @@
<version>6.0.1-SNAPSHOT</version>
</parent>
<artifactId>byteman-tests</artifactId>
<artifactId>extra-tests</artifactId>
<packaging>jar</packaging>
<name>ActiveMQ6 ByteMan Tests</name>
<name>ActiveMQ6 Extra Tests</name>
<properties>
<tools.jar>${java.home}/../lib/tools.jar</tools.jar>
@ -163,6 +168,21 @@
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<!-- Needed for JMS Bridge Tests -->
<dependency>
<groupId>org.jboss.jbossts.jts</groupId>
<artifactId>jbossjts-jacorb</artifactId>
<version>4.17.13.Final</version>
</dependency>
<dependency>
<groupId>org.jboss</groupId>
<artifactId>jboss-transaction-spi</artifactId>
<version>7.1.0.Final</version>
</dependency>
</dependencies>
<build>
@ -183,10 +203,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skipTests>${skipBytemanTests}</skipTests>
<includes>
<include>org.apache.activemq/byteman/tests/*.java</include>
</includes>
<skipTests>${skipExtraTests}</skipTests>
<!-- ensure we don't inherit a byteman jar form any env settings -->
<environmentVariables>
<BYTEMAN_HOME></BYTEMAN_HOME>

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tests.extras;
import org.jboss.logging.BasicLogger;
import org.jboss.logging.Logger;
import org.jboss.logging.annotations.MessageLogger;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* 4/18/12
*/
@MessageLogger(projectCode = "HQTEST")
public interface ExtrasTestLogger extends BasicLogger
{
/**
* The integration test logger.
*/
ExtrasTestLogger LOGGER = Logger.getMessageLogger(ExtrasTestLogger.class, ExtrasTestLogger.class.getPackage().getName());
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.byteman.tests;
package org.apache.activemq.tests.extras.byteman;
import com.arjuna.ats.arjuna.coordinator.TransactionReaper;
import com.arjuna.ats.arjuna.coordinator.TxControl;
@ -81,7 +81,7 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase
targetClass = "org.apache.activemq.core.protocol.core.impl.ActiveMQSessionContext",
targetMethod = "xaEnd",
targetLocation = "ENTRY",
action = "org.apache.activemq.byteman.tests.ActiveMQMessageHandlerTest.interrupt();"
action = "org.apache.activemq.tests.extras.byteman.ActiveMQMessageHandlerTest.interrupt();"
)
}
)
@ -158,7 +158,7 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase
targetClass = "org.apache.activemq.core.protocol.core.impl.ActiveMQSessionContext",
targetMethod = "xaEnd",
targetLocation = "ENTRY",
action = "org.apache.activemq.byteman.tests.ActiveMQMessageHandlerTest.interrupt();"
action = "org.apache.activemq.tests.extras.byteman.ActiveMQMessageHandlerTest.interrupt();"
)
}
)

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.byteman.tests;
package org.apache.activemq.tests.extras.byteman;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
@ -115,7 +115,7 @@ public class BMFailoverTest extends FailoverTestBase
targetClass = "org.apache.activemq.core.protocol.core.impl.ActiveMQSessionContext",
targetMethod = "xaEnd",
targetLocation = "AT EXIT",
action = "org.apache.activemq.byteman.tests.BMFailoverTest.stopAndThrow()"
action = "org.apache.activemq.tests.extras.byteman.BMFailoverTest.stopAndThrow()"
)
}
)
@ -206,7 +206,7 @@ public class BMFailoverTest extends FailoverTestBase
targetClass = "org.apache.activemq.core.client.impl.ClientSessionImpl",
targetMethod = "start(javax.transaction.xa.Xid, int)",
targetLocation = "AT EXIT",
action = "org.apache.activemq.byteman.tests.BMFailoverTest.serverToStop.getServer().stop(true)"
action = "org.apache.activemq.tests.extras.byteman.BMFailoverTest.serverToStop.getServer().stop(true)"
)
}
)
@ -309,7 +309,7 @@ public class BMFailoverTest extends FailoverTestBase
targetClass = "org.apache.activemq.core.client.impl.ClientSessionImpl",
targetMethod = "commit",
targetLocation = "ENTRY",
action = "org.apache.activemq.byteman.tests.BMFailoverTest.serverToStop.getServer().stop(true)"
action = "org.apache.activemq.tests.extras.byteman.BMFailoverTest.serverToStop.getServer().stop(true)"
)
}
)
@ -350,7 +350,7 @@ public class BMFailoverTest extends FailoverTestBase
targetClass = "org.apache.activemq.core.client.impl.ClientSessionImpl",
targetMethod = "commit",
targetLocation = "ENTRY",
action = "org.apache.activemq.byteman.tests.BMFailoverTest.serverToStop.getServer().stop(true)"
action = "org.apache.activemq.tests.extras.byteman.BMFailoverTest.serverToStop.getServer().stop(true)"
)
}
)

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.byteman.tests;
package org.apache.activemq.tests.extras.byteman;
import java.util.ArrayList;
import java.util.HashMap;
@ -57,7 +57,7 @@ public class BridgeServerLocatorConfigurationTest extends ServiceTestBase
@Test
@BMRule(name = "check connection ttl",
targetClass = "org.apache.activemq.byteman.tests.BridgeServerLocatorConfigurationTest",
targetClass = "org.apache.activemq.tests.extras.byteman.BridgeServerLocatorConfigurationTest",
targetMethod = "getBridgeTTL(ActiveMQServer, String)", targetLocation = "EXIT",
action = "$! = $0.getConfiguredBridge($1).serverLocator.getConnectionTTL();")
/**

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.byteman.tests;
package org.apache.activemq.tests.extras.byteman;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
@ -128,7 +128,7 @@ public class ClosingConnectionTest extends ServiceTestBase
targetClass = "org.apache.activemq.core.journal.impl.NIOSequentialFile",
targetMethod = "open(int, boolean)",
targetLocation = "AT INVOKE java.nio.channels.FileChannel.size()",
action = "org.apache.activemq.byteman.tests.ClosingConnectionTest.killConnection();"
action = "org.apache.activemq.tests.extras.byteman.ClosingConnectionTest.killConnection();"
)
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.byteman.tests;
package org.apache.activemq.tests.extras.byteman;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -54,7 +54,7 @@ public class ClusteredGroupingTest extends ClusterTestBase
targetClass = "org.apache.activemq.core.server.group.impl.LocalGroupingHandler",
targetMethod = "removeGrouping",
targetLocation = "ENTRY",
action = "org.apache.activemq.byteman.tests.ClusteredGroupingTest.pause($1);"
action = "org.apache.activemq.tests.extras.byteman.ClusteredGroupingTest.pause($1);"
),
@BMRule
(
@ -62,7 +62,7 @@ public class ClusteredGroupingTest extends ClusterTestBase
targetClass = "org.apache.activemq.core.server.group.impl.GroupHandlingAbstract",
targetMethod = "forceRemove",
targetLocation = "ENTRY",
action = "org.apache.activemq.byteman.tests.ClusteredGroupingTest.restart2();"
action = "org.apache.activemq.tests.extras.byteman.ClusteredGroupingTest.restart2();"
)
}
)
@ -131,13 +131,13 @@ public class ClusteredGroupingTest extends ClusterTestBase
targetClass = "org.apache.activemq.core.server.group.impl.RemoteGroupingHandler",
targetMethod = "onNotification",
targetLocation = "ENTRY",
action = "org.apache.activemq.byteman.tests.ClusteredGroupingTest.pause2($1);"
action = "org.apache.activemq.tests.extras.byteman.ClusteredGroupingTest.pause2($1);"
),
@BMRule(name = "blow-up2",
targetClass = "org.apache.activemq.core.server.group.impl.RemoteGroupingHandler",
targetMethod = "remove",
targetLocation = "ENTRY",
action = "org.apache.activemq.byteman.tests.ClusteredGroupingTest.restart2();")
action = "org.apache.activemq.tests.extras.byteman.ClusteredGroupingTest.restart2();")
}
)
public void test3serversLocalGoesDown() throws Exception
@ -216,13 +216,13 @@ public class ClusteredGroupingTest extends ClusterTestBase
targetClass = "org.apache.activemq.core.server.group.impl.LocalGroupingHandler",
targetMethod = "onNotification",
targetLocation = "ENTRY",
action = "org.apache.activemq.byteman.tests.ClusteredGroupingTest.pause2($1);"
action = "org.apache.activemq.tests.extras.byteman.ClusteredGroupingTest.pause2($1);"
),
@BMRule(name = "blow-up2",
targetClass = "org.apache.activemq.core.server.group.impl.LocalGroupingHandler",
targetMethod = "remove",
targetLocation = "ENTRY",
action = "org.apache.activemq.byteman.tests.ClusteredGroupingTest.restart2();")
action = "org.apache.activemq.tests.extras.byteman.ClusteredGroupingTest.restart2();")
}
)
public void testLocal3serversLocalGoesDown() throws Exception
@ -301,13 +301,13 @@ public class ClusteredGroupingTest extends ClusterTestBase
targetClass = "org.apache.activemq.core.server.group.impl.LocalGroupingHandler",
targetMethod = "onNotification",
targetLocation = "ENTRY",
action = "org.apache.activemq.byteman.tests.ClusteredGroupingTest.pause2($1);"
action = "org.apache.activemq.tests.extras.byteman.ClusteredGroupingTest.pause2($1);"
),
@BMRule(name = "blow-up2",
targetClass = "org.apache.activemq.core.server.group.impl.LocalGroupingHandler",
targetMethod = "remove",
targetLocation = "ENTRY",
action = "org.apache.activemq.byteman.tests.ClusteredGroupingTest.restart2();")
action = "org.apache.activemq.tests.extras.byteman.ClusteredGroupingTest.restart2();")
}
)
public void testLocal4serversLocalGoesDown() throws Exception

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.byteman.tests;
package org.apache.activemq.tests.extras.byteman;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@ -81,7 +81,7 @@ public class GroupingTest extends JMSTestBase
targetClass = "org.apache.activemq.core.server.impl.ServerSessionImpl",
targetMethod = "rollback",
targetLocation = "EXIT",
action = "org.apache.activemq.byteman.tests.GroupingTest.pause();"
action = "org.apache.activemq.tests.extras.byteman.GroupingTest.pause();"
)
}
)

View File

@ -14,13 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.core.settings.impl;
package org.apache.activemq.tests.extras.byteman;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.core.settings.impl.HierarchicalObjectRepository;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMRules;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
@ -34,7 +35,7 @@ import org.junit.runner.RunWith;
@BMRules(rules = { @BMRule(name = "modify map during iteration",
targetClass = "org.apache.activemq.core.settings.impl.HierarchicalObjectRepository",
targetMethod = "getPossibleMatches(String)", targetLocation = "AT INVOKE java.util.HashMap.put",
action = "org.apache.activemq.core.settings.impl.HierarchicalObjectRepositoryTest.bum()"), })
action = "org.apache.activemq.tests.extras.byteman.HierarchicalObjectRepositoryTest.bum()"), })
public class HierarchicalObjectRepositoryTest
{
private static final String A = "a.";

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.byteman.tests;
package org.apache.activemq.tests.extras.byteman;
import org.apache.activemq.core.client.impl.ClientProducerCredits;
import org.apache.activemq.core.message.impl.MessageInternal;
@ -25,7 +25,7 @@ import org.apache.activemq.jms.bridge.ConnectionFactoryFactory;
import org.apache.activemq.jms.bridge.QualityOfServiceMode;
import org.apache.activemq.jms.bridge.impl.JMSBridgeImpl;
import org.apache.activemq.jms.server.JMSServerManager;
import org.apache.activemq.tests.integration.jms.bridge.BridgeTestBase;
import org.apache.activemq.tests.extras.jms.bridge.BridgeTestBase;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMRules;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
@ -49,7 +49,7 @@ public class JMSBridgeReconnectionTest extends BridgeTestBase
targetClass = "org.apache.activemq.core.protocol.core.impl.ChannelImpl",
targetMethod = "send",
targetLocation = "ENTRY",
action = "org.apache.activemq.byteman.tests.JMSBridgeReconnectionTest.pause($1);"
action = "org.apache.activemq.tests.extras.byteman.JMSBridgeReconnectionTest.pause($1);"
),
@BMRule
(
@ -57,7 +57,7 @@ public class JMSBridgeReconnectionTest extends BridgeTestBase
targetClass = "org.apache.activemq.core.client.impl.ClientProducerImpl",
targetMethod = "sendRegularMessage",
targetLocation = "ENTRY",
action = "org.apache.activemq.byteman.tests.JMSBridgeReconnectionTest.pause2($1,$2,$3);"
action = "org.apache.activemq.tests.extras.byteman.JMSBridgeReconnectionTest.pause2($1,$2,$3);"
)
}
)

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.byteman.tests;
package org.apache.activemq.tests.extras.byteman;
import org.apache.activemq.api.core.client.ClientSession;
import org.apache.activemq.api.core.client.ClientSessionFactory;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.byteman.tests;
package org.apache.activemq.tests.extras.byteman;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
@ -71,7 +71,7 @@ public class MessageCopyTest
),
@BMRule(
name = "JMSServer.stop wait-init",
targetClass = "org.apache.activemq.byteman.tests.MessageCopyTest",
targetClass = "org.apache.activemq.tests.extras.byteman.MessageCopyTest",
targetMethod = "simulateRead",
targetLocation = "EXIT",
action = "signalWake(\"finish-read\", true)"

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.byteman.tests;
package org.apache.activemq.tests.extras.byteman;
import org.apache.activemq.api.core.SimpleString;
@ -132,7 +132,7 @@ public class OrphanedConsumerTest extends ServiceTestBase
targetClass = "org.apache.activemq.core.server.impl.ServerConsumerImpl",
targetMethod = "close",
targetLocation = "AT EXIT",
condition = "org.apache.activemq.byteman.tests.OrphanedConsumerTest.isConditionActive()",
condition = "org.apache.activemq.tests.extras.byteman.OrphanedConsumerTest.isConditionActive()",
action = "System.out.println(\"throwing stuff\");throw new InterruptedException()"
),
@BMRule
@ -141,8 +141,8 @@ public class OrphanedConsumerTest extends ServiceTestBase
targetClass = "org.apache.activemq.core.server.impl.ServerConsumerImpl",
targetMethod = "close",
targetLocation = "ENTRY",
condition = "org.apache.activemq.byteman.tests.OrphanedConsumerTest.isConditionActive()",
action = "org.apache.activemq.byteman.tests.OrphanedConsumerTest.leavingCloseOnTestCountersWhileClosing()"
condition = "org.apache.activemq.tests.extras.byteman.OrphanedConsumerTest.isConditionActive()",
action = "org.apache.activemq.tests.extras.byteman.OrphanedConsumerTest.leavingCloseOnTestCountersWhileClosing()"
)
}
@ -171,7 +171,7 @@ public class OrphanedConsumerTest extends ServiceTestBase
targetClass = "org.apache.activemq.core.server.impl.ServerConsumerImpl",
targetMethod = "close",
targetLocation = "AT EXIT",
condition = "org.apache.activemq.byteman.tests.OrphanedConsumerTest.isConditionActive()",
condition = "org.apache.activemq.tests.extras.byteman.OrphanedConsumerTest.isConditionActive()",
action = "System.out.println(\"throwing stuff\");throw new InterruptedException()"
),
@BMRule
@ -180,8 +180,8 @@ public class OrphanedConsumerTest extends ServiceTestBase
targetClass = "org.apache.activemq.core.server.impl.ServerConsumerImpl",
targetMethod = "close",
targetLocation = "ENTRY",
condition = "org.apache.activemq.byteman.tests.OrphanedConsumerTest.isConditionActive()",
action = "org.apache.activemq.byteman.tests.OrphanedConsumerTest.leavingCloseOnTestCountersWhileClosing()"
condition = "org.apache.activemq.tests.extras.byteman.OrphanedConsumerTest.isConditionActive()",
action = "org.apache.activemq.tests.extras.byteman.OrphanedConsumerTest.leavingCloseOnTestCountersWhileClosing()"
)
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.byteman.tests;
package org.apache.activemq.tests.extras.byteman;
import java.util.ArrayList;
import java.util.List;
@ -76,7 +76,7 @@ public class PagingLeakTest extends ServiceTestBase
targetClass = "org.apache.activemq.core.paging.cursor.impl.PagePositionImpl",
targetMethod = "<init>()",
targetLocation = "ENTRY",
action = "org.apache.activemq.byteman.tests.PagingLeakTest.newPosition()"
action = "org.apache.activemq.tests.extras.byteman.PagingLeakTest.newPosition()"
),
@BMRule
(
@ -84,7 +84,7 @@ public class PagingLeakTest extends ServiceTestBase
targetClass = "org.apache.activemq.core.paging.cursor.impl.PagePositionImpl",
targetMethod = "finalize",
targetLocation = "ENTRY",
action = "org.apache.activemq.byteman.tests.PagingLeakTest.deletePosition()"
action = "org.apache.activemq.tests.extras.byteman.PagingLeakTest.deletePosition()"
)
}
)

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.byteman.tests;
package org.apache.activemq.tests.extras.byteman;
import java.util.concurrent.CountDownLatch;
@ -53,7 +53,7 @@ public class ReplicationBackupTest extends ServiceTestBase
targetClass = "org.apache.activemq.core.server.impl.SharedNothingLiveActivation",
targetMethod = "run",
targetLocation = "AT EXIT",
action = "org.apache.activemq.byteman.tests.ReplicationBackupTest.breakIt();"
action = "org.apache.activemq.tests.extras.byteman.ReplicationBackupTest.breakIt();"
)
}
)

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.byteman.tests;
package org.apache.activemq.tests.extras.byteman;
import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.api.core.client.ClientMessage;
@ -104,7 +104,7 @@ public class ScaleDownFailoverTest extends ClusterTestBase
targetMethod = "createSessionFactory(org.apache.activemq.api.core.TransportConfiguration, int, boolean)",
isInterface = true,
targetLocation = "ENTRY",
action = "org.apache.activemq.byteman.tests.ScaleDownFailoverTest.fail($1);"
action = "org.apache.activemq.tests.extras.byteman.ScaleDownFailoverTest.fail($1);"
)
public void testScaleDownWhenFirstServerFails() throws Exception
{

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.byteman.tests;
package org.apache.activemq.tests.extras.byteman;
import org.apache.activemq.api.core.client.ClientMessage;
import org.apache.activemq.core.config.ScaleDownConfiguration;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.byteman.tests;
package org.apache.activemq.tests.extras.byteman;
public class ScaleDownGroupedFailoverTest extends ScaleDownFailoverTest
{

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.byteman.tests;
package org.apache.activemq.tests.extras.byteman;
public class ScaleDownGroupedFailureTest extends ScaleDownFailureTest
{

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.byteman.tests;
package org.apache.activemq.tests.extras.byteman;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
@ -71,7 +71,7 @@ public class StartStopDeadlockTest extends ServiceTestBase
),
@BMRule(
name = "StartStopDeadlockTest tearDown",
targetClass = "org.apache.activemq.byteman.tests.StartStopDeadlockTest",
targetClass = "org.apache.activemq.tests.extras.byteman.StartStopDeadlockTest",
targetMethod = "tearDown",
targetLocation = "ENTRY",
action = "deleteCounter(\"server-Init\")"

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.byteman.tests;
package org.apache.activemq.tests.extras.byteman;
import java.util.HashMap;
import java.util.Map;
@ -61,7 +61,7 @@ public class StompInternalStateTest extends ServiceTestBase
targetClass = "org.apache.activemq.core.protocol.stomp.StompProtocolManager",
targetMethod = "onNotification(org.apache.activemq.core.server.management.Notification)",
targetLocation = "EXIT",
helper = "org.apache.activemq.byteman.tests.StompInternalStateTest",
helper = "org.apache.activemq.tests.extras.byteman.StompInternalStateTest",
action = "verifyBindingAddRemove($1, $0.destinations)"
)
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tests.integration.jms.bridge;
package org.apache.activemq.tests.extras.jms.bridge;
import javax.jms.BytesMessage;
import javax.jms.Connection;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tests.integration.jms.bridge;
package org.apache.activemq.tests.extras.jms.bridge;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tests.integration.jms.bridge;
package org.apache.activemq.tests.extras.jms.bridge;
import javax.transaction.TransactionManager;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tests.integration.jms.bridge;
package org.apache.activemq.tests.extras.jms.bridge;
import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.api.core.client.ClientSession;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tests.integration.jms.bridge;
package org.apache.activemq.tests.extras.jms.bridge;
import javax.jms.Connection;
import javax.jms.DeliveryMode;

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.tests.integration.jms.bridge;
package org.apache.activemq.tests.extras.jms.bridge;
import javax.transaction.TransactionManager;

View File

@ -181,18 +181,6 @@
<version>5.0.0.GA</version>
</dependency>
<!-- Needed for JMS Bridge Tests -->
<dependency>
<groupId>org.jboss.jbossts.jts</groupId>
<artifactId>jbossjts-jacorb</artifactId>
<version>4.17.13.Final</version>
</dependency>
<dependency>
<groupId>org.jboss</groupId>
<artifactId>jboss-transaction-spi</artifactId>
<version>7.1.0.Final</version>
</dependency>
<!--Vertx provided dependencies-->
<dependency>
<groupId>io.vertx</groupId>

View File

@ -31,7 +31,6 @@ import org.apache.activemq.ra.ActiveMQRAManagedConnectionFactory;
import org.apache.activemq.ra.ActiveMQResourceAdapter;
import org.apache.activemq.spi.core.security.ActiveMQSecurityManagerImpl;
import org.apache.activemq.service.extensions.ServiceUtils;
import org.apache.activemq.tests.integration.jms.bridge.TransactionManagerLocatorImpl;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -68,7 +67,6 @@ public class JMSContextTest extends ActiveMQRATestBase
resourceAdapter.setConnectorClassName(InVMConnectorFactory.class.getName());
MyBootstrapContext ctx = new MyBootstrapContext();
TransactionManagerLocatorImpl.tm = DummyTransactionManager.tm;
resourceAdapter.start(ctx);
ActiveMQRAManagedConnectionFactory mcf = new ActiveMQRAManagedConnectionFactory();
mcf.setResourceAdapter(resourceAdapter);

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tests.concurrent.stomp;
package org.apache.activemq.tests.integration.stomp;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@ -26,7 +26,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.core.protocol.stomp.Stomp;
import org.apache.activemq.tests.integration.stomp.StompTestBase;
import org.junit.Assert;
import org.junit.Test;

View File

@ -34,7 +34,6 @@ import java.util.List;
import java.util.Random;
import java.util.Set;
import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple;
import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.api.core.management.QueueControl;
import org.apache.activemq.api.jms.management.JMSQueueControl;
@ -46,7 +45,6 @@ import org.apache.activemq.jms.server.config.ConnectionFactoryConfiguration;
import org.apache.activemq.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
import org.apache.activemq.jms.server.impl.JMSServerManagerImpl;
import org.apache.activemq.service.extensions.ServiceUtils;
import org.apache.activemq.tests.integration.jms.bridge.TransactionManagerLocatorImpl;
import org.apache.activemq.tests.integration.ra.DummyTransactionManager;
import org.apache.activemq.tests.unit.util.InVMNamingContext;
import org.junit.After;
@ -234,7 +232,6 @@ public class JMSTestBase extends ServiceTestBase
mbeanServer = null;
TransactionManagerLocatorImpl.tm = null;
ServiceUtils.setTransactionManager(null);
super.tearDown();
@ -301,11 +298,6 @@ public class JMSTestBase extends ServiceTestBase
}
}
protected void useRealTransactionManager()
{
ServiceUtils.setTransactionManager((javax.transaction.TransactionManager) new TransactionManagerImple());
}
protected void useDummyTransactionManager()
{
ServiceUtils.setTransactionManager(new DummyTransactionManager());

View File

@ -1 +1 @@
org.apache.activemq.tests.integration.jms.bridge.TransactionManagerLocatorImpl
org.apache.activemq.tests.extras.jms.bridge.TransactionManagerLocatorImpl

View File

@ -95,19 +95,6 @@
<artifactId>geronimo-ejb_3.0_spec</artifactId>
</dependency>
<!--this specifically for the XA Tests -->
<dependency>
<groupId>org.jboss</groupId>
<artifactId>jboss-transaction-spi</artifactId>
<version>7.0.0.Final</version>
<exclusions>
<exclusion>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging-spi</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.geronimo.components</groupId>
<artifactId>geronimo-jaspi</artifactId>
@ -154,14 +141,6 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skipTests>${skipJmsTests}</skipTests>
<excludes>
<!-- TODO: Can we remove XATests? Can we remove them all? -->
<exclude>org/apache/activemq/jms/tests/XARecoveryTest.java</exclude>
<exclude>org/apache/activemq/jms/tests/XAResourceRecoveryTest.java</exclude>
<exclude>org/apache/activemq/jms/tests/XATest.java</exclude>
<exclude>org/apache/activemq/jms/tests/stress/*.java</exclude>
<exclude>org/apache/activemq/jms/tests/manual/**</exclude>
</excludes>
<argLine>${activemq-surefire-argline}</argLine>
</configuration>
</plugin>

View File

@ -1,297 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.jms.tests.stress;
import java.util.ArrayList;
import java.util.List;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import org.apache.activemq.jms.tests.ActiveMQServerTestCase;
import org.apache.activemq.jms.tests.JmsTestLogger;
import org.apache.activemq.jms.tests.util.ProxyAssertSupport;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* This test was added to test regression on http://jira.jboss.com/jira/browse/JBMESSAGING-660
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
*/
public class ConcurrentCloseStressTest extends ActiveMQServerTestCase
{
@BeforeClass
public static void stressTestsEnabled()
{
org.junit.Assume.assumeTrue(JMSStressTestBase.STRESS_TESTS_ENABLED);
}
InitialContext ic;
ConnectionFactory cf;
Queue queue;
@Override
@Before
public void setUp() throws Exception
{
super.setUp();
// ServerManagement.start("all");
ic = getInitialContext();
cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
destroyQueue("TestQueue");
createQueue("TestQueue");
queue = (Queue)ic.lookup("queue/TestQueue");
log.debug("setup done");
}
@Override
@After
public void tearDown() throws Exception
{
destroyQueue("TestQueue");
super.tearDown();
}
@Test
public void testProducersAndConsumers() throws Exception
{
Connection connectionProducer = cf.createConnection();
Connection connectionReader = cf.createConnection();
connectionReader.start();
connectionProducer.start(); // try with and without this...
ProducerThread[] producerThread = new ProducerThread[20];
ReaderThread[] readerThread = new ReaderThread[20];
TestThread[] threads = new TestThread[40];
for (int i = 0; i < 20; i++)
{
producerThread[i] = new ProducerThread(i, connectionProducer, queue);
readerThread[i] = new ReaderThread(i, connectionReader, queue);
threads[i] = producerThread[i];
threads[i + 20] = readerThread[i];
}
for (int i = 0; i < 40; i++)
{
threads[i].start();
}
for (int i = 0; i < 40; i++)
{
threads[i].join();
}
boolean hasFailure = false;
for (int i = 0; i < 40; i++)
{
if (!threads[i].exceptions.isEmpty())
{
hasFailure = true;
for (Exception element : threads[i].exceptions)
{
Exception ex = element;
log.error("Exception occurred in one of the threads - " + ex, ex);
}
}
}
int messagesProduced = 0;
int messagesRead = 0;
for (ProducerThread element : producerThread)
{
messagesProduced += element.messagesProduced;
}
for (int i = 0; i < producerThread.length; i++)
{
messagesRead += readerThread[i].messagesRead;
}
if (hasFailure)
{
ProxyAssertSupport.fail("An exception has occurred in one of the threads");
}
}
static class TestThread extends Thread
{
List<Exception> exceptions = new ArrayList<Exception>();
protected int index;
public int messageCount = 0;
}
static class ReaderThread extends TestThread
{
private static final JmsTestLogger log = JmsTestLogger.LOGGER;
Connection conn;
Queue queue;
int messagesRead = 0;
public ReaderThread(final int index, final Connection conn, final Queue queue) throws Exception
{
this.index = index;
this.conn = conn;
this.queue = queue;
}
@Override
public void run()
{
int commitCounter = 0;
try
{
Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = session.createConsumer(queue);
int lastCount = messageCount;
while (true)
{
TextMessage message = (TextMessage)consumer.receive(5000);
if (message == null)
{
break;
}
ReaderThread.log.debug("read message " + message.getText());
// alternating commits and rollbacks
if (commitCounter++ % 2 == 0)
{
messagesRead += messageCount - lastCount;
lastCount = messageCount;
ReaderThread.log.debug("commit");
session.commit();
}
else
{
lastCount = messageCount;
ReaderThread.log.debug("rollback");
session.rollback();
}
messageCount++;
if (messageCount % 7 == 0)
{
session.close();
session = conn.createSession(true, Session.SESSION_TRANSACTED);
consumer = session.createConsumer(queue);
}
}
messagesRead += messageCount - lastCount;
session.commit();
consumer.close();
session.close();
}
catch (Exception e)
{
e.printStackTrace();
exceptions.add(e);
}
}
}
static class ProducerThread extends TestThread
{
private static final JmsTestLogger log = JmsTestLogger.LOGGER;
Connection conn;
Queue queue;
int messagesProduced = 0;
public ProducerThread(final int index, final Connection conn, final Queue queue) throws Exception
{
this.index = index;
this.conn = conn;
this.queue = queue;
}
@Override
public void run()
{
for (int i = 0; i < 10; i++)
{
try
{
int lastMessage = messageCount;
Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = sess.createProducer(queue);
for (int j = 0; j < 20; j++)
{
producer.send(sess.createTextMessage("Message " + i + ", " + j));
if (j % 2 == 0)
{
ProducerThread.log.debug("commit");
messagesProduced += messageCount - lastMessage;
lastMessage = messageCount;
sess.commit();
}
else
{
ProducerThread.log.debug("rollback");
lastMessage = messageCount;
sess.rollback();
}
messageCount++;
}
messagesProduced += messageCount - lastMessage;
sess.commit();
sess.close();
}
catch (Exception e)
{
e.printStackTrace();
exceptions.add(e);
}
}
}
}
}

View File

@ -1,164 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.jms.tests.stress;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.InitialContext;
import org.apache.activemq.jms.tests.ActiveMQServerTestCase;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* A stress test written to investigate http://jira.jboss.org/jira/browse/JBMESSAGING-362
*
* @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
*/
public class CorruptMessageStressTest extends ActiveMQServerTestCase
{
@BeforeClass
public static void stressTestsEnabled()
{
org.junit.Assume.assumeTrue(JMSStressTestBase.STRESS_TESTS_ENABLED);
}
public static int PRODUCER_COUNT = 30;
public static int MESSAGE_COUNT = 10000;
// Static --------------------------------------------------------
// Attributes ----------------------------------------------------
private InitialContext ic;
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
@Test
public void testMultipleSenders() throws Exception
{
ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
Queue queue = (Queue)ic.lookup("/queue/StressTestQueue");
drainDestination(cf, queue);
Connection conn = cf.createConnection();
Session[] sessions = new Session[CorruptMessageStressTest.PRODUCER_COUNT];
MessageProducer[] producers = new MessageProducer[CorruptMessageStressTest.PRODUCER_COUNT];
for (int i = 0; i < CorruptMessageStressTest.PRODUCER_COUNT; i++)
{
sessions[i] = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
producers[i] = sessions[i].createProducer(queue);
producers[i].setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
Thread[] threads = new Thread[CorruptMessageStressTest.PRODUCER_COUNT];
for (int i = 0; i < CorruptMessageStressTest.PRODUCER_COUNT; i++)
{
threads[i] = new Thread(new Sender(sessions[i], producers[i]), "Sender Thread #" + i);
threads[i].start();
}
// wait for the threads to finish
for (int i = 0; i < CorruptMessageStressTest.PRODUCER_COUNT; i++)
{
threads[i].join();
}
conn.close();
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@Override
@Before
public void setUp() throws Exception
{
super.setUp();
// ServerManagement.start("all");
ic = getInitialContext();
createQueue("StressTestQueue");
}
@Override
@After
public void tearDown() throws Exception
{
super.tearDown();
destroyQueue("StressTestQueue");
ic.close();
}
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
private class Sender implements Runnable
{
private final Session session;
private final MessageProducer producer;
private int count = 0;
public Sender(final Session session, final MessageProducer producer)
{
this.session = session;
this.producer = producer;
}
public void run()
{
while (true)
{
if (count == CorruptMessageStressTest.MESSAGE_COUNT)
{
break;
}
try
{
Message m = session.createMessage();
m.setStringProperty("XXX", "XXX-VALUE");
m.setStringProperty("YYY", "YYY-VALUE");
producer.send(m);
count++;
}
catch (Exception e)
{
log.error("Sender thread failed", e);
break;
}
}
}
}
}

View File

@ -1,106 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.jms.tests.stress;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Topic;
import javax.jms.XASession;
import org.apache.activemq.jms.tests.ActiveMQServerTestCase;
import org.apache.activemq.jms.tests.util.ProxyAssertSupport;
import org.junit.After;
import org.junit.BeforeClass;
/**
*
* Base class for stress tests
*
* @author <a href="tim.fox@jboss.com">Tim Fox</a>
*
*/
public abstract class JMSStressTestBase extends ActiveMQServerTestCase
{
public static final boolean STRESS_TESTS_ENABLED = false;
@BeforeClass
public static void stressTestsEnabled()
{
org.junit.Assume.assumeTrue(JMSStressTestBase.STRESS_TESTS_ENABLED);
}
protected static final int NUM_PERSISTENT_MESSAGES = 4000;
protected static final int NUM_NON_PERSISTENT_MESSAGES = 6000;
protected static final int NUM_PERSISTENT_PRESEND = 5000;
protected static final int NUM_NON_PERSISTENT_PRESEND = 3000;
protected ConnectionFactory cf;
protected Destination topic;
protected Destination destinationQueue1;
protected Destination destinationQueue2;
protected Destination destinationQueue3;
protected Destination destinationQueue4;
protected Topic topic1;
protected Topic topic2;
protected Topic topic3;
protected Topic topic4;
@Override
@After
public void tearDown() throws Exception
{
super.tearDown();
if (checkNoMessageData())
{
ProxyAssertSupport.fail("Message data still exists");
}
}
protected void runRunners(final Runner[] runners) throws Exception
{
Thread[] threads = new Thread[runners.length];
for (int i = 0; i < runners.length; i++)
{
threads[i] = new Thread(runners[i]);
threads[i].start();
}
for (int i = 0; i < runners.length; i++)
{
threads[i].join();
}
for (int i = 0; i < runners.length; i++)
{
if (runners[i].isFailed())
{
ProxyAssertSupport.fail("Runner " + i + " failed");
log.error("runner failed");
}
}
}
protected void tweakXASession(final XASession sess)
{
}
}

View File

@ -1,218 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.jms.tests.stress;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.naming.InitialContext;
import java.util.HashSet;
import java.util.Set;
import org.apache.activemq.jms.tests.ActiveMQServerTestCase;
import org.apache.activemq.jms.tests.util.ProxyAssertSupport;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Create 500 connections each with a consumer, consuming from a topic
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*/
public class ManyConnectionsStressTest extends ActiveMQServerTestCase
{
@BeforeClass
public static void stressTestsEnabled()
{
org.junit.Assume.assumeTrue(JMSStressTestBase.STRESS_TESTS_ENABLED);
}
private static final int NUM_CONNECTIONS = 500;
private static final int NUM_MESSAGES = 100;
// Static --------------------------------------------------------
// Attributes ----------------------------------------------------
private InitialContext ic;
private volatile boolean failed;
private final Set<MyListener> listeners = new HashSet<MyListener>();
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
@Override
@Before
public void setUp() throws Exception
{
super.setUp();
// ServerManagement.start("all");
ic = getInitialContext();
createTopic("StressTestTopic");
}
@Override
@After
public void tearDown() throws Exception
{
destroyTopic("StressTestTopic");
ic.close();
super.tearDown();
}
@Test
public void testManyConnections() throws Exception
{
ConnectionFactory cf = (ConnectionFactory) ic.lookup("/ConnectionFactory");
Topic topic = (Topic) ic.lookup("/topic/StressTestTopic");
Connection[] conns = new Connection[ManyConnectionsStressTest.NUM_CONNECTIONS];
for (int i = 0; i < ManyConnectionsStressTest.NUM_CONNECTIONS; i++)
{
conns[i] = addConnection(cf.createConnection());
Session sess = conns[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer cons = sess.createConsumer(topic);
MyListener listener = new MyListener();
synchronized (listeners)
{
listeners.add(listener);
}
cons.setMessageListener(listener);
conns[i].start();
log.info("Created " + i);
}
// Thread.sleep(100 * 60 * 1000);
Connection connSend = addConnection(cf.createConnection());
Session sessSend = connSend.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod = sessSend.createProducer(topic);
for (int i = 0; i < ManyConnectionsStressTest.NUM_MESSAGES; i++)
{
TextMessage tm = sessSend.createTextMessage("message" + i);
tm.setIntProperty("count", i);
prod.send(tm);
}
long wait = 30000;
synchronized (listeners)
{
while (!listeners.isEmpty() && wait > 0)
{
long start = System.currentTimeMillis();
try
{
listeners.wait(wait);
}
catch (InterruptedException e)
{
// Ignore
}
wait -= System.currentTimeMillis() - start;
}
}
if (wait <= 0)
{
ProxyAssertSupport.fail("Timed out");
}
ProxyAssertSupport.assertFalse(failed);
}
private void finished(final MyListener listener)
{
synchronized (listeners)
{
log.info("consumer " + listener + " has finished");
listeners.remove(listener);
listeners.notify();
}
}
private void failed(final MyListener listener)
{
synchronized (listeners)
{
log.error("consumer " + listener + " has failed");
listeners.remove(listener);
failed = true;
listeners.notify();
}
}
private final class MyListener implements MessageListener
{
public void onMessage(final Message msg)
{
try
{
int count = msg.getIntProperty("count");
// log.info(this + " got message " + msg);
if (count == ManyConnectionsStressTest.NUM_MESSAGES - 1)
{
finished(this);
}
}
catch (JMSException e)
{
log.error("Failed to get int property", e);
failed(this);
}
}
}
}

View File

@ -1,445 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.jms.tests.stress;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.naming.InitialContext;
import org.apache.activemq.jms.tests.ActiveMQServerTestCase;
import org.apache.activemq.jms.tests.util.ProxyAssertSupport;
import org.apache.activemq.utils.UUIDGenerator;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* A OpenCloseStressTest.
* <p>
* This stress test starts several publisher connections and several subscriber connections, then
* sends and consumes messages while concurrently closing the sessions.
* <p>
* This test will help catch race conditions that occurred with rapid open/closing of sessions when
* messages are being sent/received
* <p>
* E.g. http://jira.jboss.com/jira/browse/JBMESSAGING-982
* @author <a href="tim.fox@jboss.com">Tim Fox</a>
*/
public class OpenCloseStressTest extends ActiveMQServerTestCase
{
@BeforeClass
public static void stressTestsEnabled()
{
org.junit.Assume.assumeTrue(JMSStressTestBase.STRESS_TESTS_ENABLED);
}
InitialContext ic;
ConnectionFactory cf;
Topic topic;
@Override
@Before
public void setUp() throws Exception
{
super.setUp();
// ServerManagement.start("all");
ic = getInitialContext();
cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
destroyTopic("TestTopic");
createTopic("TestTopic");
topic = (Topic)ic.lookup("topic/TestTopic");
log.debug("setup done");
}
@Override
@After
public void tearDown() throws Exception
{
destroyQueue("TestQueue");
log.debug("tear down done");
}
@Test
public void testOpenClose() throws Exception
{
Connection conn1 = null;
Connection conn2 = null;
Connection conn3 = null;
Connection conn4 = null;
Connection conn5 = null;
Connection conn6 = null;
Connection conn7 = null;
Connection conn8 = null;
try
{
Publisher[] publishers = new Publisher[3];
final int MSGS_PER_PUBLISHER = 10000;
conn1 = cf.createConnection();
Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod1 = sess1.createProducer(topic);
prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
publishers[0] = new Publisher(sess1, prod1, MSGS_PER_PUBLISHER, 2);
conn2 = cf.createConnection();
Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod2 = sess2.createProducer(topic);
prod2.setDeliveryMode(DeliveryMode.PERSISTENT);
publishers[1] = new Publisher(sess2, prod2, MSGS_PER_PUBLISHER, 5);
conn3 = cf.createConnection();
Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod3 = sess3.createProducer(topic);
prod3.setDeliveryMode(DeliveryMode.PERSISTENT);
publishers[2] = new Publisher(sess3, prod3, MSGS_PER_PUBLISHER, 1);
Subscriber[] subscribers = new Subscriber[5];
conn4 = cf.createConnection();
subscribers[0] = new Subscriber(conn4, 3 * MSGS_PER_PUBLISHER, 500, 1000 * 60 * 15, topic, false);
conn5 = cf.createConnection();
subscribers[1] = new Subscriber(conn5, 3 * MSGS_PER_PUBLISHER, 2000, 1000 * 60 * 15, topic, false);
conn6 = cf.createConnection();
subscribers[2] = new Subscriber(conn6, 3 * MSGS_PER_PUBLISHER, 700, 1000 * 60 * 15, topic, false);
conn7 = cf.createConnection();
subscribers[3] = new Subscriber(conn7, 3 * MSGS_PER_PUBLISHER, 1500, 1000 * 60 * 15, topic, true);
conn8 = cf.createConnection();
subscribers[4] = new Subscriber(conn8, 3 * MSGS_PER_PUBLISHER, 1200, 1000 * 60 * 15, topic, true);
Thread[] threads = new Thread[8];
// subscribers
threads[0] = new Thread(subscribers[0]);
threads[1] = new Thread(subscribers[1]);
threads[2] = new Thread(subscribers[2]);
threads[3] = new Thread(subscribers[3]);
threads[4] = new Thread(subscribers[4]);
// publishers
threads[5] = new Thread(publishers[0]);
threads[6] = new Thread(publishers[1]);
threads[7] = new Thread(publishers[2]);
for (int i = 0; i < subscribers.length; i++)
{
threads[i].start();
}
// Pause before creating producers otherwise subscribers to make sure they're all created
Thread.sleep(5000);
for (int i = subscribers.length; i < threads.length; i++)
{
threads[i].start();
}
for (Thread thread : threads)
{
thread.join();
}
for (Subscriber subscriber : subscribers)
{
if (subscriber.isDurable())
{
ProxyAssertSupport.assertEquals(3 * MSGS_PER_PUBLISHER, subscriber.getMessagesReceived());
}
else
{
// Note that for a non durable subscriber the number of messages received in total
// will be somewhat less than the total number received since when recycling the session
// there is a period of time after closing the previous session and starting the next one
// when messages are being sent and won't be received (since there is no consumer)
}
ProxyAssertSupport.assertFalse(subscriber.isFailed());
}
for (Publisher publisher : publishers)
{
ProxyAssertSupport.assertFalse(publisher.isFailed());
}
}
finally
{
if (conn1 != null)
{
conn1.close();
}
if (conn2 != null)
{
conn2.close();
}
if (conn3 != null)
{
conn3.close();
}
if (conn4 != null)
{
conn4.close();
}
if (conn5 != null)
{
conn5.close();
}
if (conn6 != null)
{
conn6.close();
}
if (conn7 != null)
{
conn7.close();
}
if (conn8 != null)
{
conn8.close();
}
}
}
class Publisher implements Runnable
{
private final Session sess;
private final int numMessages;
private final int delay;
private final MessageProducer prod;
private boolean failed;
boolean isFailed()
{
return failed;
}
Publisher(final Session sess, final MessageProducer prod, final int numMessages, final int delay)
{
this.sess = sess;
this.prod = prod;
this.numMessages = numMessages;
this.delay = delay;
}
public void run()
{
try
{
for (int i = 0; i < numMessages; i++)
{
TextMessage tm = sess.createTextMessage("message" + i);
prod.send(tm);
try
{
Thread.sleep(delay);
}
catch (Exception ignore)
{
}
}
}
catch (JMSException e)
{
log.error("Failed to send message", e);
failed = true;
}
}
}
class Subscriber implements Runnable
{
private Session sess;
private MessageConsumer cons;
private int msgsReceived;
private final int numMessages;
private final int delay;
private final Connection conn;
private boolean failed;
private final long timeout;
private final Destination dest;
private final boolean durable;
private String subname;
boolean isFailed()
{
return failed;
}
boolean isDurable()
{
return durable;
}
synchronized void msgReceived()
{
msgsReceived++;
}
synchronized int getMessagesReceived()
{
return msgsReceived;
}
class Listener implements MessageListener
{
public void onMessage(final Message msg)
{
msgReceived();
}
}
Subscriber(final Connection conn,
final int numMessages,
final int delay,
final long timeout,
final Destination dest,
final boolean durable) throws Exception
{
this.conn = conn;
this.numMessages = numMessages;
this.delay = delay;
this.timeout = timeout;
this.dest = dest;
this.durable = durable;
if (durable)
{
conn.setClientID(UUIDGenerator.getInstance().generateStringUUID());
subname = UUIDGenerator.getInstance().generateStringUUID();
}
}
public void run()
{
try
{
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < timeout && msgsReceived < numMessages)
{
// recycle the session
recycleSession();
Thread.sleep(delay);
}
// Delete the durable sub
if (durable)
{
recycleSession();
cons.close();
sess.unsubscribe(subname);
}
}
catch (Exception e)
{
log.error("Failed in subscriber", e);
failed = true;
}
}
void recycleSession() throws Exception
{
conn.stop();
if (sess != null)
{
sess.close();
}
sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
if (durable)
{
cons = sess.createDurableSubscriber((Topic)dest, subname);
}
else
{
cons = sess.createConsumer(dest);
}
cons.setMessageListener(new Listener());
conn.start();
}
}
}

View File

@ -1,271 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.jms.tests.stress;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.XAConnection;
import javax.jms.XASession;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* A QueueStressTest.
*
* @author <a href="tim.fox@jboss.com">Tim Fox</a>
* @version <tt>$Revision: 2349 $</tt>
*/
public class QueueStressTest extends JMSStressTestBase
{
@BeforeClass
public static void stressTestsEnabled()
{
org.junit.Assume.assumeTrue(JMSStressTestBase.STRESS_TESTS_ENABLED);
}
/*
* Stress a queue with transational, non transactional and 2pc senders sending both persistent
* and non persistent messages
* Transactional senders go through a cycle of sending and rolling back
*
*/
@Test
public void testQueueMultipleSenders() throws Exception
{
Connection conn1 = cf.createConnection();
Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sess2 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sess3 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sess4 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sess5 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sess6 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sess7 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sess8 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sess9 = conn1.createSession(true, Session.SESSION_TRANSACTED);
Session sess10 = conn1.createSession(true, Session.SESSION_TRANSACTED);
Session sess11 = conn1.createSession(true, Session.SESSION_TRANSACTED);
Session sess12 = conn1.createSession(true, Session.SESSION_TRANSACTED);
Session sess13 = conn1.createSession(true, Session.SESSION_TRANSACTED);
Session sess14 = conn1.createSession(true, Session.SESSION_TRANSACTED);
Session sess15 = conn1.createSession(true, Session.SESSION_TRANSACTED);
Session sess16 = conn1.createSession(true, Session.SESSION_TRANSACTED);
XASession xaSess1 = ((XAConnection) conn1).createXASession();
tweakXASession(xaSess1);
XASession xaSess2 = ((XAConnection) conn1).createXASession();
tweakXASession(xaSess2);
XASession xaSess3 = ((XAConnection) conn1).createXASession();
tweakXASession(xaSess3);
XASession xaSess4 = ((XAConnection) conn1).createXASession();
tweakXASession(xaSess4);
XASession xaSess5 = ((XAConnection) conn1).createXASession();
tweakXASession(xaSess5);
XASession xaSess6 = ((XAConnection) conn1).createXASession();
tweakXASession(xaSess6);
XASession xaSess7 = ((XAConnection) conn1).createXASession();
tweakXASession(xaSess7);
XASession xaSess8 = ((XAConnection) conn1).createXASession();
tweakXASession(xaSess8);
Session sess17 = xaSess1.getSession();
Session sess18 = xaSess2.getSession();
Session sess19 = xaSess3.getSession();
Session sess20 = xaSess4.getSession();
Session sess21 = xaSess5.getSession();
Session sess22 = xaSess6.getSession();
Session sess23 = xaSess7.getSession();
Session sess24 = xaSess8.getSession();
MessageProducer prod1 = sess1.createProducer(destinationQueue1);
prod1.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
MessageProducer prod2 = sess2.createProducer(destinationQueue1);
prod2.setDeliveryMode(DeliveryMode.PERSISTENT);
MessageProducer prod3 = sess3.createProducer(destinationQueue1);
prod3.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
MessageProducer prod4 = sess4.createProducer(destinationQueue1);
prod4.setDeliveryMode(DeliveryMode.PERSISTENT);
MessageProducer prod5 = sess5.createProducer(destinationQueue1);
prod5.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
MessageProducer prod6 = sess6.createProducer(destinationQueue1);
prod6.setDeliveryMode(DeliveryMode.PERSISTENT);
MessageProducer prod7 = sess7.createProducer(destinationQueue1);
prod7.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
MessageProducer prod8 = sess8.createProducer(destinationQueue1);
prod8.setDeliveryMode(DeliveryMode.PERSISTENT);
MessageProducer prod9 = sess9.createProducer(destinationQueue1);
prod9.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
MessageProducer prod10 = sess10.createProducer(destinationQueue1);
prod10.setDeliveryMode(DeliveryMode.PERSISTENT);
MessageProducer prod11 = sess11.createProducer(destinationQueue1);
prod11.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
MessageProducer prod12 = sess12.createProducer(destinationQueue1);
prod12.setDeliveryMode(DeliveryMode.PERSISTENT);
MessageProducer prod13 = sess13.createProducer(destinationQueue1);
prod13.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
MessageProducer prod14 = sess14.createProducer(destinationQueue1);
prod14.setDeliveryMode(DeliveryMode.PERSISTENT);
MessageProducer prod15 = sess15.createProducer(destinationQueue1);
prod15.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
MessageProducer prod16 = sess16.createProducer(destinationQueue1);
prod16.setDeliveryMode(DeliveryMode.PERSISTENT);
MessageProducer prod17 = sess17.createProducer(destinationQueue1);
prod17.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
MessageProducer prod18 = sess18.createProducer(destinationQueue1);
prod18.setDeliveryMode(DeliveryMode.PERSISTENT);
MessageProducer prod19 = sess19.createProducer(destinationQueue1);
prod19.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
MessageProducer prod20 = sess20.createProducer(destinationQueue1);
prod20.setDeliveryMode(DeliveryMode.PERSISTENT);
MessageProducer prod21 = sess21.createProducer(destinationQueue1);
prod21.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
MessageProducer prod22 = sess22.createProducer(destinationQueue1);
prod22.setDeliveryMode(DeliveryMode.PERSISTENT);
MessageProducer prod23 = sess23.createProducer(destinationQueue1);
prod23.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
MessageProducer prod24 = sess24.createProducer(destinationQueue1);
prod24.setDeliveryMode(DeliveryMode.PERSISTENT);
Connection conn2 = cf.createConnection();
conn2.start();
Session sessReceive = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer cons = sessReceive.createConsumer(destinationQueue1);
Runner[] runners = new Runner[]{
new Sender("prod1", sess1, prod1, JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES),
new Sender("prod2", sess2, prod2, JMSStressTestBase.NUM_PERSISTENT_MESSAGES),
new Sender("prod3", sess3, prod3, JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES),
new Sender("prod4", sess4, prod4, JMSStressTestBase.NUM_PERSISTENT_MESSAGES),
new Sender("prod5", sess5, prod5, JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES),
new Sender("prod6", sess6, prod6, JMSStressTestBase.NUM_PERSISTENT_MESSAGES),
new Sender("prod7", sess7, prod7, JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES),
new Sender("prod8", sess8, prod8, JMSStressTestBase.NUM_PERSISTENT_MESSAGES),
new TransactionalSender("prod9",
sess9,
prod9,
JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES,
1,
1),
new TransactionalSender("prod10",
sess10,
prod10,
JMSStressTestBase.NUM_PERSISTENT_MESSAGES,
1,
1),
new TransactionalSender("prod11",
sess11,
prod11,
JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES,
10,
7),
new TransactionalSender("prod12",
sess12,
prod12,
JMSStressTestBase.NUM_PERSISTENT_MESSAGES,
10,
7),
new TransactionalSender("prod13",
sess13,
prod13,
JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES,
50,
21),
new TransactionalSender("prod14",
sess14,
prod14,
JMSStressTestBase.NUM_PERSISTENT_MESSAGES,
50,
21),
new TransactionalSender("prod15",
sess15,
prod15,
JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES,
100,
67),
new TransactionalSender("prod16",
sess16,
prod16,
JMSStressTestBase.NUM_PERSISTENT_MESSAGES,
100,
67),
new Transactional2PCSender("prod17",
xaSess1,
prod17,
JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES,
1,
1),
new Transactional2PCSender("prod18",
xaSess2,
prod18,
JMSStressTestBase.NUM_PERSISTENT_MESSAGES,
1,
1),
new Transactional2PCSender("prod19",
xaSess3,
prod19,
JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES,
10,
7),
new Transactional2PCSender("prod20",
xaSess4,
prod20,
JMSStressTestBase.NUM_PERSISTENT_MESSAGES,
10,
7),
new Transactional2PCSender("prod21",
xaSess5,
prod21,
JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES,
50,
21),
new Transactional2PCSender("prod22",
xaSess6,
prod22,
JMSStressTestBase.NUM_PERSISTENT_MESSAGES,
50,
21),
new Transactional2PCSender("prod23",
xaSess7,
prod23,
JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES,
100,
67),
new Transactional2PCSender("prod24",
xaSess8,
prod24,
JMSStressTestBase.NUM_PERSISTENT_MESSAGES,
100,
67),
new Receiver(sessReceive,
cons,
12 * JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + 12 * JMSStressTestBase.NUM_PERSISTENT_MESSAGES,
false)};
runRunners(runners);
conn1.close();
conn2.close();
}
}

View File

@ -1,297 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.jms.tests.stress;
import java.util.HashMap;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import org.apache.activemq.jms.tests.JmsTestLogger;
/**
* Receives messages from a destination for stress testing
* @author <a href="tim.fox@jboss.com">Tim Fox</a>
*/
public class Receiver extends Runner implements MessageListener
{
private static final JmsTestLogger log = JmsTestLogger.LOGGER;
private static final long RECEIVE_TIMEOUT = 120000;
protected MessageConsumer cons;
protected int count;
protected boolean isListener;
protected Map<String, Object> counts = new HashMap<String, Object>();
protected boolean isCC;
protected Connection conn;
protected ConnectionConsumer cc;
private final Object lock1 = new Object();
private final Object lock2 = new Object();
private Message theMessage;
private boolean finished;
public Receiver(final Connection conn, final Session sess, final int numMessages, final Destination dest) throws Exception
{
super(sess, numMessages);
isListener = true;
isCC = true;
sess.setMessageListener(this);
cc = conn.createConnectionConsumer(dest, null, new MockServerSessionPool(sess), 10);
}
public Receiver(final Session sess, final MessageConsumer cons, final int numMessages, final boolean isListener) throws Exception
{
super(sess, numMessages);
this.cons = cons;
this.isListener = isListener;
if (this.isListener)
{
cons.setMessageListener(this);
}
}
private boolean done;
public void onMessage(final Message m)
{
try
{
synchronized (lock1)
{
theMessage = m;
lock1.notify();
}
// Wait for message to be processed
synchronized (lock2)
{
while (!done && !finished)
{
lock2.wait();
}
done = false;
}
}
catch (Exception e)
{
Receiver.log.error("Failed to put in channel", e);
setFailed(true);
}
}
protected void finished()
{
synchronized (lock2)
{
finished = true;
lock2.notify();
}
}
protected Message getMessage() throws Exception
{
Message m;
if (isListener)
{
synchronized (lock1)
{
long start = System.currentTimeMillis();
long waitTime = Receiver.RECEIVE_TIMEOUT;
while (theMessage == null && waitTime >= 0)
{
lock1.wait(waitTime);
waitTime = Receiver.RECEIVE_TIMEOUT - (System.currentTimeMillis() - start);
}
m = theMessage;
theMessage = null;
}
}
else
{
m = cons.receive(Receiver.RECEIVE_TIMEOUT);
}
return m;
}
protected void processingDone()
{
if (isListener)
{
synchronized (lock2)
{
done = true;
lock2.notify();
}
}
}
@Override
public void run()
{
// Small pause so as not to miss any messages in a topic
try
{
Thread.sleep(1000);
}
catch (InterruptedException e)
{
}
try
{
String prodName = null;
Integer msgCount = null;
while (count < numMessages)
{
Message m = getMessage();
if (m == null)
{
Receiver.log.error("Message is null");
setFailed(true);
processingDone();
return;
}
prodName = m.getStringProperty("PROD_NAME");
msgCount = new Integer(m.getIntProperty("MSG_NUMBER"));
// log.info(this + " Got message " + prodName + ":" + msgCount + "M: " + m.getJMSMessageID());
Integer prevCount = (Integer)counts.get(prodName);
if (prevCount == null)
{
if (msgCount.intValue() != 0)
{
Receiver.log.error("First message received not zero");
setFailed(true);
processingDone();
return;
}
}
else
{
if (prevCount.intValue() != msgCount.intValue() - 1)
{
Receiver.log.error("Message out of sequence for " + prodName +
", expected:" +
(prevCount.intValue() + 1) +
" got " +
msgCount);
setFailed(true);
processingDone();
return;
}
}
counts.put(prodName, msgCount);
count++;
processingDone();
}
}
catch (Exception e)
{
Receiver.log.error("Failed to receive message", e);
setFailed(true);
}
finally
{
if (cc != null)
{
try
{
cc.close();
}
catch (JMSException e)
{
Receiver.log.error("Failed to close connection consumer", e);
}
}
}
}
static final class MockServerSessionPool implements ServerSessionPool
{
private final ServerSession serverSession;
MockServerSessionPool(final Session sess)
{
serverSession = new MockServerSession(sess);
}
public ServerSession getServerSession() throws JMSException
{
return serverSession;
}
}
static final class MockServerSession implements ServerSession
{
Session session;
MockServerSession(final Session sess)
{
session = sess;
}
public Session getSession() throws JMSException
{
return session;
}
public void start() throws JMSException
{
session.run();
}
}
}

View File

@ -1,195 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.jms.tests.stress;
import org.apache.activemq.jms.tests.JmsTestLogger;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
/**
*
* A RecoveringReceiver.
*
* A Receiver that receives messages from a destination and alternately
* acknowledges and recovers the session.
* Must be used with ack mode CLIENT_ACKNOWLEDGE
*
*
* @author <a href="tim.fox@jboss.com">Tim Fox</a>
*
*/
public class RecoveringReceiver extends Receiver
{
private static final JmsTestLogger log = JmsTestLogger.LOGGER;
protected int ackSize;
protected int recoverSize;
class Count
{
int lastAcked;
int lastReceived;
}
public RecoveringReceiver(final Session sess,
final MessageConsumer cons,
final int numMessages,
final int ackSize,
final int recoverSize,
final boolean isListener) throws Exception
{
super(sess, cons, numMessages, isListener);
this.ackSize = ackSize;
this.recoverSize = recoverSize;
}
@Override
public void run()
{
// Small pause so as not to miss any messages in a topic
try
{
Thread.sleep(1000);
}
catch (InterruptedException e)
{
}
try
{
int iterations = numMessages / ackSize;
for (int outerCount = 0; outerCount < iterations; outerCount++)
{
Message m = null;
for (int innerCount = 0; innerCount < ackSize; innerCount++)
{
m = getMessage();
if (m == null)
{
RecoveringReceiver.log.error("Message is null");
setFailed(true);
return;
}
String prodName = m.getStringProperty("PROD_NAME");
Integer msgCount = new Integer(m.getIntProperty("MSG_NUMBER"));
Count count = (Count)counts.get(prodName);
if (count == null)
{
// First time
if (msgCount.intValue() != 0)
{
RecoveringReceiver.log.error("First message from " + prodName + " is not 0, it is " + msgCount);
setFailed(true);
return;
}
else
{
count = new Count();
counts.put(prodName, count);
}
}
else
{
if (count.lastAcked != msgCount.intValue() - 1)
{
RecoveringReceiver.log.error("Message out of sequence for " + prodName +
", expected " +
(count.lastAcked + 1));
setFailed(true);
return;
}
}
count.lastAcked = msgCount.intValue();
count.lastReceived = msgCount.intValue();
if (innerCount == ackSize - 1)
{
m.acknowledge();
}
processingDone();
}
if (outerCount == iterations - 1)
{
break;
}
for (int innerCount = 0; innerCount < recoverSize; innerCount++)
{
m = getMessage();
if (m == null)
{
RecoveringReceiver.log.error("Message is null");
return;
}
String prodName = m.getStringProperty("PROD_NAME");
Integer msgCount = new Integer(m.getIntProperty("MSG_NUMBER"));
Count count = (Count)counts.get(prodName);
if (count == null)
{
// First time
if (msgCount.intValue() != 0)
{
RecoveringReceiver.log.error("First message from " + prodName + " is not 0, it is " + msgCount);
setFailed(true);
return;
}
else
{
count = new Count();
count.lastAcked = -1;
counts.put(prodName, count);
}
}
else
{
if (count.lastReceived != msgCount.intValue() - 1)
{
RecoveringReceiver.log.error("Message out of sequence");
setFailed(true);
return;
}
}
count.lastReceived = msgCount.intValue();
if (innerCount == recoverSize - 1)
{
sess.recover();
}
processingDone();
}
}
}
catch (Exception e)
{
RecoveringReceiver.log.error("Failed to receive message", e);
setFailed(true);
}
}
}

View File

@ -1,256 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.jms.tests.stress;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.naming.InitialContext;
import org.apache.activemq.jms.tests.ActiveMQServerTestCase;
import org.apache.activemq.jms.tests.JmsTestLogger;
import org.apache.activemq.jms.tests.util.ProxyAssertSupport;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Send messages to a topic with selector1, consumer them with multiple consumers and relay them
* back to the topic with a different selector, then consume that with more consumers.
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*
*
*/
public class RelayStressTest extends ActiveMQServerTestCase
{
@BeforeClass
public static void stressTestsEnabled()
{
org.junit.Assume.assumeTrue(JMSStressTestBase.STRESS_TESTS_ENABLED);
}
// Constants -----------------------------------------------------
private static JmsTestLogger log = JmsTestLogger.LOGGER;
// Static --------------------------------------------------------
// Attributes ----------------------------------------------------
private InitialContext ic;
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
@Override
@Before
public void setUp() throws Exception
{
super.setUp();
// ServerManagement.start("all");
ic = getInitialContext();
createTopic("StressTestTopic");
RelayStressTest.log.debug("setup done");
}
@Override
@After
public void tearDown() throws Exception
{
destroyTopic("StressTestTopic");
ic.close();
}
@Test
public void testRelay() throws Exception
{
ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
Topic topic = (Topic)ic.lookup("/topic/StressTestTopic");
final int numMessages = 20000;
final int numRelayers = 5;
final int numConsumers = 20;
Connection conn = cf.createConnection();
class Relayer implements MessageListener
{
boolean done;
boolean failed;
int count;
MessageProducer prod;
Relayer(final MessageProducer prod)
{
this.prod = prod;
}
public void onMessage(final Message m)
{
try
{
m.clearProperties();
m.setStringProperty("name", "Tim");
prod.send(m);
count++;
if (count == numMessages)
{
synchronized (this)
{
done = true;
notify();
}
}
}
catch (JMSException e)
{
e.printStackTrace();
synchronized (this)
{
done = true;
failed = true;
notify();
}
}
}
}
class Consumer implements MessageListener
{
boolean failed;
boolean done;
int count;
public void onMessage(final Message m)
{
count++;
if (count == numMessages * numRelayers)
{
synchronized (this)
{
done = true;
notify();
}
}
}
}
Relayer[] relayers = new Relayer[numRelayers];
Consumer[] consumers = new Consumer[numConsumers];
for (int i = 0; i < numRelayers; i++)
{
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer cons = sess.createConsumer(topic, "name = 'Watt'");
// MessageConsumer cons = sess.createConsumer(topic);
MessageProducer prod = sess.createProducer(topic);
relayers[i] = new Relayer(prod);
cons.setMessageListener(relayers[i]);
}
for (int i = 0; i < numConsumers; i++)
{
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer cons = sess.createConsumer(topic, "name = 'Tim'");
consumers[i] = new Consumer();
cons.setMessageListener(consumers[i]);
}
conn.start();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod = sess.createProducer(topic);
prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for (int i = 0; i < numMessages; i++)
{
Message m = sess.createMessage();
m.setStringProperty("name", "Watt");
prod.send(m);
}
for (int i = 0; i < numRelayers; i++)
{
synchronized (relayers[i])
{
if (!relayers[i].done)
{
relayers[i].wait();
}
}
}
for (int i = 0; i < numConsumers; i++)
{
synchronized (consumers[i])
{
if (!consumers[i].done)
{
consumers[i].wait();
}
}
}
conn.close();
for (int i = 0; i < numRelayers; i++)
{
ProxyAssertSupport.assertFalse(relayers[i].failed);
}
for (int i = 0; i < numConsumers; i++)
{
ProxyAssertSupport.assertFalse(consumers[i].failed);
}
}
}

View File

@ -1,64 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.jms.tests.stress;
import org.apache.activemq.jms.tests.JmsTestLogger;
import javax.jms.Session;
/**
*
* A Runner.
*
* Base class for running components of a stress test
*
* @author <a href="tim.fox@jboss.com">Tim Fox</a>
*
*/
public abstract class Runner implements Runnable
{
protected JmsTestLogger log = JmsTestLogger.LOGGER;
protected Session sess;
protected int numMessages;
private boolean failed;
public Runner(final Session sess, final int numMessages)
{
this.sess = sess;
this.numMessages = numMessages;
}
public abstract void run();
public boolean isFailed()
{
return failed;
}
public void setFailed(final boolean failed)
{
this.failed = failed;
if (failed)
{
log.info("Marking Runner " + this + " as failed", new Exception("trace"));
}
}
}

View File

@ -1,72 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.jms.tests.stress;
import org.apache.activemq.jms.tests.JmsTestLogger;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
/**
*
* A Sender.
*
* Sends messages to a destination, used in stress testing
*
* @author <a href="tim.fox@jboss.com">Tim Fox</a>
*
*/
public class Sender extends Runner
{
private static final JmsTestLogger log = JmsTestLogger.LOGGER;
protected MessageProducer prod;
protected String prodName;
protected int count;
public Sender(final String prodName, final Session sess, final MessageProducer prod, final int numMessages)
{
super(sess, numMessages);
this.prod = prod;
this.prodName = prodName;
}
@Override
public void run()
{
try
{
while (count < numMessages)
{
Message m = sess.createMessage();
m.setStringProperty("PROD_NAME", prodName);
m.setIntProperty("MSG_NUMBER", count);
prod.send(m);
count++;
}
}
catch (Exception e)
{
Sender.log.error("Failed to send message", e);
setFailed(true);
}
}
}

View File

@ -1,547 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.jms.tests.stress;
import java.util.HashSet;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.Context;
import org.apache.activemq.jms.tests.ActiveMQServerTestCase;
import org.apache.activemq.jms.tests.JmsTestLogger;
import org.apache.activemq.jms.tests.util.ProxyAssertSupport;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* In order for this test to run, you will need to edit /etc/security/limits.conf and change your max sockets to something bigger than 1024
*
* It's required to re-login after this change.
*
* For Windows you need also to increase this limit (max opened files) somehow.
*
*
Example of /etc/security/limits.confg:
#<domain> <type> <item> <value>
clebert hard nofile 10240
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
*/
public class SeveralClientsStressTest extends ActiveMQServerTestCase
{
@BeforeClass
public static void stressTestsEnabled()
{
org.junit.Assume.assumeTrue(JMSStressTestBase.STRESS_TESTS_ENABLED);
}
protected boolean info = false;
protected boolean startServer = true;
// Static ---------------------------------------------------------------------------------------
protected static long PRODUCER_ALIVE_FOR = 60000; // one minute
protected static long CONSUMER_ALIVE_FOR = 60000; // one minutes
protected static long TEST_ALIVE_FOR = 5 * 60 * 1000; // 5 minutes
protected static int NUMBER_OF_PRODUCERS = 100; // this should be set to 300 later
protected static int NUMBER_OF_CONSUMERS = 100; // this should be set to 300 later
// a producer should have a long wait between each message sent?
protected static boolean LONG_WAIT_ON_PRODUCERS = false;
protected static AtomicInteger producedMessages = new AtomicInteger(0);
protected static AtomicInteger readMessages = new AtomicInteger(0);
protected Context createContext() throws Exception
{
return getInitialContext();
}
// Constructors ---------------------------------------------------------------------------------
// Public ---------------------------------------------------------------------------------------
@Test
public void testQueue() throws Exception
{
Context ctx = createContext();
HashSet<SeveralClientsStressTest.Worker> threads = new HashSet<SeveralClientsStressTest.Worker>();
// A chhanel of communication between workers and the test method
LinkedBlockingQueue<InternalMessage> testChannel = new LinkedBlockingQueue<InternalMessage>();
for (int i = 0; i < SeveralClientsStressTest.NUMBER_OF_PRODUCERS; i++)
{
threads.add(new SeveralClientsStressTest.Producer(i, testChannel));
}
for (int i = 0; i < SeveralClientsStressTest.NUMBER_OF_CONSUMERS; i++)
{
threads.add(new SeveralClientsStressTest.Consumer(i, testChannel));
}
for (Worker worker : threads)
{
worker.start();
}
long timeToFinish = System.currentTimeMillis() + SeveralClientsStressTest.TEST_ALIVE_FOR;
int numberOfProducers = SeveralClientsStressTest.NUMBER_OF_PRODUCERS;
int numberOfConsumers = SeveralClientsStressTest.NUMBER_OF_CONSUMERS;
while (threads.size() > 0)
{
SeveralClientsStressTest.InternalMessage msg = testChannel.poll(2000,
TimeUnit.MILLISECONDS);
log.info("Produced:" + SeveralClientsStressTest.producedMessages.get() +
" and Consumed:" +
SeveralClientsStressTest.readMessages.get() +
" messages");
if (msg != null)
{
if (info)
{
log.info("Received message " + msg);
}
if (msg instanceof SeveralClientsStressTest.WorkerFailed)
{
ProxyAssertSupport.fail("Worker " + msg.getWorker() + " has failed");
}
else if (msg instanceof SeveralClientsStressTest.WorkedFinishedMessages)
{
SeveralClientsStressTest.WorkedFinishedMessages finished = (SeveralClientsStressTest.WorkedFinishedMessages)msg;
if (threads.remove(finished.getWorker()))
{
if (System.currentTimeMillis() < timeToFinish)
{
if (finished.getWorker() instanceof SeveralClientsStressTest.Producer)
{
if (info)
{
log.info("Scheduling new Producer " + numberOfProducers);
}
SeveralClientsStressTest.Producer producer = new SeveralClientsStressTest.Producer(numberOfProducers++,
testChannel);
threads.add(producer);
producer.start();
}
else if (finished.getWorker() instanceof SeveralClientsStressTest.Consumer)
{
if (info)
{
log.info("Scheduling new ClientConsumer " + numberOfConsumers);
}
SeveralClientsStressTest.Consumer consumer = new SeveralClientsStressTest.Consumer(numberOfConsumers++,
testChannel);
threads.add(consumer);
consumer.start();
}
}
}
else
{
log.warn(finished.getWorker() + " was not available on threads HashSet");
}
}
}
}
log.info("Produced:" + SeveralClientsStressTest.producedMessages.get() +
" and Consumed:" +
SeveralClientsStressTest.readMessages.get() +
" messages");
clearMessages();
log.info("Produced:" + SeveralClientsStressTest.producedMessages.get() +
" and Consumed:" +
SeveralClientsStressTest.readMessages.get() +
" messages");
ProxyAssertSupport.assertEquals(SeveralClientsStressTest.producedMessages.get(),
SeveralClientsStressTest.readMessages.get());
}
// Package protected ----------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
protected void clearMessages() throws Exception
{
Context ctx = createContext();
ConnectionFactory cf = (ConnectionFactory)ctx.lookup("/ClusteredConnectionFactory");
Connection conn = cf.createConnection();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = (Queue)ctx.lookup("queue/testQueue");
MessageConsumer consumer = sess.createConsumer(queue);
conn.start();
while (consumer.receive(1000) != null)
{
SeveralClientsStressTest.readMessages.incrementAndGet();
log.info("Received JMS message on clearMessages");
}
conn.close();
}
@Override
@Before
public void setUp() throws Exception
{
super.setUp();
if (startServer)
{
// ServerManagement.start("all", true);
createQueue("testQueue");
}
clearMessages();
SeveralClientsStressTest.producedMessages = new AtomicInteger(0);
SeveralClientsStressTest.readMessages = new AtomicInteger(0);
}
// Private --------------------------------------------------------------------------------------
// Inner classes --------------------------------------------------------------------------------
private class Worker extends Thread
{
protected JmsTestLogger log = JmsTestLogger.LOGGER;
private boolean failed = false;
private final int workerId;
private Exception ex;
LinkedBlockingQueue<InternalMessage> messageQueue;
public int getWorkerId()
{
return workerId;
}
public Exception getException()
{
return ex;
}
public boolean isFailed()
{
return failed;
}
protected synchronized void setFailed(final boolean failed, final Exception ex)
{
this.failed = failed;
this.ex = ex;
log.info("Sending Exception", ex);
sendInternalMessage(new SeveralClientsStressTest.WorkerFailed(this));
}
protected void sendInternalMessage(final SeveralClientsStressTest.InternalMessage msg)
{
if (info)
{
log.info("Sending message " + msg);
}
try
{
messageQueue.put(msg);
}
catch (Exception e)
{
log.error(e, e);
setFailed(true, e);
}
}
public Worker(final String name, final int workerId,
final LinkedBlockingQueue<SeveralClientsStressTest.InternalMessage> messageQueue)
{
super(name);
this.workerId = workerId;
this.messageQueue = messageQueue;
setDaemon(true);
}
@Override
public String toString()
{
return this.getClass().getName() + ":" + getWorkerId();
}
}
final class Producer extends SeveralClientsStressTest.Worker
{
public Producer(final int producerId,
final LinkedBlockingQueue<SeveralClientsStressTest.InternalMessage> messageQueue)
{
super("Producer:" + producerId, producerId, messageQueue);
}
Random random = new Random();
@Override
public void run()
{
try
{
Context ctx = createContext();
ConnectionFactory cf = (ConnectionFactory)ctx.lookup("/ClusteredConnectionFactory");
Queue queue = (Queue)ctx.lookup("queue/testQueue");
if (info)
{
log.info("Creating connection and producer");
}
Connection conn = cf.createConnection();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod = sess.createProducer(queue);
if (getWorkerId() % 2 == 0)
{
if (info)
{
log.info("Non Persistent Producer was created");
}
prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
else
{
if (info)
{
log.info("Persistent Producer was created");
}
prod.setDeliveryMode(DeliveryMode.PERSISTENT);
}
long timeToFinish = System.currentTimeMillis() + SeveralClientsStressTest.PRODUCER_ALIVE_FOR;
try
{
int messageSent = 0;
while (System.currentTimeMillis() < timeToFinish)
{
prod.send(sess.createTextMessage("Message sent at " + System.currentTimeMillis()));
SeveralClientsStressTest.producedMessages.incrementAndGet();
messageSent++;
if (messageSent % 50 == 0)
{
if (info)
{
log.info("Sent " + messageSent + " Messages");
}
}
if (SeveralClientsStressTest.LONG_WAIT_ON_PRODUCERS)
{
int waitTime = random.nextInt() % 2 + 1;
if (waitTime < 0)
{
waitTime *= -1;
}
Thread.sleep(waitTime * 1000); // wait 1 or 2 seconds
}
else
{
Thread.sleep(100);
}
}
sendInternalMessage(new SeveralClientsStressTest.WorkedFinishedMessages(this));
}
finally
{
conn.close();
}
}
catch (Exception e)
{
log.error(e, e);
setFailed(true, e);
}
}
}
final class Consumer extends SeveralClientsStressTest.Worker
{
public Consumer(final int consumerId,
final LinkedBlockingQueue<SeveralClientsStressTest.InternalMessage> messageQueue)
{
super("ClientConsumer:" + consumerId, consumerId, messageQueue);
}
@Override
public void run()
{
try
{
Context ctx = createContext();
ConnectionFactory cf = (ConnectionFactory)ctx.lookup("/ClusteredConnectionFactory");
Queue queue = (Queue)ctx.lookup("queue/testQueue");
if (info)
{
log.info("Creating connection and consumer");
}
Connection conn = cf.createConnection();
Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = sess.createConsumer(queue);
if (info)
{
log.info("ClientConsumer was created");
}
conn.start();
int msgs = 0;
int transactions = 0;
long timeToFinish = System.currentTimeMillis() + SeveralClientsStressTest.CONSUMER_ALIVE_FOR;
try
{
while (System.currentTimeMillis() < timeToFinish)
{
Message msg = consumer.receive(1000);
if (msg != null)
{
msgs++;
if (msgs >= 50)
{
transactions++;
if (transactions % 2 == 0)
{
if (info)
{
log.info("Commit transaction");
}
sess.commit();
SeveralClientsStressTest.readMessages.addAndGet(msgs);
}
else
{
if (info)
{
log.info("Rollback transaction");
}
sess.rollback();
}
msgs = 0;
}
}
else
{
break;
}
}
SeveralClientsStressTest.readMessages.addAndGet(msgs);
sess.commit();
sendInternalMessage(new SeveralClientsStressTest.WorkedFinishedMessages(this));
}
finally
{
conn.close();
}
}
catch (Exception e)
{
log.error(e);
setFailed(true, e);
}
}
}
// Objects used on the communication between Workers and the test
static class InternalMessage
{
SeveralClientsStressTest.Worker worker;
public InternalMessage(final SeveralClientsStressTest.Worker worker)
{
this.worker = worker;
}
public SeveralClientsStressTest.Worker getWorker()
{
return worker;
}
@Override
public String toString()
{
return this.getClass().getName() + " worker-> " + worker.toString();
}
}
static class WorkedFinishedMessages extends SeveralClientsStressTest.InternalMessage
{
public WorkedFinishedMessages(final SeveralClientsStressTest.Worker worker)
{
super(worker);
}
}
static class WorkerFailed extends SeveralClientsStressTest.InternalMessage
{
public WorkerFailed(final SeveralClientsStressTest.Worker worker)
{
super(worker);
}
}
}

View File

@ -1,297 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.jms.tests.stress;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.XAConnection;
import javax.jms.XASession;
import org.junit.Test;
/**
* A TopicStressTest.
*
* @author <a href="tim.fox@jboss.com">Tim Fox</a>
* @version <tt>$Revision: 2349 $</tt>
*/
public class TopicStressTest extends JMSStressTestBase
{
/*
* Stress a topic with with many non transactional, transactional and 2pc receivers.
* Non transactional receivers use ack modes of auto, dups and client ack.
* Client ack receivers go through a cycle of receiving a batch, acking and recovering
* Transactional receivers go through a cycle of receiving committing and rolling back.
* Half the consumers are durable and half non durable.
*
*/
@Test
public void testTopicMultipleReceivers() throws Exception
{
Connection conn1 = cf.createConnection();
Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sess2 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod1 = sess1.createProducer(topic1);
prod1.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
MessageProducer prod2 = sess2.createProducer(topic1);
prod2.setDeliveryMode(DeliveryMode.PERSISTENT);
Connection conn2 = cf.createConnection();
conn2.setClientID("clientid1");
conn2.start();
// 4 auto ack
Session rsess1 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session rsess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session rsess3 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session rsess4 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4 dups
Session rsess5 = conn2.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
Session rsess6 = conn2.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
Session rsess7 = conn2.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
Session rsess8 = conn2.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
// 4 client
Session rsess9 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Session rsess10 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Session rsess11 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Session rsess12 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// 4 transactional
Session rsess13 = conn2.createSession(true, Session.SESSION_TRANSACTED);
Session rsess14 = conn2.createSession(true, Session.SESSION_TRANSACTED);
Session rsess15 = conn2.createSession(true, Session.SESSION_TRANSACTED);
Session rsess16 = conn2.createSession(true, Session.SESSION_TRANSACTED);
// 4 2pc transactional
XASession rxaSess1 = ((XAConnection) conn2).createXASession();
tweakXASession(rxaSess1);
XASession rxaSess2 = ((XAConnection) conn2).createXASession();
tweakXASession(rxaSess2);
XASession rxaSess3 = ((XAConnection) conn2).createXASession();
tweakXASession(rxaSess3);
XASession rxaSess4 = ((XAConnection) conn2).createXASession();
tweakXASession(rxaSess4);
Session rsess17 = rxaSess1.getSession();
Session rsess18 = rxaSess2.getSession();
Session rsess19 = rxaSess3.getSession();
Session rsess20 = rxaSess4.getSession();
MessageConsumer cons1 = rsess1.createConsumer(topic1);
MessageConsumer cons2 = rsess2.createDurableSubscriber(topic1, "sub1");
MessageConsumer cons3 = rsess3.createConsumer(topic1);
MessageConsumer cons4 = rsess4.createDurableSubscriber(topic1, "sub2");
MessageConsumer cons5 = rsess5.createConsumer(topic1);
MessageConsumer cons6 = rsess6.createDurableSubscriber(topic1, "sub3");
MessageConsumer cons7 = rsess7.createConsumer(topic1);
MessageConsumer cons8 = rsess8.createDurableSubscriber(topic1, "sub4");
MessageConsumer cons9 = rsess9.createConsumer(topic1);
MessageConsumer cons10 = rsess10.createDurableSubscriber(topic1, "sub5");
MessageConsumer cons11 = rsess11.createConsumer(topic1);
MessageConsumer cons12 = rsess12.createDurableSubscriber(topic1, "sub6");
MessageConsumer cons13 = rsess13.createConsumer(topic1);
MessageConsumer cons14 = rsess14.createDurableSubscriber(topic1, "sub7");
MessageConsumer cons15 = rsess15.createConsumer(topic1);
MessageConsumer cons16 = rsess16.createDurableSubscriber(topic1, "sub8");
MessageConsumer cons17 = rsess17.createConsumer(topic1);
MessageConsumer cons18 = rsess18.createDurableSubscriber(topic1, "sub9");
MessageConsumer cons19 = rsess19.createConsumer(topic1);
MessageConsumer cons20 = rsess20.createDurableSubscriber(topic1, "sub10");
// To make sure paging occurs first send some messages before receiving
Runner[] runners = new Runner[]{
new Sender("prod1", sess1, prod1, JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND),
new Sender("prod2", sess2, prod2, JMSStressTestBase.NUM_PERSISTENT_PRESEND)};
runRunners(runners);
runners = new Runner[]{
// 4 auto ack
new Receiver(rsess1,
cons1,
JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
JMSStressTestBase.NUM_PERSISTENT_PRESEND,
false),
new Receiver(rsess2,
cons2,
JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
JMSStressTestBase.NUM_PERSISTENT_PRESEND,
true),
new Receiver(rsess3,
cons3,
JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
JMSStressTestBase.NUM_PERSISTENT_PRESEND,
false),
new Receiver(rsess4,
cons4,
JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
JMSStressTestBase.NUM_PERSISTENT_PRESEND,
true),
// 4 dups ok
new Receiver(rsess5,
cons5,
JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
JMSStressTestBase.NUM_PERSISTENT_PRESEND,
false),
new Receiver(rsess6,
cons6,
JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
JMSStressTestBase.NUM_PERSISTENT_PRESEND,
true),
new Receiver(rsess7,
cons7,
JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
JMSStressTestBase.NUM_PERSISTENT_PRESEND,
false),
new Receiver(rsess8,
cons8,
JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
JMSStressTestBase.NUM_PERSISTENT_PRESEND,
true),
// 4 client ack
new RecoveringReceiver(rsess9,
cons9,
JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
JMSStressTestBase.NUM_PERSISTENT_PRESEND,
1,
1,
false),
new RecoveringReceiver(rsess10,
cons10,
JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
JMSStressTestBase.NUM_PERSISTENT_PRESEND,
10,
7,
true),
new RecoveringReceiver(rsess11,
cons11,
JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
JMSStressTestBase.NUM_PERSISTENT_PRESEND,
50,
21,
false),
new RecoveringReceiver(rsess12,
cons12,
JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
JMSStressTestBase.NUM_PERSISTENT_PRESEND,
100,
67,
true),
// 4 transactional
new TransactionalReceiver(rsess13,
cons13,
JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
JMSStressTestBase.NUM_PERSISTENT_PRESEND,
1,
1,
false),
new TransactionalReceiver(rsess14,
cons14,
JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
JMSStressTestBase.NUM_PERSISTENT_PRESEND,
10,
7,
true),
new TransactionalReceiver(rsess15,
cons15,
JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
JMSStressTestBase.NUM_PERSISTENT_PRESEND,
50,
21,
false),
new TransactionalReceiver(rsess16,
cons16,
JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
JMSStressTestBase.NUM_PERSISTENT_PRESEND,
100,
67,
true),
// 4 2pc transactional
new Transactional2PCReceiver(rxaSess1,
cons17,
JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
JMSStressTestBase.NUM_PERSISTENT_PRESEND,
1,
1,
false),
new Transactional2PCReceiver(rxaSess2,
cons18,
JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
JMSStressTestBase.NUM_PERSISTENT_PRESEND,
10,
7,
true),
new Transactional2PCReceiver(rxaSess3,
cons19,
JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
JMSStressTestBase.NUM_PERSISTENT_PRESEND,
50,
21,
false),
new Transactional2PCReceiver(rxaSess4,
cons20,
JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES + JMSStressTestBase.NUM_PERSISTENT_MESSAGES +
JMSStressTestBase.NUM_NON_PERSISTENT_PRESEND +
JMSStressTestBase.NUM_PERSISTENT_PRESEND,
100,
67,
true),
new Sender("prod3", sess1, prod1, JMSStressTestBase.NUM_NON_PERSISTENT_MESSAGES),
new Sender("prod4", sess2, prod2, JMSStressTestBase.NUM_PERSISTENT_MESSAGES)};
runRunners(runners);
conn1.close();
conn2.close();
}
}

View File

@ -1,231 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.jms.tests.stress;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.XASession;
import javax.transaction.xa.XAResource;
import org.apache.activemq.core.transaction.impl.XidImpl;
import org.apache.activemq.jms.tests.JmsTestLogger;
import org.apache.activemq.utils.UUIDGenerator;
/**
*
* A receiver that receives messages in a XA transaction
*
* Receives <commitSize> messages then prepares, commits, then
* Receives <rollbackSize> messages then prepares, rollsback until
* a total of <numMessages> messages have been received (committed)
* <numMessages> must be a multiple of <commitSize>
*
* @author <a href="tim.fox@jboss.com">Tim Fox</a>
*
*/
public class Transactional2PCReceiver extends Receiver
{
private static final JmsTestLogger log = JmsTestLogger.LOGGER;
protected int commitSize;
protected int rollbackSize;
protected XAResource xaResource;
class Count
{
int lastCommitted;
int lastReceived;
}
public Transactional2PCReceiver(final XASession sess,
final MessageConsumer cons,
final int numMessages,
final int commitSize,
final int rollbackSize,
final boolean isListener) throws Exception
{
super(sess, cons, numMessages, isListener);
this.commitSize = commitSize;
this.rollbackSize = rollbackSize;
xaResource = sess.getXAResource();
}
@Override
public void run()
{
// Small pause so as not to miss any messages in a topic
try
{
Thread.sleep(1000);
}
catch (InterruptedException e)
{
}
try
{
int iterations = numMessages / commitSize;
XidImpl xid = null;
xid = new XidImpl("bq1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
xaResource.start(xid, XAResource.TMNOFLAGS);
for (int outerCount = 0; outerCount < iterations; outerCount++)
{
for (int innerCount = 0; innerCount < commitSize; innerCount++)
{
Message m = getMessage();
if (m == null)
{
Transactional2PCReceiver.log.error("Message is null");
setFailed(true);
return;
}
String prodName = m.getStringProperty("PROD_NAME");
Integer msgCount = new Integer(m.getIntProperty("MSG_NUMBER"));
Count count = (Count)counts.get(prodName);
if (count == null)
{
// First time
if (msgCount.intValue() != 0)
{
Transactional2PCReceiver.log.error("First message from " + prodName +
" is not 0, it is " +
msgCount);
setFailed(true);
return;
}
else
{
count = new Count();
counts.put(prodName, count);
}
}
else
{
if (count.lastCommitted != msgCount.intValue() - 1)
{
Transactional2PCReceiver.log.error("Message out of sequence for " + prodName +
", expected " +
(count.lastCommitted + 1) +
", actual " +
msgCount);
setFailed(true);
return;
}
}
count.lastCommitted = msgCount.intValue();
count.lastReceived = msgCount.intValue();
if (innerCount == commitSize - 1)
{
xaResource.end(xid, XAResource.TMSUCCESS);
xaResource.prepare(xid);
xaResource.commit(xid, false);
// Starting new tx
xid = new XidImpl("bq1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
xaResource.start(xid, XAResource.TMNOFLAGS);
}
processingDone();
}
if (outerCount == iterations - 1)
{
break;
}
for (int innerCount = 0; innerCount < rollbackSize; innerCount++)
{
Message m = getMessage();
if (m == null)
{
Transactional2PCReceiver.log.error("Message is null (rollback)");
setFailed(true);
return;
}
String prodName = m.getStringProperty("PROD_NAME");
Integer msgCount = new Integer(m.getIntProperty("MSG_NUMBER"));
Count count = (Count)counts.get(prodName);
if (count == null)
{
// First time
if (msgCount.intValue() != 0)
{
Transactional2PCReceiver.log.error("First message from " + prodName +
" is not 0, it is " +
msgCount);
setFailed(true);
return;
}
else
{
count = new Count();
count.lastCommitted = -1;
counts.put(prodName, count);
}
}
else
{
if (count.lastReceived != msgCount.intValue() - 1)
{
Transactional2PCReceiver.log.error("Message out of sequence");
setFailed(true);
return;
}
}
count.lastReceived = msgCount.intValue();
if (innerCount == rollbackSize - 1)
{
xaResource.end(xid, XAResource.TMSUCCESS);
xaResource.prepare(xid);
xaResource.rollback(xid);
xid = new XidImpl("bq1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
xaResource.start(xid, XAResource.TMNOFLAGS);
}
processingDone();
}
}
xaResource.end(xid, XAResource.TMSUCCESS);
xaResource.prepare(xid);
xaResource.commit(xid, false);
finished();
}
catch (Exception e)
{
Transactional2PCReceiver.log.error("Failed to receive message", e);
setFailed(true);
}
}
}

View File

@ -1,120 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.jms.tests.stress;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.XASession;
import javax.transaction.xa.XAResource;
import org.apache.activemq.core.transaction.impl.XidImpl;
import org.apache.activemq.jms.tests.JmsTestLogger;
import org.apache.activemq.utils.UUIDGenerator;
/**
*
* A Sender that sends messages to a destination in an XA transaction
*
* Sends messages to a destination in a jms transaction.
* Sends <commitSize> messages then prepares, commits, then
* sends <rollbackSize> messages then prepares, rollsback until
* a total of <numMessages> messages have been sent (commitSize)
* <nuMessages> must be a multiple of <commitSize>
*
* @author <a href="tim.fox@jboss.com">Tim Fox</a>
*
*/
public class Transactional2PCSender extends Sender
{
private static final JmsTestLogger log = JmsTestLogger.LOGGER;
protected int commitSize;
protected int rollbackSize;
protected XAResource xaResource;
public Transactional2PCSender(final String prodName,
final XASession sess,
final MessageProducer prod,
final int numMessages,
final int commitSize,
final int rollbackSize)
{
super(prodName, sess, prod, numMessages);
this.commitSize = commitSize;
this.rollbackSize = rollbackSize;
xaResource = sess.getXAResource();
}
@Override
public void run()
{
int iterations = numMessages / commitSize;
try
{
for (int outerCount = 0; outerCount < iterations; outerCount++)
{
XidImpl xid = null;
if (commitSize > 0)
{
xid = new XidImpl("bq1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
xaResource.start(xid, XAResource.TMNOFLAGS);
}
for (int innerCount = 0; innerCount < commitSize; innerCount++)
{
Message m = sess.createMessage();
m.setStringProperty("PROD_NAME", prodName);
m.setIntProperty("MSG_NUMBER", outerCount * commitSize + innerCount);
prod.send(m);
}
if (commitSize > 0)
{
xaResource.end(xid, XAResource.TMSUCCESS);
xaResource.prepare(xid);
xaResource.commit(xid, false);
}
if (rollbackSize > 0)
{
xid = new XidImpl("bq1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
xaResource.start(xid, XAResource.TMNOFLAGS);
}
for (int innerCount = 0; innerCount < rollbackSize; innerCount++)
{
Message m = sess.createMessage();
m.setStringProperty("PROD_NAME", prodName);
m.setIntProperty("MSG_NUMBER", (outerCount + 1) * commitSize + innerCount);
prod.send(m);
}
if (rollbackSize > 0)
{
xaResource.end(xid, XAResource.TMSUCCESS);
xaResource.prepare(xid);
xaResource.rollback(xid);
}
}
}
catch (Exception e)
{
Transactional2PCSender.log.error("Failed to send message", e);
setFailed(true);
}
}
}

View File

@ -1,200 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.jms.tests.stress;
import org.apache.activemq.jms.tests.JmsTestLogger;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
/**
*
* A Receiver that receives messages from a destination in a JMS transaction
*
* Receives <commitSize> messages then commits, then
* Receives <rollbackSize> messages then rollsback until
* a total of <numMessages> messages have been received (committed)
* <nuMessages> must be a multiple of <commitSize>
*
* @author <a href="tim.fox@jboss.com">Tim Fox</a>
*
*/
public class TransactionalReceiver extends Receiver
{
private static final JmsTestLogger log = JmsTestLogger.LOGGER;
protected int commitSize;
protected int rollbackSize;
class Count
{
int lastCommitted;
int lastReceived;
}
public TransactionalReceiver(final Session sess,
final MessageConsumer cons,
final int numMessages,
final int commitSize,
final int rollbackSize,
final boolean isListener) throws Exception
{
super(sess, cons, numMessages, isListener);
this.commitSize = commitSize;
this.rollbackSize = rollbackSize;
}
@Override
public void run()
{
// Small pause so as not to miss any messages in a topic
try
{
Thread.sleep(1000);
}
catch (InterruptedException e)
{
}
try
{
int iterations = numMessages / commitSize;
for (int outerCount = 0; outerCount < iterations; outerCount++)
{
for (int innerCount = 0; innerCount < commitSize; innerCount++)
{
Message m = getMessage();
if (m == null)
{
TransactionalReceiver.log.error("Message is null");
setFailed(true);
return;
}
String prodName = m.getStringProperty("PROD_NAME");
Integer msgCount = new Integer(m.getIntProperty("MSG_NUMBER"));
Count count = (Count)counts.get(prodName);
if (count == null)
{
// First time
if (msgCount.intValue() != 0)
{
TransactionalReceiver.log.error("First message from " + prodName + " is not 0, it is " + msgCount);
setFailed(true);
return;
}
else
{
count = new Count();
counts.put(prodName, count);
}
}
else
{
if (count.lastCommitted != msgCount.intValue() - 1)
{
TransactionalReceiver.log.error("Message out of sequence for " + m.getJMSMessageID() +
" " +
prodName +
", expected " +
(count.lastCommitted + 1) +
", actual " +
msgCount);
setFailed(true);
return;
}
}
count.lastCommitted = msgCount.intValue();
count.lastReceived = msgCount.intValue();
if (innerCount == commitSize - 1)
{
sess.commit();
}
processingDone();
}
if (outerCount == iterations - 1)
{
break;
}
for (int innerCount = 0; innerCount < rollbackSize; innerCount++)
{
Message m = getMessage();
if (m == null)
{
TransactionalReceiver.log.error("Message is null");
setFailed(true);
return;
}
String prodName = m.getStringProperty("PROD_NAME");
Integer msgCount = new Integer(m.getIntProperty("MSG_NUMBER"));
Count count = (Count)counts.get(prodName);
if (count == null)
{
// First time
if (msgCount.intValue() != 0)
{
TransactionalReceiver.log.error("First message from " + prodName + " is not 0, it is " + msgCount);
setFailed(true);
return;
}
else
{
count = new Count();
count.lastCommitted = -1;
counts.put(prodName, count);
}
}
else
{
if (count.lastReceived != msgCount.intValue() - 1)
{
TransactionalReceiver.log.error("Message out of sequence");
setFailed(true);
return;
}
}
count.lastReceived = msgCount.intValue();
if (innerCount == rollbackSize - 1 && outerCount != iterations - 1)
{
// Don't roll back on the very last one
sess.rollback();
}
processingDone();
}
}
finished();
}
catch (Exception e)
{
TransactionalReceiver.log.error("Failed to receive message", e);
setFailed(true);
}
}
}

View File

@ -1,93 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.jms.tests.stress;
import org.apache.activemq.jms.tests.JmsTestLogger;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
/**
*
* A Sender that sends messages to a destination in a JMS transaction.
*
* Sends messages to a destination in a jms transaction.
* Sends <commitSize> messages then commits, then
* sends <rollbackSize> messages then rollsback until
* a total of <numMessages> messages have been sent (commitSize)
* <numMessages> must be a multiple of <commitSize>
*
* @author <a href="tim.fox@jboss.com">Tim Fox</a>
*
*/
public class TransactionalSender extends Sender
{
private static final JmsTestLogger log = JmsTestLogger.LOGGER;
protected int commitSize;
protected int rollbackSize;
public TransactionalSender(final String prodName,
final Session sess,
final MessageProducer prod,
final int numMessages,
final int commitSize,
final int rollbackSize)
{
super(prodName, sess, prod, numMessages);
this.commitSize = commitSize;
this.rollbackSize = rollbackSize;
}
@Override
public void run()
{
int iterations = numMessages / commitSize;
try
{
for (int outerCount = 0; outerCount < iterations; outerCount++)
{
for (int innerCount = 0; innerCount < commitSize; innerCount++)
{
Message m = sess.createMessage();
m.setStringProperty("PROD_NAME", prodName);
m.setIntProperty("MSG_NUMBER", outerCount * commitSize + innerCount);
prod.send(m);
}
sess.commit();
for (int innerCount = 0; innerCount < rollbackSize; innerCount++)
{
Message m = sess.createMessage();
m.setStringProperty("PROD_NAME", prodName);
m.setIntProperty("MSG_NUMBER", (outerCount + 1) * commitSize + innerCount);
prod.send(m);
}
sess.rollback();
}
}
catch (Exception e)
{
TransactionalSender.log.error("Failed to send message", e);
setFailed(true);
}
}
}

View File

@ -77,15 +77,9 @@
<profiles>
<profile>
<id>banned-tests</id>
<id>extra-tests</id>
<modules>
<module>jms-tests</module>
<module>integration-tests</module>
<module>byteman-tests</module>
<module>soak-tests</module>
<module>stress-tests</module>
<module>concurrent-tests</module>
<module>performance-tests</module>
<module>extra-tests</module>
</modules>
</profile>
</profiles>
@ -94,5 +88,10 @@
<module>unit-tests</module>
<module>joram-tests</module>
<module>timing-tests</module>
<module>jms-tests</module>
<module>integration-tests</module>
<module>soak-tests</module>
<module>stress-tests</module>
<module>performance-tests</module>
</modules>
</project>

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tests.concurrent.server.impl;
package org.apache.activemq.tests.timing.core.server.impl;
import org.junit.Before;
import org.junit.After;
@ -43,7 +43,7 @@ import org.apache.activemq.tests.util.UnitTestCase;
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*
*/
public class QueueTest extends UnitTestCase
public class QueueConcurrentTest extends UnitTestCase
{
private static final UnitTestLogger log = UnitTestLogger.LOGGER;
@ -114,9 +114,9 @@ public class QueueTest extends UnitTestCase
assertRefListsIdenticalRefs(sender.getReferences(), consumer.getReferences());
QueueTest.log.info("num refs: " + sender.getReferences().size());
QueueConcurrentTest.log.info("num refs: " + sender.getReferences().size());
QueueTest.log.info("num toggles: " + toggler.getNumToggles());
QueueConcurrentTest.log.info("num toggles: " + toggler.getNumToggles());
}