Fix for: https://issues.apache.org/jira/browse/AMQ-4837 : LevelDB corrupted in AMQ cluster.

- Log rotation was causing a pre-mature index snapshot to be taken on the slave (snapshot while the slave was still synchronizing).
- Also fix issue with the append position displayed in JMX for the master not being correct.
This commit is contained in:
Hiram Chirino 2013-10-31 12:18:48 -04:00
parent 7dfb0a2a3c
commit 24d5490e57
4 changed files with 161 additions and 50 deletions

View File

@ -147,6 +147,12 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>test</scope>
</dependency>
<!-- For Optional Snappy Compression -->
<dependency>
<groupId>org.xerial.snappy</groupId>

View File

@ -625,11 +625,15 @@ class LevelDBClient(store: LevelDBStore) {
log = createLog
log.logSize = store.logSize
log.on_log_rotate = ()=> {
post_log_rotate
}
}
def post_log_rotate ={
// We snapshot the index every time we rotate the logs.
writeExecutor {
snapshotIndex(false)
}
}
}
def replay_init() = {
@ -927,7 +931,16 @@ class LevelDBClient(store: LevelDBStore) {
}
}
var wal_append_position = 0L
var stored_wal_append_position = 0L
def wal_append_position = this.synchronized {
if (log!=null && log.isOpen) {
log.appender_limit
} else {
stored_wal_append_position
}
}
def stop() = this.synchronized {
if( writeExecutor!=null ) {
@ -948,7 +961,7 @@ class LevelDBClient(store: LevelDBStore) {
if (log!=null && log.isOpen) {
log.close
copyDirtyIndexToSnapshot
wal_append_position = log.appender_limit
stored_wal_append_position = log.appender_limit
log = null
}
if( plist!=null ) {

View File

@ -16,7 +16,7 @@
*/
package org.apache.activemq.leveldb.replicated
import org.apache.activemq.leveldb.LevelDBStore
import org.apache.activemq.leveldb.{LevelDBClient, LevelDBStore}
import org.apache.activemq.util.ServiceStopper
import java.util
import org.fusesource.hawtdispatch._
@ -53,6 +53,16 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
var status = "initialized"
override def createClient = new LevelDBClient(this) {
// We don't want to start doing index snapshots until
// he slave is caught up.
override def post_log_rotate: Unit = {
if( caughtUp ) {
super.post_log_rotate
}
}
}
override def doStart() = {
client.init()
if (purgeOnStatup) {
@ -100,7 +110,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
// the stashed data might be the best option to become the master.
stash(directory)
delete_store(directory)
debug("Log replicaiton session connected")
debug("Log replication session connected")
session.request_then(SYNC_ACTION, null) { body =>
val response = JsonCodec.decode(body, classOf[SyncResponse])
transfer_missing(response)
@ -165,7 +175,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
command.action match {
case WAL_ACTION =>
val value = JsonCodec.decode(command.body, classOf[LogWrite])
if( caughtUp && value.offset ==0 ) {
if( caughtUp && value.offset ==0 && value.file!=0 ) {
client.log.rotate
}
val file = client.log.next_log(value.file)

View File

@ -21,17 +21,22 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.leveldb.replicated.ElectingLevelDBStore;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import javax.jms.*;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.*;
/**
* Holds broker unit tests of the replicated leveldb store.
@ -46,50 +51,109 @@ public class ReplicatedLevelDBBrokerTest extends ZooKeeperTestSupport {
* https://issues.apache.org/jira/browse/AMQ-4837
*/
@Test(timeout = 1000*60*10)
public void testAMQ4837() throws Exception {
public void testAMQ4837viaJMS() throws Throwable {
testAMQ4837(false);
}
// 1. Start 3 activemq nodes.
startBrokerAsync(createBrokerNode("node-1"));
startBrokerAsync(createBrokerNode("node-2"));
startBrokerAsync(createBrokerNode("node-3"));
/**
* Tries to replicate the problem reported at:
* https://issues.apache.org/jira/browse/AMQ-4837
*/
@Test(timeout = 1000*60*10)
public void testAMQ4837viaJMX() throws Throwable {
for (int i = 0; i < 2; i++) {
resetDataDirs();
testAMQ4837(true);
stopBrokers();
}
}
// 2. Push a message to the master and browse the queue
@Before
public void resetDataDirs() throws IOException {
deleteDirectory("node-1");
deleteDirectory("node-2");
deleteDirectory("node-3");
}
protected void deleteDirectory(String s) throws IOException {
try {
FileUtils.deleteDirectory(new File(data_dir(), s));
} catch (IOException e) {
}
}
public void testAMQ4837(boolean jmx) throws Throwable {
try {
System.out.println("======================================");
System.out.println("1. Start 3 activemq nodes.");
System.out.println("======================================");
startBrokerAsync(createBrokerNode("node-1"));
startBrokerAsync(createBrokerNode("node-2"));
startBrokerAsync(createBrokerNode("node-3"));
BrokerService master = waitForNextMaster();
System.out.println("======================================");
System.out.println("2. Push a message to the master and browse the queue");
System.out.println("======================================");
sendMessage(master, pad("Hello World #1", 1024));
assertEquals(1, browseMessages(master, jmx).size());
System.out.println("======================================");
System.out.println("3. Stop master node");
System.out.println("======================================");
stop(master);
BrokerService prevMaster = master;
master = waitForNextMaster();
System.out.println("======================================");
System.out.println("4. Push a message to the new master and browse the queue. Message summary and queue content ok.");
System.out.println("======================================");
assertEquals(1, browseMessages(master, jmx).size());
sendMessage(master, pad("Hello World #2", 1024));
assertEquals(2, browseMessages(master, jmx).size());
System.out.println("======================================");
System.out.println("5. Restart the stopped node & 6. stop current master");
System.out.println("======================================");
prevMaster = createBrokerNode(prevMaster.getBrokerName());
startBrokerAsync(prevMaster);
stop(master);
master = waitForNextMaster();
System.out.println("======================================");
System.out.println("7. Browse the queue on new master");
System.out.println("======================================");
assertEquals(2, browseMessages(master, jmx).size());
} catch (Throwable e) {
e.printStackTrace();
throw e;
}
}
private void stop(BrokerService master) throws Exception {
System.out.println("Stopping "+master.getBrokerName());
master.stop();
master.waitUntilStopped();
}
private BrokerService waitForNextMaster() throws InterruptedException {
System.out.println("Wait for master to start up...");
BrokerService master = masterQueue.poll(60, TimeUnit.SECONDS);
assertNotNull("Master elected", master);
sendMessage(master, "Hello World #1");
assertEquals(1, browseMessages(master).size());
// 3. Stop master node
System.out.println("Stopping master...");
master.stop();
master.waitUntilStopped();
BrokerService prevMaster = master;
// 4. Push a message to the new master (Node2) and browse the queue using the web UI. Message summary and queue content ok.
System.out.println("Wait for new master to start up...");
master = masterQueue.poll(60, TimeUnit.SECONDS);
assertNotNull("Master elected", master);
sendMessage(master, "Hello World #2");
assertEquals(2, browseMessages(master).size());
// 5. Start Node1
System.out.println("Starting previous master...");
prevMaster = createBrokerNode(prevMaster.getBrokerName());
startBrokerAsync(prevMaster);
// 6. Stop master node (Node2)
System.out.println("Stopping master...");
master.stop();
master.waitUntilStopped();
// 7. Browse the queue using the web UI on new master (Node3). Message summary ok however when clicking on the queue, no message details.
// An error (see below) is logged by the master, which attempts a restart.
System.out.println("Wait for new master to start up...");
master = masterQueue.poll(60, TimeUnit.SECONDS);
assertNotNull("Master elected", master);
assertEquals(2, browseMessages(master).size());
assertFalse(master.isSlave());
assertNull("Only one master elected at a time..", masterQueue.peek());
System.out.println("Master started: " + master.getBrokerName());
return master;
}
private String pad(String value, int size) {
while( value.length() < size ) {
value += " ";
}
return value;
}
private void startBrokerAsync(BrokerService b) {
@ -121,8 +185,25 @@ public class ReplicatedLevelDBBrokerTest extends ZooKeeperTestSupport {
connection.close();
}
}
private ArrayList<String> browseMessages(BrokerService brokerService, boolean jmx) throws Exception {
if( jmx ) {
return browseMessagesViaJMX(brokerService);
} else {
return browseMessagesViaJMS(brokerService);
}
}
private ArrayList<String> browseMessages(BrokerService brokerService) throws Exception {
private ArrayList<String> browseMessagesViaJMX(BrokerService brokerService) throws Exception {
ArrayList<String> rc = new ArrayList<String>();
ObjectName on = new ObjectName("org.apache.activemq:type=Broker,brokerName="+brokerService.getBrokerName()+",destinationType=Queue,destinationName=FOO");
CompositeData[] browse = (CompositeData[]) ManagementFactory.getPlatformMBeanServer().invoke(on, "browse", null, null);
for (CompositeData cd : browse) {
rc.add(cd.get("Text").toString()) ;
}
return rc;
}
private ArrayList<String> browseMessagesViaJMS(BrokerService brokerService) throws Exception {
ArrayList<String> rc = new ArrayList<String>();
TransportConnector connector = brokerService.getTransportConnectors().get(0);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connector.getConnectUri());
@ -143,14 +224,14 @@ public class ReplicatedLevelDBBrokerTest extends ZooKeeperTestSupport {
}
@After
public void closeBrokers() throws Exception {
public void stopBrokers() throws Exception {
for (BrokerService broker : brokers) {
try {
broker.stop();
broker.waitUntilStopped();
stop(broker);
} catch (Exception e) {
}
}
brokers.clear();
}
private BrokerService createBrokerNode(String id) throws Exception {
@ -178,6 +259,7 @@ public class ReplicatedLevelDBBrokerTest extends ZooKeeperTestSupport {
}
};
store.setDirectory(new File(data_dir(), id));
store.setContainer(id);
store.setReplicas(3);
store.setZkAddress("localhost:" + connector.getLocalPort());
store.setHostname("localhost");