Add more replication tests.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1476786 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2013-04-28 15:03:24 +00:00
parent 5c6b8ba11f
commit ca814802ca
3 changed files with 102 additions and 38 deletions

View File

@ -20,7 +20,6 @@ import org.apache.activemq.leveldb.LevelDBStore
import org.apache.activemq.util.ServiceStopper import org.apache.activemq.util.ServiceStopper
import org.apache.activemq.leveldb.util.FileSupport._ import org.apache.activemq.leveldb.util.FileSupport._
import org.apache.activemq.leveldb.util.{JsonCodec, Log} import org.apache.activemq.leveldb.util.{JsonCodec, Log}
import java.util
import org.fusesource.hawtdispatch._ import org.fusesource.hawtdispatch._
import org.apache.activemq.leveldb.replicated.dto._ import org.apache.activemq.leveldb.replicated.dto._
import org.fusesource.hawtdispatch.transport._ import org.fusesource.hawtdispatch.transport._
@ -191,7 +190,7 @@ class MasterLevelDBStore extends LevelDBStore {
sendError("Not logged in") sendError("Not logged in")
return; return;
} }
info("handle_sync") debug("handle_sync")
slave_state = slaves.get(login.slave_id) slave_state = slaves.get(login.slave_id)
if ( slave_state == null ) { if ( slave_state == null ) {
slave_state = new SlaveState(login.slave_id) slave_state = new SlaveState(login.slave_id)
@ -245,7 +244,7 @@ class MasterLevelDBStore extends LevelDBStore {
var position = new AtomicLong(0) var position = new AtomicLong(0)
def start(session:Session) = { def start(session:Session) = {
info("SlaveState:start") debug("SlaveState:start")
val resp = this.synchronized { val resp = this.synchronized {
if( this.session!=null ) { if( this.session!=null ) {
@ -309,6 +308,10 @@ class MasterLevelDBStore extends LevelDBStore {
} }
} }
} }
def status = {
"{slave: "+slave_id+", position: "+position.get()+"}"
}
} }
@volatile @volatile
@ -323,11 +326,10 @@ class MasterLevelDBStore extends LevelDBStore {
for( slave <- slaves.values() ) { for( slave <- slaves.values() ) {
slave.check_position_sync slave.check_position_sync
} }
while( !position_sync.await(1, TimeUnit.SECONDS) ) { while( !position_sync.await(1, TimeUnit.SECONDS) ) {
println("Waiting for slaves to ack log position: "+position_sync.position) val status = slaves.values().map(_.status).mkString(", ")
for( slave <- slaves.values() ) { warn("Store update waiting on %d replica(s) to catch up to log position %d. Connected slaves: [%s]", minReplica, position, status)
slave.check_position_sync
}
} }
} }

View File

@ -87,7 +87,7 @@ class SlaveLevelDBStore extends LevelDBStore {
info("Connecting to master...") info("Connecting to master...")
wal_session = new Session(transport, (session)=>{ wal_session = new Session(transport, (session)=>{
info("Connected to master. Syncing") debug("Connected to master. Syncing")
session.request_then(SYNC_ACTION, null) { body => session.request_then(SYNC_ACTION, null) { body =>
val response = JsonCodec.decode(body, classOf[SyncResponse]) val response = JsonCodec.decode(body, classOf[SyncResponse])
transfer_missing(response) transfer_missing(response)

View File

@ -22,6 +22,8 @@ import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.leveldb.CountDownFuture;
import org.apache.activemq.leveldb.LevelDBStore;
import org.apache.activemq.leveldb.replicated.MasterLevelDBStore; import org.apache.activemq.leveldb.replicated.MasterLevelDBStore;
import org.apache.activemq.leveldb.replicated.SlaveLevelDBStore; import org.apache.activemq.leveldb.replicated.SlaveLevelDBStore;
import org.apache.activemq.leveldb.util.FileSupport; import org.apache.activemq.leveldb.util.FileSupport;
@ -34,15 +36,79 @@ import javax.jms.JMSException;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/** /**
*/ */
public class ReplicatedLevelDBStoreTest extends TestCase { public class ReplicatedLevelDBStoreTest extends TestCase {
protected static final Logger LOG = LoggerFactory.getLogger(ReplicatedLevelDBStoreTest.class); protected static final Logger LOG = LoggerFactory.getLogger(ReplicatedLevelDBStoreTest.class);
public void testMinReplicaEnforced() throws Exception {
File masterDir = new File("target/activemq-data/leveldb-node1");
File slaveDir = new File("target/activemq-data/leveldb-node2");
FileSupport.toRichFile(masterDir).recursiveDelete();
FileSupport.toRichFile(slaveDir).recursiveDelete();
MasterLevelDBStore master = createMaster(masterDir);
master.setMinReplica(1);
master.start();
MessageStore ms = master.createQueueMessageStore(new ActiveMQQueue("TEST"));
// Updating the store should not complete since we don't have enough
// replicas.
CountDownFuture f = asyncAddMessage(ms, "m1");
assertFalse(f.completed().await(2, TimeUnit.SECONDS));
// Adding a slave should allow that update to complete.
SlaveLevelDBStore slave = createSlave(master, slaveDir);
slave.start();
assertTrue(f.completed().await(2, TimeUnit.SECONDS));
// New updates should complete quickly now..
f = asyncAddMessage(ms, "m2");
assertTrue(f.completed().await(1, TimeUnit.SECONDS));
// If the slave goes offline, then updates should once again
// not complete.
slave.stop();
f = asyncAddMessage(ms, "m3");
assertFalse(f.completed().await(2, TimeUnit.SECONDS));
// Restart and the op should complete.
slave = createSlave(master, slaveDir);
slave.start();
assertTrue(f.completed().await(2, TimeUnit.SECONDS));
master.stop();
slave.stop();
}
private CountDownFuture asyncAddMessage(final MessageStore ms, final String body) {
final CountDownFuture f = new CountDownFuture(new CountDownLatch(1));
LevelDBStore.BLOCKING_EXECUTOR().execute(new Runnable() {
public void run() {
try {
addMessage(ms, body);
} catch (Exception e) {
e.printStackTrace();
} finally {
f.countDown();
}
}
});
return f;
}
public void testReplication() throws Exception { public void testReplication() throws Exception {
LinkedList<File> directories = new LinkedList<File>(); LinkedList<File> directories = new LinkedList<File>();
directories.add(new File("target/activemq-data/leveldb-node1")); directories.add(new File("target/activemq-data/leveldb-node1"));
directories.add(new File("target/activemq-data/leveldb-node2")); directories.add(new File("target/activemq-data/leveldb-node2"));
@ -53,32 +119,14 @@ public class ReplicatedLevelDBStoreTest extends TestCase {
} }
ArrayList<String> expected_list = new ArrayList<String>(); ArrayList<String> expected_list = new ArrayList<String>();
final int LOG_SIZE = 1023*200;
// We will rotate between 3 nodes the task of being the master. // We will rotate between 3 nodes the task of being the master.
for( int j=0; j < 10; j++) { for( int j=0; j < 10; j++) {
MasterLevelDBStore master = new MasterLevelDBStore(); MasterLevelDBStore master = createMaster(directories.get(0));
master.setDirectory(directories.get(0));
master.setBind("tcp://0.0.0.0:0");
master.setSecurityToken("foo");
master.setMinReplica(1);
master.setLogSize(LOG_SIZE);
LOG.info("Starting master: "+master.replicaId());
master.start(); master.start();
SlaveLevelDBStore slave1 = createSlave(master, directories.get(1));
SlaveLevelDBStore slave1 = new SlaveLevelDBStore(); SlaveLevelDBStore slave2 = createSlave(master, directories.get(2));
slave1.setDirectory(directories.get(1)); slave2.start();
slave1.setConnect("tcp://127.0.0.1:" + master.getPort());
slave1.setSecurityToken("foo");
slave1.setLogSize(LOG_SIZE);
LOG.info("Starting slave: "+slave1.replicaId());
slave1.start();
SlaveLevelDBStore slave2 = new SlaveLevelDBStore();
slave2.setDirectory(directories.get(2));
slave2.setConnect("tcp://127.0.0.1:" + master.getPort());
slave2.setSecurityToken("foo");
slave2.setLogSize(LOG_SIZE);
LOG.info("Adding messages..."); LOG.info("Adding messages...");
MessageStore ms = master.createQueueMessageStore(new ActiveMQQueue("TEST")); MessageStore ms = master.createQueueMessageStore(new ActiveMQQueue("TEST"));
@ -88,13 +136,8 @@ public class ReplicatedLevelDBStoreTest extends TestCase {
LOG.info("" + (100*i/TOTAL) + "% done"); LOG.info("" + (100*i/TOTAL) + "% done");
} }
if( i == 100 ) { if( i == 250 ) {
LOG.info("Starting slave: "+slave2.replicaId()); slave1.start();
slave2.start();
}
if( i == 200 ) {
LOG.info("Stopping slave: "+slave2.replicaId());
slave2.stop(); slave2.stop();
} }
@ -116,6 +159,25 @@ public class ReplicatedLevelDBStoreTest extends TestCase {
} }
} }
private SlaveLevelDBStore createSlave(MasterLevelDBStore master, File directory) {
SlaveLevelDBStore slave1 = new SlaveLevelDBStore();
slave1.setDirectory(directory);
slave1.setConnect("tcp://127.0.0.1:" + master.getPort());
slave1.setSecurityToken("foo");
slave1.setLogSize(1023*200);
return slave1;
}
private MasterLevelDBStore createMaster(File directory) {
MasterLevelDBStore master = new MasterLevelDBStore();
master.setDirectory(directory);
master.setBind("tcp://0.0.0.0:0");
master.setSecurityToken("foo");
master.setMinReplica(1);
master.setLogSize(1023 * 200);
return master;
}
long id_counter = 0L; long id_counter = 0L;
String payload = ""; String payload = "";
{ {