ARTEMIS-2462 re-applying tests on SNF Delete Queue
This commit is contained in:
parent
d55ec37195
commit
dd20f89bd0
|
@ -710,7 +710,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
|
|||
|
||||
// New node - create a new flow record
|
||||
|
||||
final SimpleString queueName = new SimpleString(storeAndForwardPrefix + name + "." + nodeID);
|
||||
final SimpleString queueName = getSfQueueName(nodeID);
|
||||
|
||||
Binding queueBinding = postOffice.getBinding(queueName);
|
||||
|
||||
|
@ -741,6 +741,10 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
|
|||
}
|
||||
}
|
||||
|
||||
public SimpleString getSfQueueName(String nodeID) {
|
||||
return new SimpleString(storeAndForwardPrefix + name + "." + nodeID);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void informClusterOfBackup() {
|
||||
String nodeID = server.getNodeID().toString();
|
||||
|
|
|
@ -28,7 +28,9 @@ import org.apache.activemq.artemis.core.config.BridgeConfiguration;
|
|||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.config.FileDeploymentManager;
|
||||
import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
|
||||
import org.apache.activemq.artemis.core.deployers.impl.FileConfigurationParser;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
|
@ -272,6 +274,35 @@ public class FileConfigurationParserTest extends ActiveMQTestBase {
|
|||
testParsingOverFlow("<bridges> \n" + " <bridge name=\"price-forward-bridge\"> \n" + " <queue-name>priceForwarding</queue-name> \n" + " <forwarding-address>newYorkPriceUpdates</forwarding-address>\n" + " <producer-window-size>2147483648</producer-window-size>\n" + " <static-connectors> \n" + " <connector-ref>netty</connector-ref> \n" + " </static-connectors> \n" + " </bridge> \n" + "</bridges>\n");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParsingScaleDownConfig() throws Exception {
|
||||
FileConfigurationParser parser = new FileConfigurationParser();
|
||||
|
||||
String configStr = firstPart + "<ha-policy>\n" +
|
||||
" <live-only>\n" +
|
||||
" <scale-down>\n" +
|
||||
" <connectors>\n" +
|
||||
" <connector-ref>server0-connector</connector-ref>\n" +
|
||||
" </connectors>\n" +
|
||||
" </scale-down>\n" +
|
||||
" </live-only>\n" +
|
||||
"</ha-policy>\n" + lastPart;
|
||||
ByteArrayInputStream input = new ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
Configuration config = parser.parseMainConfig(input);
|
||||
|
||||
HAPolicyConfiguration haConfig = config.getHAPolicyConfiguration();
|
||||
assertTrue(haConfig instanceof LiveOnlyPolicyConfiguration);
|
||||
|
||||
LiveOnlyPolicyConfiguration liveOnlyCfg = (LiveOnlyPolicyConfiguration) haConfig;
|
||||
ScaleDownConfiguration scaledownCfg = liveOnlyCfg.getScaleDownConfiguration();
|
||||
List<String> connectors = scaledownCfg.getConnectors();
|
||||
assertEquals(1, connectors.size());
|
||||
String connector = connectors.get(0);
|
||||
assertEquals("server0-connector", connector);
|
||||
}
|
||||
|
||||
|
||||
private void testParsingOverFlow(String config) throws Exception {
|
||||
FileConfigurationParser parser = new FileConfigurationParser();
|
||||
String firstPartWithoutAddressSettings = firstPart.substring(0, firstPart.indexOf("<address-settings"));
|
||||
|
|
|
@ -0,0 +1,127 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.server;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||
import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
|
||||
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
|
||||
import org.apache.activemq.artemis.core.server.AddressQueryResult;
|
||||
import org.apache.activemq.artemis.core.server.QueueQueryResult;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
|
||||
public class ScaleDownRemoveSFTest extends ClusterTestBase {
|
||||
|
||||
public ScaleDownRemoveSFTest() {
|
||||
}
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
|
||||
ScaleDownConfiguration scaleDownConfiguration = new ScaleDownConfiguration();
|
||||
setupLiveServer(0, isFileStorage(), isNetty(), true);
|
||||
setupLiveServer(1, isFileStorage(), isNetty(), true);
|
||||
LiveOnlyPolicyConfiguration haPolicyConfiguration0 = (LiveOnlyPolicyConfiguration) servers[0].getConfiguration().getHAPolicyConfiguration();
|
||||
haPolicyConfiguration0.setScaleDownConfiguration(scaleDownConfiguration);
|
||||
LiveOnlyPolicyConfiguration haPolicyConfiguration1 = (LiveOnlyPolicyConfiguration) servers[1].getConfiguration().getHAPolicyConfiguration();
|
||||
haPolicyConfiguration1.setScaleDownConfiguration(new ScaleDownConfiguration());
|
||||
|
||||
setupClusterConnection("cluster0", "testAddress", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
|
||||
setupClusterConnection("cluster0", "testAddress", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
|
||||
haPolicyConfiguration0.getScaleDownConfiguration().getConnectors().addAll(servers[0].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors());
|
||||
haPolicyConfiguration1.getScaleDownConfiguration().getConnectors().addAll(servers[1].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors());
|
||||
servers[0].getConfiguration().getAddressesSettings().put("#", new AddressSettings().setRedistributionDelay(0));
|
||||
servers[1].getConfiguration().getAddressesSettings().put("#", new AddressSettings().setRedistributionDelay(0));
|
||||
startServers(0, 1);
|
||||
setupSessionFactory(0, isNetty());
|
||||
setupSessionFactory(1, isNetty());
|
||||
}
|
||||
|
||||
@Override
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
}
|
||||
|
||||
|
||||
protected boolean isNetty() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testScaleDownCheckSF() throws Exception {
|
||||
final int TEST_SIZE = 2;
|
||||
final String addressName = "testAddress";
|
||||
final String queueName1 = "testQueue1";
|
||||
|
||||
// create 2 queues on each node mapped to the same address
|
||||
createQueue(0, addressName, queueName1, null, true);
|
||||
createQueue(1, addressName, queueName1, null, true);
|
||||
|
||||
// send messages to node 0
|
||||
send(0, addressName, TEST_SIZE, true, null);
|
||||
|
||||
// consume a message from queue 1
|
||||
addConsumer(1, 0, queueName1, null, false);
|
||||
ClientMessage clientMessage = consumers[1].getConsumer().receive(250);
|
||||
Assert.assertNotNull(clientMessage);
|
||||
clientMessage.acknowledge();
|
||||
consumers[1].getSession().commit();
|
||||
|
||||
Assert.assertEquals(TEST_SIZE - 1, getMessageCount(((LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue()));
|
||||
|
||||
//check sf queue on server1 exists
|
||||
ClusterConnectionImpl clusterconn1 = (ClusterConnectionImpl) servers[1].getClusterManager().getClusterConnection("cluster0");
|
||||
SimpleString sfQueueName = clusterconn1.getSfQueueName(servers[0].getNodeID().toString());
|
||||
|
||||
System.out.println("[sf queue on server 1]: " + sfQueueName);
|
||||
QueueQueryResult result = servers[1].queueQuery(sfQueueName);
|
||||
assertTrue(result.isExists());
|
||||
|
||||
// trigger scaleDown from node 0 to node 1
|
||||
servers[0].stop();
|
||||
|
||||
addConsumer(0, 1, queueName1, null);
|
||||
clientMessage = consumers[0].getConsumer().receive(250);
|
||||
Assert.assertNotNull(clientMessage);
|
||||
clientMessage.acknowledge();
|
||||
|
||||
// ensure there are no more messages on queue 1
|
||||
clientMessage = consumers[0].getConsumer().receive(250);
|
||||
Assert.assertNull(clientMessage);
|
||||
removeConsumer(0);
|
||||
|
||||
//check
|
||||
result = servers[1].queueQuery(sfQueueName);
|
||||
AddressQueryResult result2 = servers[1].addressQuery(sfQueueName);
|
||||
assertFalse(result.isExists());
|
||||
assertFalse(result2.isExists());
|
||||
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue