NO-JIRA: fixing test

This commit is contained in:
Clebert Suconic 2017-08-23 16:04:42 -04:00
parent 74db627b83
commit 78321668ea
2 changed files with 34 additions and 6 deletions

View File

@ -1375,11 +1375,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
serverTotc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params); serverTotc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params);
} }
if (ha) { setSessionFactoryCreateLocator(node, ha, serverTotc);
locators[node] = ActiveMQClient.createServerLocatorWithHA(serverTotc);
} else {
locators[node] = ActiveMQClient.createServerLocatorWithoutHA(serverTotc);
}
locators[node].setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locators[node])); locators[node].setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locators[node]));
@ -1392,6 +1388,14 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
sfs[node] = sf; sfs[node] = sf;
} }
protected void setSessionFactoryCreateLocator(int node, boolean ha, TransportConfiguration serverTotc) {
if (ha) {
locators[node] = ActiveMQClient.createServerLocatorWithHA(serverTotc);
} else {
locators[node] = ActiveMQClient.createServerLocatorWithoutHA(serverTotc);
}
}
protected void setupSessionFactory(final int node, final boolean netty, int reconnectAttempts) throws Exception { protected void setupSessionFactory(final int node, final boolean netty, int reconnectAttempts) throws Exception {
if (sfs[node] != null) { if (sfs[node] != null) {
throw new IllegalArgumentException("Already a server at " + node); throw new IllegalArgumentException("Already a server at " + node);

View File

@ -24,12 +24,14 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.server.Bindable; import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor; import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor;
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration; import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
@ -37,6 +39,7 @@ import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -63,6 +66,17 @@ public class MessageRedistributionTest extends ClusterTestBase {
return false; return false;
} }
@Override
protected void setSessionFactoryCreateLocator(int node, boolean ha, TransportConfiguration serverTotc) {
super.setSessionFactoryCreateLocator(node, ha, serverTotc);
locators[node].setConsumerWindowSize(0);
}
//https://issues.jboss.org/browse/HORNETQ-1061 //https://issues.jboss.org/browse/HORNETQ-1061
@Test @Test
public void testRedistributionWithMessageGroups() throws Exception { public void testRedistributionWithMessageGroups() throws Exception {
@ -989,7 +1003,17 @@ public class MessageRedistributionTest extends ClusterTestBase {
removeConsumer(0); removeConsumer(0);
addConsumer(1, 1, "queue0", null); addConsumer(1, 1, "queue0", null);
verifyReceiveAll(QueueImpl.REDISTRIBUTOR_BATCH_SIZE * 2, 1); Queue queue = servers[1].locateQueue(SimpleString.toSimpleString("queue0"));
Assert.assertNotNull(queue);
Wait.waitFor(() -> queue.getMessageCount() == QueueImpl.REDISTRIBUTOR_BATCH_SIZE * 2);
for (int i = 0; i < QueueImpl.REDISTRIBUTOR_BATCH_SIZE * 2; i++) {
ClientMessage message = consumers[1].getConsumer().receive(5000);
Assert.assertNotNull(message);
message.acknowledge();
}
Assert.assertNull(consumers[1].getConsumer().receiveImmediate());
} }
/* /*