Fixes https://issues.apache.org/jira/browse/AMQ-4923: Replicated LevelDB: Loss of broker Quorum fails to fully stop the master

This commit is contained in:
Hiram Chirino 2013-12-05 13:27:59 -05:00
parent 044c2d98ff
commit ed8e4eae8f
3 changed files with 199 additions and 7 deletions

View File

@ -878,6 +878,10 @@ public class BrokerService implements Service {
}
}
/**
 * Reports the current value of this broker's {@code stopped} flag.
 *
 * @return the value currently held by {@code stopped}
 */
public boolean isStopped() {
    final boolean stoppedNow = stopped.get();
    return stoppedNow;
}
/**
* A helper method to block the caller thread until the broker has fully started
* @return boolean true if wait succeeded false if broker was not started or was stopped

View File

@ -35,6 +35,7 @@ import util.TimeMetric
import scala.Some
import org.apache.activemq.ActiveMQMessageAuditNoSync
import org.fusesource.hawtdispatch
import org.apache.activemq.broker.SuppressReplyException
case class EntryLocator(qid:Long, seq:Long)
case class DataLocator(store:LevelDBStore, pos:Long, len:Int) {
@ -569,9 +570,6 @@ class DBManager(val parent:LevelDBStore) {
def drainFlushes:Unit = {
dispatchQueue.assertExecuting()
if( !started ) {
return
}
// Some UOWs may have been canceled.
import collection.JavaConversions._
@ -590,7 +588,12 @@ class DBManager(val parent:LevelDBStore) {
assert(action!=null)
}
}
Some(uow)
if( !started ) {
uow.onCompleted(new SuppressReplyException("Store stopped"))
None
} else {
Some(uow)
}
}
}

View File

@ -22,6 +22,7 @@ import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.leveldb.replicated.ElectingLevelDBStore;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import javax.jms.*;
@ -30,10 +31,17 @@ import javax.management.openmbean.CompositeData;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.ServerSocket;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.FileUtils;
import static org.junit.Assert.*;
@ -82,6 +90,165 @@ public class ReplicatedLevelDBBrokerTest extends ZooKeeperTestSupport {
}
}
/**
 * Callback run by a client thread once it has obtained a JMS connection.
 */
public interface Client {

    /**
     * Performs the client's work using the supplied connection.
     *
     * @param connection the connection handed to the client by the caller
     * @throws Exception if the client's work fails
     */
    void execute(Connection connection) throws Exception;
}
/**
 * Starts a named background thread that opens a failover connection to
 * {@code localhost:port} and hands it to the given client callback.
 * <p>
 * Any {@link Throwable} raised while connecting or while the client runs is
 * printed to stderr and otherwise ignored, so the thread always terminates
 * normally. The connection is closed when the client returns or fails.
 *
 * @param name   name given to the created thread
 * @param client callback executed on the new thread with the open connection
 * @return the already-started thread, so callers can {@code join()} it
 * @throws IOException        declared for callers; not thrown by the visible body
 * @throws URISyntaxException declared for callers; not thrown by the visible body
 */
protected Thread startFailoverClient(String name, final Client client) throws IOException, URISyntaxException {
    String url = "failover://(tcp://localhost:"+port+")?maxReconnectDelay=500&nested.wireFormat.maxInactivityDuration=1000";
    final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
    Thread rc = new Thread(name) {
        @Override
        public void run() {
            Connection connection = null;
            try {
                connection = factory.createConnection();
                client.execute(connection);
            } catch (Throwable e) {
                e.printStackTrace();
            } finally {
                // createConnection() may have failed, leaving nothing to close;
                // without this guard the finally block would throw a
                // NullPointerException out of the thread.
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (JMSException ignored) {
                        // Best-effort close: the client has already finished and
                        // there is nothing useful to do if the close itself fails.
                    }
                }
            }
        }
    };
    rc.start();
    return rc;
}
/**
 * Scenario test for quorum loss: starts two broker nodes on the same port,
 * runs a producer and a consumer against the elected master, stops the slaves,
 * asserts the master stops, then restarts the stopped nodes and asserts the
 * clients make progress again without having recorded any ordering error.
 * <p>
 * The test is currently disabled via {@code @Ignore}.
 */
