Implementing AMQ-4788 - Adding tests for the ZooKeeper variant of the partition broker plugin.

This commit is contained in:
Hiram Chirino 2013-10-08 11:09:34 -04:00
parent ef64b057a6
commit 25f70ad483
5 changed files with 182 additions and 19 deletions

View File

@ -130,6 +130,7 @@ public class PartitionBroker extends BrokerFilter {
String connectionString = getConnectionString(targetDTO.ids);
if( connectionString==null ) {
LOG.debug("Could not convert to partition targets to connection string: " + targetDTO.ids);
return;
}
LOG.info("Redirecting connection to: " + connectionString);
@ -141,11 +142,9 @@ public class PartitionBroker extends BrokerFilter {
}
protected String getConnectionString(HashSet<String> ids) {
if( getConfig().brokers==null || getConfig().brokers.isEmpty() )
return null;
StringBuilder rc = new StringBuilder();
for (String id : ids) {
String url = getConfig().brokers.get(id);
String url = plugin.getBrokerURL(this, id);
if( url!=null ) {
if( rc.length()!=0 ) {
rc.append(',');
@ -153,6 +152,8 @@ public class PartitionBroker extends BrokerFilter {
rc.append(url);
}
}
if( rc.length()==0 )
return null;
return rc.toString();
}
@ -270,16 +271,22 @@ public class PartitionBroker extends BrokerFilter {
@Override
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
if( info.isFaultTolerant() ) {
ConnectionMonitor monitor = new ConnectionMonitor(context);
monitors.put(info.getConnectionId(), monitor);
super.addConnection(context, info);
checkTarget(monitor);
} else {
super.addConnection(context, info);
}
}
@Override
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
super.removeConnection(context, info, error);
ConnectionMonitor removed = monitors.remove(info.getConnectionId());
if( info.isFaultTolerant() ) {
monitors.remove(info.getConnectionId());
}
}
@Override

View File

@ -58,4 +58,10 @@ public class PartitionBrokerPlugin implements BrokerPlugin {
this.config = Partitioning.MAPPER.readValue(config, Partitioning.class);
}
public String getBrokerURL(PartitionBroker partitionBroker, String id) {
if( config!=null && config.brokers!=null ) {
return config.brokers.get(id);
}
return null;
}
}

View File

@ -26,6 +26,9 @@ import org.linkedin.util.clock.Timespan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
*/
public class ZooKeeperPartitionBroker extends PartitionBroker {
@ -34,11 +37,20 @@ public class ZooKeeperPartitionBroker extends PartitionBroker {
protected volatile ZKClient zk_client = null;
protected volatile Partitioning config;
protected final CountDownLatch configAcquired = new CountDownLatch(1);
public ZooKeeperPartitionBroker(Broker broker, ZooKeeperPartitionBrokerPlugin plugin) {
super(broker, plugin);
}
@Override
public void start() throws Exception {
super.start();
// Lets block a bit until we get our config.. Otherwise just keep
// on going.. not a big deal if we get our config later. Perhaps
// ZK service is not having a good day.
configAcquired.await(5, TimeUnit.SECONDS);
}
@Override
protected void onMonitorStop() {
@ -96,6 +108,7 @@ public class ZooKeeperPartitionBroker extends PartitionBroker {
monitorWakeup();
}
}, stat);
configAcquired.countDown();
reloadConfigOnPoll = false;
} catch (Exception e) {
LOG.warn("Could load partitioning configuration: " + e, e);

View File

@ -23,6 +23,9 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.partition.dto.Partitioning;
import org.apache.activemq.partition.dto.Target;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import javax.jms.*;
import java.io.IOException;
@ -31,23 +34,51 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
/**
* Unit tests for the PartitionBroker plugin.
*/
public class PartitionBrokerTest extends AutoFailTestSupport {
public class PartitionBrokerTest {
protected HashMap<String, BrokerService> brokers = new HashMap<String, BrokerService>();
protected ArrayList<Connection> connections = new ArrayList<Connection>();
Partitioning partitioning;
@Override
protected void setUp() throws Exception {
super.setUp();
@Before
public void setUp() throws Exception {
partitioning = new Partitioning();
partitioning.brokers = new HashMap<String, String>();
}
/**
* Partitioning can only re-direct failover clients since those
* can re-connect and re-establish their state with another broker.
*/
@Test(timeout = 1000*60*60)
public void testNonFailoverClientHasNoPartitionEffect() throws Exception {
partitioning.byClientId = new HashMap<String, Target>();
partitioning.byClientId.put("client1", new Target("broker1"));
createBrokerCluster(2);
Connection connection = createConnectionToUrl(getConnectURL("broker2"));
within(5, TimeUnit.SECONDS, new Task() {
public void run() throws Exception {
assertEquals(0, getTransportConnector("broker1").getConnections().size());
assertEquals(1, getTransportConnector("broker2").getConnections().size());
}
});
connection.setClientID("client1");
connection.start();
Thread.sleep(1000);
assertEquals(0, getTransportConnector("broker1").getConnections().size());
assertEquals(1, getTransportConnector("broker2").getConnections().size());
}
@Test(timeout = 1000*60*60)
public void testPartitionByClientId() throws Exception {
partitioning.byClientId = new HashMap<String, Target>();
partitioning.byClientId.put("client1", new Target("broker1"));
@ -73,6 +104,7 @@ public class PartitionBrokerTest extends AutoFailTestSupport {
});
}
@Test(timeout = 1000*60*60)
public void testPartitionByQueue() throws Exception {
partitioning.byQueue = new HashMap<String, Target>();
partitioning.byQueue.put("foo", new Target("broker1"));
@ -149,7 +181,10 @@ public class PartitionBrokerTest extends AutoFailTestSupport {
}
protected Connection createConnectionTo(String brokerId) throws IOException, URISyntaxException, JMSException {
String url = "failover://(" + getConnectURL(brokerId) + ")";
return createConnectionToUrl("failover://(" + getConnectURL(brokerId) + ")");
}
private Connection createConnectionToUrl(String url) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
Connection connection = factory.createConnection();
connections.add(connection);
@ -174,16 +209,20 @@ public class PartitionBrokerTest extends AutoFailTestSupport {
String brokerId = "broker" + i;
BrokerService broker = createBroker(brokerId);
broker.setPersistent(false);
PartitionBrokerPlugin plugin = new PartitionBrokerPlugin();
plugin.setConfig(partitioning);
broker.setPlugins(new BrokerPlugin[]{plugin});
broker.addConnector("tcp://localhost:0").setName("tcp");
addPartitionBrokerPlugin(broker);
broker.start();
broker.waitUntilStarted();
partitioning.brokers.put(brokerId, getConnectURL(brokerId));
}
}
protected void addPartitionBrokerPlugin(BrokerService broker) {
PartitionBrokerPlugin plugin = new PartitionBrokerPlugin();
plugin.setConfig(partitioning);
broker.setPlugins(new BrokerPlugin[]{plugin});
}
protected BrokerService createBroker(String name) {
BrokerService broker = new BrokerService();
broker.setBrokerName(name);
@ -191,8 +230,8 @@ public class PartitionBrokerTest extends AutoFailTestSupport {
return broker;
}
@Override
protected void tearDown() throws Exception {
@After
public void tearDown() throws Exception {
for (Connection connection : connections) {
try {
connection.close();
@ -208,7 +247,6 @@ public class PartitionBrokerTest extends AutoFailTestSupport {
}
}
brokers.clear();
super.tearDown();
}
}

View File

@ -0,0 +1,99 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.partition;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.leveldb.replicated.groups.ZKClient;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.junit.After;
import org.junit.Before;
import org.linkedin.util.clock.Timespan;
import java.io.File;
import java.net.InetSocketAddress;
/**
*/
public class ZooKeeperPartitionBrokerTest extends PartitionBrokerTest {
NIOServerCnxnFactory connector;
@Before
public void setUp() throws Exception {
System.out.println("Starting ZooKeeper");
ZooKeeperServer zk_server = new ZooKeeperServer();
zk_server.setTickTime(500);
zk_server.setTxnLogFactory(new FileTxnSnapLog(new File("target/test-data/zk-log"), new File("target/test-data/zk-data")));
connector = new NIOServerCnxnFactory();
connector.configure(new InetSocketAddress(0), 100);
connector.startup(zk_server);
System.out.println("ZooKeeper Started");
super.setUp();
}
@After
public void tearDown() throws Exception {
super.tearDown();
if( connector!=null ) {
connector.shutdown();
connector = null;
}
}
String zkPath = "/partition-config";
@Override
protected void createBrokerCluster(int brokerCount) throws Exception {
// Store the partitioning in ZK.
ZKClient zk_client = new ZKClient("localhost:" + connector.getLocalPort(), Timespan.parse("10s"), null);
try {
zk_client.start();
zk_client.waitForConnected(Timespan.parse("30s"));
try {
zk_client.delete(zkPath);
} catch (Throwable e) {
}
zk_client.create(zkPath, partitioning.toString(), CreateMode.PERSISTENT);
} finally {
zk_client.close();
}
super.createBrokerCluster(brokerCount);
}
@Override
protected void addPartitionBrokerPlugin(BrokerService broker) {
// Have the borker plugin get the partition config via ZK.
ZooKeeperPartitionBrokerPlugin plugin = new ZooKeeperPartitionBrokerPlugin(){
@Override
public String getBrokerURL(PartitionBroker partitionBroker, String id) {
try {
return getConnectURL(id);
} catch (Exception e) {
return null;
}
}
};
plugin.setZkAddress("localhost:" + connector.getLocalPort());
plugin.setZkPath(zkPath);
broker.setPlugins(new BrokerPlugin[]{plugin});
}
}