ARTEMIS-250 fix scale-down with security

This commit is contained in:
jbertram 2015-11-20 14:44:45 -06:00
parent 527da42f16
commit c0a6d6ee44
5 changed files with 78 additions and 37 deletions

View File

@ -197,4 +197,12 @@ public class ClusterControl implements AutoCloseable {
ScaleDownAnnounceMessage announceMessage = new ScaleDownAnnounceMessage(targetNodeId, scaledDownNodeId);
clusterChannel.send(announceMessage);
}
public String getClusterUser() {
return clusterUser;
}
public String getClusterPassword() {
return clusterPassword;
}
}

View File

@ -91,19 +91,19 @@ public class ScaleDownHandler {
SimpleString targetNodeId) throws Exception {
ClusterControl clusterControl = clusterController.connectToNodeInCluster((ClientSessionFactoryInternal) sessionFactory);
clusterControl.authorize();
long num = scaleDownMessages(sessionFactory, targetNodeId);
long num = scaleDownMessages(sessionFactory, targetNodeId, clusterControl.getClusterUser(), clusterControl.getClusterPassword());
ActiveMQServerLogger.LOGGER.info("Scaled down " + num + " messages total.");
scaleDownTransactions(sessionFactory, resourceManager);
scaleDownDuplicateIDs(duplicateIDMap, sessionFactory, managementAddress);
scaleDownTransactions(sessionFactory, resourceManager, clusterControl.getClusterUser(), clusterControl.getClusterPassword());
scaleDownDuplicateIDs(duplicateIDMap, sessionFactory, managementAddress, clusterControl.getClusterUser(), clusterControl.getClusterPassword());
clusterControl.announceScaleDown(new SimpleString(this.targetNodeId), nodeManager.getNodeId());
return num;
}
public long scaleDownMessages(ClientSessionFactory sessionFactory, SimpleString nodeId) throws Exception {
public long scaleDownMessages(ClientSessionFactory sessionFactory, SimpleString nodeId, String user, String password) throws Exception {
long messageCount = 0;
targetNodeId = nodeId != null ? nodeId.toString() : getTargetNodeId(sessionFactory);
try (ClientSession session = sessionFactory.createSession(false, true, true)) {
try (ClientSession session = sessionFactory.createSession(user, password, false, true, true, false, 0)) {
ClientProducer producer = session.createProducer();
// perform a loop per address
@ -307,9 +307,11 @@ public class ScaleDownHandler {
}
public void scaleDownTransactions(ClientSessionFactory sessionFactory,
ResourceManager resourceManager) throws Exception {
ClientSession session = sessionFactory.createSession(true, false, false);
ClientSession queueCreateSession = sessionFactory.createSession(false, true, true);
ResourceManager resourceManager,
String user,
String password) throws Exception {
ClientSession session = sessionFactory.createSession(user, password, true, false, false, false, 0);
ClientSession queueCreateSession = sessionFactory.createSession(user, password, false, true, true, false, 0);
List<Xid> preparedTransactions = resourceManager.getPreparedTransactions();
Map<String, Long> queueIDs = new HashMap<>();
for (Xid xid : preparedTransactions) {
@ -398,8 +400,10 @@ public class ScaleDownHandler {
public void scaleDownDuplicateIDs(Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
ClientSessionFactory sessionFactory,
SimpleString managementAddress) throws Exception {
ClientSession session = sessionFactory.createSession(true, false, false);
SimpleString managementAddress,
String user,
String password) throws Exception {
ClientSession session = sessionFactory.createSession(user, password, true, false, false, false, 0);
ClientProducer producer = session.createProducer(managementAddress);
//todo - https://issues.jboss.org/browse/HORNETQ-1336
for (SimpleString address : duplicateIDMap.keySet()) {

View File

@ -489,18 +489,30 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
}
protected void createQueue(final int node,
final String address,
final String queueName,
final String filterVal,
final boolean durable) throws Exception {
createQueue(node, address, queueName, filterVal, durable, null, null);
}
protected void createQueue(final int node,
final String address,
final String queueName,
final String filterVal,
final boolean durable,
final String user,
final String password) throws Exception {
ClientSessionFactory sf = sfs[node];
if (sf == null) {
throw new IllegalArgumentException("No sf at " + node);
}
ClientSession session = addClientSession(sf.createSession(false, true, true));
ClientSession session = addClientSession(sf.createSession(user, password, false, true, true, false, 0));
String filterString = null;
@ -541,6 +553,16 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
final String queueName,
final String filterVal,
boolean autoCommitAcks) throws Exception {
addConsumer(consumerID, node, queueName, filterVal, autoCommitAcks, null, null);
}
protected void addConsumer(final int consumerID,
final int node,
final String queueName,
final String filterVal,
boolean autoCommitAcks,
final String user,
final String password) throws Exception {
try {
if (consumers[consumerID] != null) {
throw new IllegalArgumentException("Already a consumer at " + node);
@ -552,7 +574,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
throw new IllegalArgumentException("No sf at " + node);
}
ClientSession session = addClientSession(sf.createSession(false, false, autoCommitAcks));
ClientSession session = addClientSession(sf.createSession(user, password, false, false, autoCommitAcks, false, 0));
String filterString = null;
@ -1307,6 +1329,10 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
}
protected void setupSessionFactory(final int node, final boolean netty, boolean ha) throws Exception {
setupSessionFactory(node, netty, ha, null, null);
}
protected void setupSessionFactory(final int node, final boolean netty, boolean ha, final String user, final String password) throws Exception {
if (sfs[node] != null) {
throw new IllegalArgumentException("Already a factory at " + node);
}
@ -1335,7 +1361,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
addServerLocator(locators[node]);
ClientSessionFactory sf = createSessionFactory(locators[node]);
ClientSession session = sf.createSession();
ClientSession session = sf.createSession(user, password, false, true, true, false, 0);
session.close();
sfs[node] = sf;
}

View File

@ -46,8 +46,11 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
public void setUp() throws Exception {
super.setUp();
setupLiveServer(0, isFileStorage(), false, isNetty(), true);
servers[0].getConfiguration().setSecurityEnabled(true);
setupLiveServer(1, isFileStorage(), false, isNetty(), true);
servers[1].getConfiguration().setSecurityEnabled(true);
setupLiveServer(2, isFileStorage(), false, isNetty(), true);
servers[2].getConfiguration().setSecurityEnabled(true);
LiveOnlyPolicyConfiguration haPolicyConfiguration0 = (LiveOnlyPolicyConfiguration) servers[0].getConfiguration().getHAPolicyConfiguration();
ScaleDownConfiguration scaleDownConfiguration0 = new ScaleDownConfiguration();
haPolicyConfiguration0.setScaleDownConfiguration(scaleDownConfiguration0);
@ -65,9 +68,9 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
Assert.assertEquals(61617, servers[0].getConfiguration().getConnectorConfigurations().get(scaleDownConnector).getParams().get(TransportConstants.PORT_PROP_NAME));
scaleDownConfiguration0.getConnectors().add(scaleDownConnector);
startServers(0, 1, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
setupSessionFactory(0, isNetty(), false, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
setupSessionFactory(1, isNetty(), false, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword());
setupSessionFactory(2, isNetty(), false, servers[2].getConfiguration().getClusterUser(), servers[2].getConfiguration().getClusterPassword());
IntegrationTestLogger.LOGGER.info("===============================");
IntegrationTestLogger.LOGGER.info("Node 0: " + servers[0].getClusterManager().getNodeId());
IntegrationTestLogger.LOGGER.info("Node 1: " + servers[1].getClusterManager().getNodeId());
@ -109,16 +112,16 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
final String queueName1 = "testQueue1";
// create a queue on each node mapped to the same address
createQueue(0, addressName, queueName1, null, false);
createQueue(1, addressName, queueName1, null, false);
createQueue(2, addressName, queueName1, null, false);
createQueue(0, addressName, queueName1, null, false, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
createQueue(1, addressName, queueName1, null, false, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword());
createQueue(2, addressName, queueName1, null, false, servers[2].getConfiguration().getClusterUser(), servers[2].getConfiguration().getClusterPassword());
// pause the SnF queue so that when the server tries to redistribute a message it won't actually go across the cluster bridge
String snfAddress = "sf.cluster0." + servers[0].getNodeID().toString();
Queue snfQueue = ((LocalQueueBinding) servers[2].getPostOffice().getBinding(SimpleString.toSimpleString(snfAddress))).getQueue();
snfQueue.pause();
ClientSession session = sfs[2].createSession(false, true, false);
ClientSession session = sfs[2].createSession(servers[2].getConfiguration().getClusterUser(), servers[2].getConfiguration().getClusterPassword(), false, true, false, false, 0);
Message message;
@ -152,7 +155,7 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
}
// add a consumer to node 0 to trigger redistribution here
addConsumer(0, 0, queueName1, null);
addConsumer(0, 0, queueName1, null, true, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
// allow some time for redistribution to move the message to the SnF queue
long timeout = 10000;
@ -194,7 +197,7 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
Assert.assertEquals(0, messageCount);
// get the messages from queue 1 on node 1
addConsumer(0, 1, queueName1, null);
addConsumer(0, 1, queueName1, null, true, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword());
// allow some time for redistribution to move the message to node 1
start = System.currentTimeMillis();
@ -246,26 +249,26 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
final String queueName3 = "testQueue3";
// create a queue on each node mapped to the same address
createQueue(0, addressName, queueName1, null, false);
createQueue(1, addressName, queueName1, null, false);
createQueue(2, addressName, queueName1, null, false);
createQueue(0, addressName, queueName1, null, false, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
createQueue(1, addressName, queueName1, null, false, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword());
createQueue(2, addressName, queueName1, null, false, servers[2].getConfiguration().getClusterUser(), servers[2].getConfiguration().getClusterPassword());
// create a queue on each node mapped to the same address
createQueue(0, addressName, queueName2, null, false);
createQueue(1, addressName, queueName2, null, false);
createQueue(2, addressName, queueName2, null, false);
createQueue(0, addressName, queueName2, null, false, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
createQueue(1, addressName, queueName2, null, false, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword());
createQueue(2, addressName, queueName2, null, false, servers[2].getConfiguration().getClusterUser(), servers[2].getConfiguration().getClusterPassword());
// create a queue on each node mapped to the same address
createQueue(0, addressName, queueName3, null, false);
createQueue(1, addressName, queueName3, null, false);
createQueue(2, addressName, queueName3, null, false);
createQueue(0, addressName, queueName3, null, false, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
createQueue(1, addressName, queueName3, null, false, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword());
createQueue(2, addressName, queueName3, null, false, servers[2].getConfiguration().getClusterUser(), servers[2].getConfiguration().getClusterPassword());
// pause the SnF queue so that when the server tries to redistribute a message it won't actually go across the cluster bridge
String snfAddress = "sf.cluster0." + servers[0].getNodeID().toString();
Queue snfQueue = ((LocalQueueBinding) servers[2].getPostOffice().getBinding(SimpleString.toSimpleString(snfAddress))).getQueue();
snfQueue.pause();
ClientSession session = sfs[2].createSession(false, true, false);
ClientSession session = sfs[2].createSession(servers[2].getConfiguration().getClusterUser(), servers[2].getConfiguration().getClusterPassword(), false, true, false, false, 0);
Message message;
message = session.createMessage(false);
@ -276,8 +279,8 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
}
// add a consumer to node 0 to trigger redistribution here
addConsumer(0, 0, queueName1, null);
addConsumer(1, 0, queueName3, null);
addConsumer(0, 0, queueName1, null, true, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
addConsumer(1, 0, queueName3, null, true, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
// allow some time for redistribution to move the message to the SnF queue
long timeout = 10000;
@ -323,8 +326,8 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
Assert.assertEquals(TEST_SIZE, getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName2))).getQueue()));
// get the messages from queue 1 on node 1
addConsumer(0, 1, queueName1, null);
addConsumer(1, 1, queueName3, null);
addConsumer(0, 1, queueName1, null, true, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword());
addConsumer(1, 1, queueName3, null, true, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword());
// allow some time for redistribution to move the message to node 1
start = System.currentTimeMillis();

View File

@ -267,7 +267,7 @@ public class ScaleDownDirectTest extends ClusterTestBase {
private long performScaledown() throws Exception {
ScaleDownHandler handler = new ScaleDownHandler(servers[0].getPagingManager(), servers[0].getPostOffice(), servers[0].getNodeManager(), servers[0].getClusterManager().getClusterController(), servers[0].getStorageManager());
return handler.scaleDownMessages(sfs[1], servers[1].getNodeID());
return handler.scaleDownMessages(sfs[1], servers[1].getNodeID(), servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
}
}