@Test
@Ignore
public void testReplicationQuorumLoss() throws Throwable {
System.out.println("======================================");
System.out.println(" Start 2 ActiveMQ nodes.");
System.out.println("======================================");
// Both nodes are configured with the same client port.
startBrokerAsync(createBrokerNode("node-1", port));
startBrokerAsync(createBrokerNode("node-2", port));
BrokerService master = waitForNextMaster();
System.out.println("======================================");
System.out.println(" Start the producer and consumer");
System.out.println("======================================");
// Shared state between the test thread and the two client threads.
final AtomicBoolean stopClients = new AtomicBoolean(false);
final ArrayBlockingQueue<String> errors = new ArrayBlockingQueue<String>(100);
final AtomicLong receivedCounter = new AtomicLong();
final AtomicLong sentCounter = new AtomicLong();
// Producer: sends messages tagged with an increasing "id" property until told to stop.
Thread producer = startFailoverClient("producer", new Client() {
@Override
public void execute(Connection connection) throws Exception {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue("test"));
long actual = 0;
while(!stopClients.get()) {
TextMessage msg = session.createTextMessage("Hello World");
msg.setLongProperty("id", actual++);
producer.send(msg);
sentCounter.incrementAndGet();
}
}
});
// Consumer: checks that the "id" values arrive in sequence; a gap or repeat is
// recorded in `errors`, after which the expected id is resynchronised.
Thread consumer = startFailoverClient("consumer", new Client() {
@Override
public void execute(Connection connection) throws Exception {
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createQueue("test"));
long expected = 0;
while(!stopClients.get()) {
// Short receive timeout so the stop flag is re-checked regularly.
Message msg = consumer.receive(200);
if( msg!=null ) {
long actual = msg.getLongProperty("id");
if( actual != expected ) {
errors.offer("Received got unexpected msg id: "+actual+", expected: "+expected);
}
msg.acknowledge();
expected = actual+1;
receivedCounter.incrementAndGet();
}
}
}
});
try {
// Phase 1: with both nodes up, both clients must be making progress.
assertCounterMakesProgress(sentCounter, 10, TimeUnit.SECONDS);
assertCounterMakesProgress(receivedCounter, 5, TimeUnit.SECONDS);
assertNull(errors.poll());
System.out.println("======================================");
System.out.println(" Master should stop once the quorum is lost.");
System.out.println("======================================");
// Phase 2: take the slaves down and expect the master to stop.
ArrayList<BrokerService> stopped = stopSlaves();// stopping the slaves should kill the quorum.
assertStopsWithin(master, 10, TimeUnit.SECONDS);
assertNull(errors.poll()); // clients should not see an error since they are failover clients.
// The stopped master is restarted together with the slaves below.
stopped.add(master);
System.out.println("======================================");
System.out.println(" Restart the slave. Clients should make progress again..");
System.out.println("======================================");
// Phase 3: fresh broker instances are created for every stopped node.
startBrokersAsync(createBrokerNodes(stopped));
assertCounterMakesProgress(sentCounter, 10, TimeUnit.SECONDS);
assertCounterMakesProgress(receivedCounter, 5, TimeUnit.SECONDS);
assertNull(errors.poll());
} catch (Throwable e) {
// Print the failure before rethrowing it unchanged.
e.printStackTrace();
throw e;
} finally {
// Wait for the clients to stop..
stopClients.set(true);
producer.join();
consumer.join();
}
}
/**
 * Hands every broker in the list to {@code startBrokerAsync}, in list order.
 *
 * @param brokers the brokers to start
 */
protected void startBrokersAsync(ArrayList<BrokerService> brokers) {
    for (BrokerService pending : brokers) {
        startBrokerAsync(pending);
    }
}
/**
 * Creates a new broker node for each broker in the given list, reusing that
 * broker's name and the port reported by {@code connectPort}.
 *
 * @param brokers the brokers whose name and port are copied
 * @return the newly created brokers, in the same order as the input
 * @throws Exception if a port lookup or node creation fails
 */
protected ArrayList<BrokerService> createBrokerNodes(ArrayList<BrokerService> brokers) throws Exception {
    ArrayList<BrokerService> replacements = new ArrayList<BrokerService>();
    for (BrokerService previous : brokers) {
        String nodeName = previous.getBrokerName();
        int nodePort = connectPort(previous);
        BrokerService replacement = createBrokerNode(nodeName, nodePort);
        replacements.add(replacement);
    }
    return replacements;
}
/**
 * Stops every broker in {@code brokers} that reports itself as a slave, waits
 * for each one to finish stopping, and removes them from {@code brokers}.
 *
 * @return the brokers that were stopped, in the order they were stopped
 * @throws Exception if stopping a broker fails
 */
protected ArrayList<BrokerService> stopSlaves() throws Exception {
    ArrayList<BrokerService> stoppedSlaves = new ArrayList<BrokerService>();
    for (BrokerService candidate : brokers) {
        if (!candidate.isSlave()) {
            continue;
        }
        System.out.println("Stopping slave: " + candidate.getBrokerName());
        candidate.stop();
        candidate.waitUntilStopped();
        stoppedSlaves.add(candidate);
    }
    // Removal happens after the loop so the collection is not modified while iterating.
    brokers.removeAll(stoppedSlaves);
    return stoppedSlaves;
}
/**
 * Asserts, via {@code within}, that the given broker reports itself stopped
 * inside the supplied time budget.
 *
 * @param master  the broker expected to stop
 * @param timeout time budget passed to {@code within}
 * @param unit    unit of {@code timeout}
 * @throws InterruptedException if {@code within} is interrupted
 */
