From 368cb6653ea342232792cf231e9e20c447b84175 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Wed, 13 Aug 2008 18:15:12 +0000 Subject: [PATCH] group moving from trunk to the sandbox git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@685630 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/group/GroupMapTest.java | 548 ------------------ .../activemq/group/GroupMemberTest.java | 168 ------ .../activemq/group/GroupMessageTest.java | 256 -------- 3 files changed, 972 deletions(-) delete mode 100644 activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java delete mode 100644 activemq-core/src/test/java/org/apache/activemq/group/GroupMemberTest.java delete mode 100644 activemq-core/src/test/java/org/apache/activemq/group/GroupMessageTest.java diff --git a/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java b/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java deleted file mode 100644 index 74be0ae846..0000000000 --- a/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java +++ /dev/null @@ -1,548 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.group; - -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import junit.framework.TestCase; -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; - - -public class GroupMapTest extends TestCase { - protected BrokerService broker; - protected Connection connection1; - protected Connection connection2; - protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; - - /** - * Test method for - * {@link org.apache.activemq.group.Group#addMemberChangedListener(org.apache.activemq.group.MemberChangedListener)}. - * @throws Exception - */ - public void testAddMemberChangedListener() throws Exception { - final AtomicInteger counter = new AtomicInteger(); - Group map1 = new Group(connection1,"map1"); - map1.addMemberChangedListener(new MemberChangedListener(){ - - public void memberStarted(Member member) { - synchronized(counter) { - counter.incrementAndGet(); - counter.notifyAll(); - } - - } - - public void memberStopped(Member member) { - synchronized(counter) { - counter.decrementAndGet(); - counter.notifyAll(); - } - } - - }); - map1.start(); - synchronized(counter) { - if (counter.get()<1) { - counter.wait(5000); - } - } - assertEquals(1, counter.get()); - Group map2 = new Group(connection2,"map2"); - map2.start(); - synchronized(counter) { - if (counter.get()<2) { - counter.wait(5000); - } - } - assertEquals(2, counter.get()); - map2.stop(); - synchronized(counter) { - if (counter.get()>=2) { - counter.wait(Group.DEFAULT_HEART_BEAT_INTERVAL*3); - } - } - assertEquals(1, counter.get()); - map1.stop(); - } - - /** - * Test method for - * {@link org.apache.activemq.group.Group#addMapChangedListener(org.apache.activemq.group.MapChangedListener)}. - * @throws Exception - */ - public void testAddMapChangedListener() throws Exception { - final AtomicBoolean called1 = new AtomicBoolean(); - final AtomicBoolean called2 = new AtomicBoolean(); - - Group map1 = new Group(connection1,"map1"); - - map1.addMapChangedListener(new DefaultMapChangedListener() { - public void mapInsert(Member owner,Object Key, Object Value) { - synchronized(called1) { - called1.set(true); - called1.notifyAll(); - } - } - }); - map1.start(); - - Group map2 = new Group(connection2,"map2"); - - map2.addMapChangedListener(new DefaultMapChangedListener() { - public void mapInsert(Member owner,Object Key, Object Value) { - synchronized(called2) { - called2.set(true); - called2.notifyAll(); - } - } - }); - map2.start(); - - - map1.put("test", "blob"); - synchronized(called1) { - if (!called1.get()) { - called1.wait(5000); - } - } - synchronized(called2) { - if (!called2.get()) { - called2.wait(5000); - } - } - assertTrue(called1.get()); - assertTrue(called2.get()); - map1.stop(); - map2.stop(); - } - - public void testGetImplicitWriteLock() throws Exception { - Group map1 = new Group(connection1, "map1"); - final AtomicBoolean called = new AtomicBoolean(); - map1.start(); - Group map2 = new Group(connection2, "map2"); - map2.setAlwaysLock(true); - map2.setMinimumGroupSize(2); - map2.start(); - map2.put("test", "foo"); - try { - map1.put("test", "bah"); - fail("Should have thrown an exception!"); - } catch (GroupMapUpdateException e) { - } - map1.stop(); - map2.stop(); - } - - public void testExpireImplicitWriteLock() throws Exception { - Group map1 = new Group(connection1, "map1"); - final AtomicBoolean called = new AtomicBoolean(); - map1.start(); - Group map2 = new Group(connection2, "map2"); - map2.setAlwaysLock(true); - map2.setLockTimeToLive(1000); - map2.setMinimumGroupSize(2); - map2.start(); - map2.put("test", "foo"); - try { - map1.put("test", "bah"); - fail("Should have thrown an exception!"); - } catch (GroupMapUpdateException e) { - } - Thread.sleep(2000); - map1.put("test", "bah"); - map1.stop(); - map2.stop(); - } - - public void testExpireImplicitLockOnExit() throws Exception { - Group map1 = new Group(connection1, "map1"); - final AtomicBoolean called = new AtomicBoolean(); - map1.start(); - Group map2 = new Group(connection2, "map2"); - map2.setAlwaysLock(true); - map2.setMinimumGroupSize(2); - map2.start(); - map2.put("test", "foo"); - try { - map1.put("test", "bah"); - fail("Should have thrown an exception!"); - } catch (GroupMapUpdateException e) { - } - map2.stop(); - map1.put("test", "bah"); - map1.stop(); - - } - - public void testGetExplicitWriteLock() throws Exception { - Group map1 = new Group(connection1, "map1"); - map1.setAlwaysLock(true); - final AtomicBoolean called = new AtomicBoolean(); - map1.start(); - Group map2 = new Group(connection2, "map2"); - map2.setAlwaysLock(true); - map2.setMinimumGroupSize(2); - map2.start(); - map2.put("test", "foo"); - map2.lock("test"); - try { - map1.put("test", "bah"); - fail("Should have thrown an exception!"); - } catch (GroupMapUpdateException e) { - } - map2.unlock("test"); - map1.lock("test"); - try { - map2.lock("test"); - fail("Should have thrown an exception!"); - } catch (GroupMapUpdateException e) { - } - map1.stop(); - map2.stop(); - } - - - - /** - * Test method for {@link org.apache.activemq.group.Group#clear()}. - * - * @throws Exception - */ - public void testClear() throws Exception { - Group map1 = new Group(connection1,"map1"); - final AtomicBoolean called = new AtomicBoolean(); - map1.addMapChangedListener(new DefaultMapChangedListener() { - public void mapInsert(Member owner,Object Key, Object Value) { - synchronized(called) { - called.set(true); - called.notifyAll(); - } - } - - public void mapRemove(Member owner, Object key, Object value,boolean expired) { - synchronized(called) { - called.set(true); - called.notifyAll(); - } - } - }); - map1.start(); - Group map2 = new Group(connection2,"map2"); - map2.start(); - map2.put("test","foo"); - synchronized(called) { - if (!called.get()) { - called.wait(5000); - } - } - assertTrue(called.get()); - called.set(false); - assertTrue(map1.isEmpty()==false); - map2.clear(); - synchronized(called) { - if (!called.get()) { - called.wait(5000); - } - } - assertTrue(map1.isEmpty()); - map1.stop(); - map2.stop(); - } - - /** - * Test a new map is populated for existing values - */ - public void testMapUpdatedOnStart() throws Exception { - Group map1 = new Group(connection1,"map1"); - final AtomicBoolean called = new AtomicBoolean(); - - map1.start(); - map1.put("test", "foo"); - Group map2 = new Group(connection2,"map2"); - map2.addMapChangedListener(new DefaultMapChangedListener() { - public void mapInsert(Member owner,Object Key, Object Value) { - synchronized(called) { - called.set(true); - called.notifyAll(); - } - } - }); - map2.start(); - - synchronized(called) { - if (!called.get()) { - called.wait(5000); - } - } - assertTrue(called.get()); - called.set(false); - assertTrue(map2.containsKey("test")); - assertTrue(map2.containsValue("foo")); - map1.stop(); - map2.stop(); - } - - public void testContainsKey() throws Exception { - Group map1 = new Group(connection1,"map1"); - final AtomicBoolean called = new AtomicBoolean(); - map1.addMapChangedListener(new DefaultMapChangedListener() { - public void mapInsert(Member owner,Object Key, Object Value) { - synchronized(called) { - called.set(true); - called.notifyAll(); - } - } - }); - map1.start(); - Group map2 = new Group(connection2,"map2"); - map2.start(); - map2.put("test","foo"); - synchronized(called) { - if (!called.get()) { - called.wait(5000); - } - } - assertTrue(called.get()); - called.set(false); - assertTrue(map1.containsKey("test")); - map1.stop(); - map2.stop(); - } - - - /** - * Test method for - * {@link org.apache.activemq.group.Group#containsValue(java.lang.Object)}. - * @throws Exception - */ - public void testContainsValue() throws Exception { - Group map1 = new Group(connection1,"map1"); - final AtomicBoolean called = new AtomicBoolean(); - map1.addMapChangedListener(new DefaultMapChangedListener() { - public void mapInsert(Member owner,Object Key, Object Value) { - synchronized(called) { - called.set(true); - called.notifyAll(); - } - } - }); - map1.start(); - Group map2 = new Group(connection2,"map2"); - map2.start(); - map2.put("test","foo"); - synchronized(called) { - if (!called.get()) { - called.wait(5000); - } - } - assertTrue(called.get()); - called.set(false); - assertTrue(map1.containsValue("foo")); - map1.stop(); - map2.stop(); - } - - /** - * Test method for {@link org.apache.activemq.group.GroupMap#entrySet()}. - * @throws Exception - */ - - - /** - * Test method for - * {@link org.apache.activemq.group.Group#get(java.lang.Object)}. - * @throws Exception - */ - public void testGet() throws Exception { - Group map1 = new Group(connection1,"map1"); - final AtomicBoolean called = new AtomicBoolean(); - map1.addMapChangedListener(new DefaultMapChangedListener() { - public void mapInsert(Member owner,Object Key, Object Value) { - synchronized(called) { - called.set(true); - called.notifyAll(); - } - } - }); - map1.start(); - Group map2 = new Group(connection2,"map2"); - map2.start(); - map2.put("test","foo"); - synchronized(called) { - if (!called.get()) { - called.wait(5000); - } - } - assertTrue(called.get()); - assertTrue(map1.get("test").equals("foo")); - map1.stop(); - map2.stop(); - } - - public void testPut() throws Exception { - Group map1 = new Group(connection1,"map1"); - map1.start(); - Group map2 = new Group(connection2,"map2"); - map2.setMinimumGroupSize(2); - map2.start(); - Object value = map1.put("foo", "blob"); - assertNull(value); - value = map1.put("foo", "blah"); - assertEquals(value, "blob"); - map1.stop(); - map2.stop(); - } - - - - /** - * Test method for - * {@link org.apache.activemq.group.Group#remove(java.lang.Object)}. - */ - public void testRemove() throws Exception{ - Group map1 = new Group(connection1,"map1"); - final AtomicBoolean called = new AtomicBoolean(); - map1.addMapChangedListener(new DefaultMapChangedListener() { - public void mapInsert(Member owner,Object Key, Object Value) { - synchronized(called) { - called.set(true); - called.notifyAll(); - } - } - - public void mapRemove(Member owner, Object key, Object value,boolean expired) { - synchronized(called) { - called.set(true); - called.notifyAll(); - } - } - }); - map1.start(); - Group map2 = new Group(connection2,"map2"); - map2.start(); - map2.put("test","foo"); - synchronized(called) { - if (!called.get()) { - called.wait(5000); - } - } - assertTrue(called.get()); - called.set(false); - assertTrue(map1.isEmpty()==false); - map2.remove("test"); - synchronized(called) { - if (!called.get()) { - called.wait(5000); - } - } - assertTrue(map1.isEmpty()); - - map1.stop(); - map2.stop(); - } - - public void testExpire() throws Exception{ - final AtomicBoolean called1 = new AtomicBoolean(); - final AtomicBoolean called2 = new AtomicBoolean(); - - Group map1 = new Group(connection1,"map1"); - map1.setTimeToLive(1000); - map1.addMapChangedListener(new DefaultMapChangedListener() { - public void mapRemove(Member owner, Object key, Object value,boolean expired) { - synchronized(called1) { - called1.set(expired); - called1.notifyAll(); - } - } - }); - map1.start(); - - Group map2 = new Group(connection2,"map2"); - - map2.addMapChangedListener(new DefaultMapChangedListener() { - public void mapRemove(Member owner, Object key, Object value,boolean expired) { - synchronized(called2) { - called2.set(expired); - called2.notifyAll(); - } - } - }); - map2.start(); - - - map1.put("test", "blob"); - synchronized(called1) { - if (!called1.get()) { - called1.wait(5000); - } - } - synchronized(called2) { - if (!called2.get()) { - called2.wait(5000); - } - } - assertTrue(called1.get()); - assertTrue(called2.get()); - map1.stop(); - map2.stop(); - } - - protected void setUp() throws Exception { - if (broker == null) { - broker = createBroker(); - } - ConnectionFactory factory = createConnectionFactory(); - connection1 = factory.createConnection(); - connection1.start(); - connection2 = factory.createConnection(); - connection2.start(); - super.setUp(); - } - - protected void tearDown() throws Exception { - super.tearDown(); - connection1.close(); - connection2.close(); - if (broker != null) { - broker.stop(); - } - } - - protected ActiveMQConnectionFactory createConnectionFactory()throws Exception { - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory( - ActiveMQConnection.DEFAULT_BROKER_URL); - return cf; - } - - protected BrokerService createBroker() throws Exception { - BrokerService answer = new BrokerService(); - configureBroker(answer); - answer.start(); - return answer; - } - - protected void configureBroker(BrokerService answer) throws Exception { - answer.setPersistent(false); - answer.addConnector(bindAddress); - answer.setDeleteAllMessagesOnStartup(true); - } -} diff --git a/activemq-core/src/test/java/org/apache/activemq/group/GroupMemberTest.java b/activemq-core/src/test/java/org/apache/activemq/group/GroupMemberTest.java deleted file mode 100644 index ce42e9500c..0000000000 --- a/activemq-core/src/test/java/org/apache/activemq/group/GroupMemberTest.java +++ /dev/null @@ -1,168 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.group; - -import java.util.ArrayList; -import java.util.List; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import junit.framework.TestCase; -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; - - -public class GroupMemberTest extends TestCase { - protected BrokerService broker; - protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; - - - public void testCoordinatorSelection() throws Exception{ - Group group = new Group(null,""); - Listlist = new ArrayList(); - final int number =10; - Member choosen = null; - for (int i =0;i< number;i++) { - Member m = new Member("group"+i); - m.setId(""+i); - if (number/2==i) { - m.setCoordinatorWeight(10); - choosen=m; - } - list.add(m); - } - Member c = group.selectCordinator(list); - assertEquals(c,choosen); - } - /** - * Test method for - * {@link org.apache.activemq.group.Group#addMemberChangedListener(org.apache.activemq.group.MemberChangedListener)}. - * @throws Exception - */ - public void testGroup() throws Exception { - - final int number = 10; - Listconnections = new ArrayList(); - ListgroupMaps = new ArrayList(); - ConnectionFactory factory = createConnectionFactory(); - for (int i =0; i < number; i++) { - Connection connection = factory.createConnection(); - connection.start(); - connections.add(connection); - Group map = new Group(connection,"map"+i); - map.setHeartBeatInterval(200); - if(i ==number-1) { - map.setMinimumGroupSize(number); - } - map.start(); - groupMaps.add(map); - } - - int coordinatorNumber = 0; - for (Group map:groupMaps) { - if (map.isCoordinator()) { - coordinatorNumber++; - } - } - for(Group map:groupMaps) { - map.stop(); - } - for (Connection connection:connections) { - connection.stop(); - } - - } - -public void XtestWeightedGroup() throws Exception { - - final int number = 10; - Listconnections = new ArrayList(); - ListgroupMaps = new ArrayList(); - Group last = null; - ConnectionFactory factory = createConnectionFactory(); - for (int i =0; i < number; i++) { - Connection connection = factory.createConnection(); - connection.start(); - connections.add(connection); - Group map = new Group(connection,"map"+i); - map.setHeartBeatInterval(200); - if(i ==number-1) { - map.setMinimumGroupSize(number); - map.setCoordinatorWeight(10); - last=map; - } - map.start(); - groupMaps.add(map); - } - - int coordinator = 0; - Group groupCoordinator = null; - for (Group map:groupMaps) { - if (map.isCoordinator()) { - coordinator++; - groupCoordinator=map; - } - } - - assertNotNull(groupCoordinator); - assertEquals(groupCoordinator, last); - assertEquals(1,coordinator); - for(Group map:groupMaps) { - map.stop(); - } - for (Connection connection:connections) { - connection.stop(); - } - - } - - - - protected void setUp() throws Exception { - if (broker == null) { - broker = createBroker(); - } - super.setUp(); - } - - protected void tearDown() throws Exception { - super.tearDown(); - if (broker != null) { - broker.stop(); - } - } - - protected ActiveMQConnectionFactory createConnectionFactory()throws Exception { - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory( - ActiveMQConnection.DEFAULT_BROKER_URL); - return cf; - } - - protected BrokerService createBroker() throws Exception { - BrokerService answer = new BrokerService(); - configureBroker(answer); - answer.start(); - return answer; - } - - protected void configureBroker(BrokerService answer) throws Exception { - answer.setPersistent(false); - answer.addConnector(bindAddress); - answer.setDeleteAllMessagesOnStartup(true); - } -} - diff --git a/activemq-core/src/test/java/org/apache/activemq/group/GroupMessageTest.java b/activemq-core/src/test/java/org/apache/activemq/group/GroupMessageTest.java deleted file mode 100644 index fa74b0d03c..0000000000 --- a/activemq-core/src/test/java/org/apache/activemq/group/GroupMessageTest.java +++ /dev/null @@ -1,256 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.group; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.JMSException; -import junit.framework.TestCase; -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; - -public class GroupMessageTest extends TestCase { - protected BrokerService broker; - protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; - - public void testGroupBroadcast() throws Exception { - final int number = 10; - final AtomicInteger count = new AtomicInteger(); - List connections = new ArrayList(); - List groups = new ArrayList(); - ConnectionFactory factory = createConnectionFactory(); - for (int i = 0; i < number; i++) { - Connection connection = factory.createConnection(); - connection.start(); - connections.add(connection); - Group group = new Group(connection, "group" + i); - group.setHeartBeatInterval(20000); - if (i == number - 1) { - group.setMinimumGroupSize(number); - } - group.start(); - groups.add(group); - group.addGroupMessageListener(new GroupMessageListener() { - public void messageDelivered(Member sender, String replyId, - Object message) { - synchronized (count) { - if (count.incrementAndGet() == number) { - count.notifyAll(); - } - } - } - }); - } - groups.get(0).broadcastMessage("hello"); - synchronized (count) { - if (count.get() < number) { - count.wait(5000); - } - } - assertEquals(number, count.get()); - for (Group map : groups) { - map.stop(); - } - for (Connection connection : connections) { - connection.stop(); - } - } - - public void testsendMessage() throws Exception { - final int number = 10; - final AtomicInteger count = new AtomicInteger(); - List connections = new ArrayList(); - List groups = new ArrayList(); - ConnectionFactory factory = createConnectionFactory(); - for (int i = 0; i < number; i++) { - Connection connection = factory.createConnection(); - connection.start(); - connections.add(connection); - Group group = new Group(connection, "group" + i); - group.setHeartBeatInterval(20000); - if (i == number - 1) { - group.setMinimumGroupSize(number); - } - group.start(); - groups.add(group); - group.addGroupMessageListener(new GroupMessageListener() { - public void messageDelivered(Member sender, String replyId, - Object message) { - synchronized (count) { - count.incrementAndGet(); - count.notifyAll(); - } - } - }); - } - groups.get(0).sendMessage("hello"); - synchronized (count) { - if (count.get() == 0) { - count.wait(5000); - } - } - // wait a while to check that only one got it - Thread.sleep(2000); - assertEquals(1, count.get()); - for (Group map : groups) { - map.stop(); - } - for (Connection connection : connections) { - connection.stop(); - } - } - - public void testSendToSingleMember() throws Exception { - ConnectionFactory factory = createConnectionFactory(); - Connection connection1 = factory.createConnection(); - Connection connection2 = factory.createConnection(); - connection1.start(); - connection2.start(); - Group group1 = new Group(connection1, "group1"); - final AtomicBoolean called = new AtomicBoolean(); - group1.addGroupMessageListener(new GroupMessageListener() { - public void messageDelivered(Member sender, String replyId, - Object message) { - synchronized (called) { - called.set(true); - called.notifyAll(); - } - } - }); - group1.start(); - Group group2 = new Group(connection2, "group2"); - group2.setMinimumGroupSize(2); - group2.start(); - Member member1 = group2.getMemberByName("group1"); - group2.sendMessage(member1, "hello"); - synchronized (called) { - if (!called.get()) { - called.wait(5000); - } - } - assertTrue(called.get()); - group1.stop(); - group2.stop(); - connection1.close(); - connection2.close(); - } - - public void testSendRequestReply() throws Exception { - ConnectionFactory factory = createConnectionFactory(); - Connection connection1 = factory.createConnection(); - Connection connection2 = factory.createConnection(); - connection1.start(); - connection2.start(); - final int number = 1000; - final AtomicInteger requestCount = new AtomicInteger(); - final AtomicInteger replyCount = new AtomicInteger(); - final List requests = new ArrayList(); - final List replies = new ArrayList(); - for (int i = 0; i < number; i++) { - requests.add("request" + i); - replies.add("reply" + i); - } - final Group group1 = new Group(connection1, "group1"); - final AtomicBoolean finished = new AtomicBoolean(); - group1.addGroupMessageListener(new GroupMessageListener() { - public void messageDelivered(Member sender, String replyId, - Object message) { - if (!replies.isEmpty()) { - String reply = replies.remove(0); - try { - group1.sendMessageResponse(sender, replyId, reply); - } catch (JMSException e) { - e.printStackTrace(); - } - } - } - }); - group1.start(); - final Group group2 = new Group(connection2, "group2"); - group2.setMinimumGroupSize(2); - group2.addGroupMessageListener(new GroupMessageListener() { - public void messageDelivered(Member sender, String replyId, - Object message) { - if (!requests.isEmpty()) { - String request = requests.remove(0); - try { - group2.sendMessage(sender, request); - } catch (JMSException e) { - e.printStackTrace(); - } - }else { - synchronized (finished) { - finished.set(true); - finished.notifyAll(); - } - } - } - }); - group2.start(); - Member member1 = group2.getMemberByName("group1"); - group2.sendMessage(member1, requests.remove(0)); - synchronized (finished) { - if (!finished.get()) { - finished.wait(10000); - } - } - assertTrue(finished.get()); - group1.stop(); - group2.stop(); - connection1.close(); - connection2.close(); - } - - protected void setUp() throws Exception { - if (broker == null) { - broker = createBroker(); - } - super.setUp(); - } - - protected void tearDown() throws Exception { - super.tearDown(); - if (broker != null) { - broker.stop(); - } - } - - protected ActiveMQConnectionFactory createConnectionFactory() - throws Exception { - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory( - ActiveMQConnection.DEFAULT_BROKER_URL); - return cf; - } - - protected BrokerService createBroker() throws Exception { - BrokerService answer = new BrokerService(); - configureBroker(answer); - answer.start(); - return answer; - } - - protected void configureBroker(BrokerService answer) throws Exception { - answer.setPersistent(false); - answer.addConnector(bindAddress); - answer.setDeleteAllMessagesOnStartup(true); - } -}