protected void assertStopsWithin(final BrokerService master, int timeout, TimeUnit unit) throws InterruptedException {
    Task brokerHasStopped = new Task() {
        @Override
        public void run() throws Exception {
            assertTrue(master.isStopped());
        }
    };
    within(timeout, unit, brokerHasStopped);
}
/**
 * Asserts, via {@code within}, that the counter rises above the value it held
 * when this method was called, inside the supplied time budget.
 *
 * @param counter the counter expected to increase
 * @param timeout time budget passed to {@code within}
 * @param unit    unit of {@code timeout}
 * @throws InterruptedException if {@code within} is interrupted
 */
protected void assertCounterMakesProgress(final AtomicLong counter, int timeout, TimeUnit unit) throws InterruptedException {
    // Snapshot taken up front: progress is measured relative to this value.
    final long initial = counter.get();
    within(timeout, unit, new Task(){
        @Override
        public void run() throws Exception {
            long current = counter.get();
            // The message makes a stalled counter diagnosable from the failure output.
            assertTrue("counter did not advance past " + initial + " (current: " + current + ")", initial < current);
        }
    });
}
public void testAMQ4837(boolean jmx) throws Throwable {
@ -205,8 +372,7 @@ public class ReplicatedLevelDBBrokerTest extends ZooKeeperTestSupport {
private ArrayList<String> browseMessagesViaJMS(BrokerService brokerService) throws Exception {
ArrayList<String> rc = new ArrayList<String>();
TransportConnector connector = brokerService.getTransportConnectors().get(0);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connector.getConnectUri());
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:"+ connectPort(brokerService));
Connection connection = factory.createConnection();
try {
connection.start();
@ -223,6 +389,19 @@ public class ReplicatedLevelDBBrokerTest extends ZooKeeperTestSupport {
return rc;
}
/**
 * Returns the port of the connect URI of the broker's first transport connector.
 *
 * @param brokerService the broker to inspect
 * @return the port component of the first connector's connect URI
 * @throws IOException        if the connect URI cannot be obtained
 * @throws URISyntaxException if the connect URI cannot be obtained
 */
private int connectPort(BrokerService brokerService) throws IOException, URISyntaxException {
    return brokerService.getTransportConnectors().get(0).getConnectUri().getPort();
}
int port;
/**
 * Picks a free local TCP port before each test and stores it in {@code port}.
 * <p>
 * The port is found by binding a throwaway server socket to port 0 and reading
 * back the port it was given; the socket is closed before returning.
 * NOTE(review): the port is released before any broker binds to it, so another
 * process could presumably claim it in between — confirm this is acceptable.
 *
 * @throws Exception if the probe socket cannot be opened or closed
 */
@Before
public void findFreePort() throws Exception {
    ServerSocket socket = new ServerSocket(0);
    try {
        port = socket.getLocalPort();
    } finally {
        // Always release the probe socket, even on an unexpected failure above.
        socket.close();
    }
}
@After
public void stopBrokers() throws Exception {
for (BrokerService broker : brokers) {
@ -235,12 +414,18 @@ public class ReplicatedLevelDBBrokerTest extends ZooKeeperTestSupport {
}
/**
 * Creates a broker node with the given id, passing port {@code 0} to the
 * two-argument overload.
 *
 * @param id the broker name / store id
 * @return the configured, not yet started broker
 * @throws Exception if node creation fails
 */
private BrokerService createBrokerNode(String id) throws Exception {
    final int unspecifiedPort = 0;
    return createBrokerNode(id, unspecifiedPort);
}
/**
 * Creates and configures (but does not start) a broker node.
 * <p>
 * The broker is added to {@code brokers}, named {@code id}, given the store
 * returned by {@code createStoreNode(id)}, and given a TCP connector on
 * {@code 0.0.0.0:port}.
 *
 * @param id   broker name, also passed to {@code createStoreNode}
 * @param port TCP port for the explicit transport connector
 * @return the configured broker
 * @throws Exception if store or connector setup fails
 */
private BrokerService createBrokerNode(String id, int port) throws Exception {
BrokerService bs = new BrokerService();
// No JMX connector is created for the test brokers.
bs.getManagementContext().setCreateConnector(false);
// Track the broker in the shared `brokers` collection.
brokers.add(bs);
bs.setBrokerName(id);
bs.setPersistenceAdapter(createStoreNode(id));
// NOTE(review): this line looks like the pre-change side of the diff (the
// markers were stripped); as written it adds a second connector on port 0 in
// addition to the explicit one below — confirm which is intended.
bs.addConnector("tcp://0.0.0.0:0");
// Explicit connector bound to the requested port on all interfaces.
TransportConnector connector = new TransportConnector();
connector.setUri(new URI("tcp://0.0.0.0:" + port));
bs.addConnector(connector);
return bs;
}