Simplify and improve the leveldb replication MBean

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1499754 13f79535-47bb-0310-9956-ffa450edef68
Hiram R. Chirino 2013-07-04 13:47:27 +00:00
parent 8302262e2e
commit 2019a21d96
8 changed files with 119 additions and 97 deletions

View File

@@ -49,7 +49,7 @@ import java.util.Set;
public final class OpenTypeSupport {
interface OpenTypeFactory {
public interface OpenTypeFactory {
CompositeType getCompositeType() throws OpenDataException;
Map<String, Object> getFields(Object o) throws OpenDataException;
@@ -57,7 +57,7 @@ public final class OpenTypeSupport {
private static final Map<Class, AbstractOpenTypeFactory> OPEN_TYPE_FACTORIES = new HashMap<Class, AbstractOpenTypeFactory>();
abstract static class AbstractOpenTypeFactory implements OpenTypeFactory {
public abstract static class AbstractOpenTypeFactory implements OpenTypeFactory {
private CompositeType compositeType;
private final List<String> itemNamesList = new ArrayList<String>();

View File

@@ -4,6 +4,24 @@ import scala.reflect.BeanProperty
import java.util.UUID
import org.apache.activemq.leveldb.LevelDBStore
import org.apache.activemq.leveldb.util.FileSupport._
import java.io.File
object ReplicatedLevelDBStoreTrait {
def create_uuid = UUID.randomUUID().toString
def node_id(directory:File):String = {
val nodeid_file = directory / "nodeid.txt"
if( nodeid_file.exists() ) {
nodeid_file.readText()
} else {
val rc = create_uuid
nodeid_file.getParentFile.mkdirs()
nodeid_file.writeText(rc)
rc
}
}
}
/**
*/
@@ -12,19 +30,7 @@ trait ReplicatedLevelDBStoreTrait extends LevelDBStore {
@BeanProperty
var securityToken = ""
def replicaId:String = {
val replicaid_file = directory / "replicaid.txt"
if( replicaid_file.exists() ) {
replicaid_file.readText()
} else {
val rc = create_uuid
replicaid_file.getParentFile.mkdirs()
replicaid_file.writeText(rc)
rc
}
}
def create_uuid = UUID.randomUUID().toString
def node_id = ReplicatedLevelDBStoreTrait.node_id(directory)
def storeId:String = {
val storeid_file = directory / "storeid.txt"
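
The id-file logic above moves into the new ReplicatedLevelDBStoreTrait companion object, and the file is renamed from replicaid.txt to nodeid.txt, so that ElectingLevelDBStore (a ProxyLevelDBStore, see below) can derive the same node id for its directory. A minimal sketch of the contract, written with plain java.nio calls instead of the project's FileSupport helpers: the first call generates and persists a UUID, every later call, including after a broker restart, returns the same value.

import java.io.File
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Files
import java.util.UUID

// Sketch only: same create-once/read-forever behaviour as node_id above,
// but using java.nio instead of FileSupport's readText/writeText.
def nodeId(directory: File): String = {
  val file = new File(directory, "nodeid.txt")
  if (file.exists()) {
    new String(Files.readAllBytes(file.toPath), UTF_8)
  } else {
    val id = UUID.randomUUID().toString
    directory.mkdirs()                           // make sure the store directory exists
    Files.write(file.toPath, id.getBytes(UTF_8))
    id
  }
}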

View File

@@ -19,6 +19,8 @@ package org.apache.activemq.leveldb.replicated;
import org.apache.activemq.broker.jmx.MBeanInfo;
import javax.management.openmbean.CompositeData;
/**
* <p>
* </p>
@@ -44,46 +46,18 @@ public interface ReplicatedLevelDBStoreViewMBean {
@MBeanInfo("The replication status.")
String getStatus();
@MBeanInfo("The status of the connected slaves.")
CompositeData[] getSlaves();
@MBeanInfo("The current position of the replication log.")
Long getPosition();
@MBeanInfo("The directory holding the data.")
String getDirectory();
@MBeanInfo("The size the log files are allowed to grow to.")
long getLogSize();
@MBeanInfo("The implementation of the LevelDB index being used.")
String getIndexFactory();
@MBeanInfo("Is data verified against checksums as it's loaded back from disk.")
boolean getVerifyChecksums();
@MBeanInfo("The maximum number of open files the index will open at one time.")
int getIndexMaxOpenFiles();
@MBeanInfo("Number of keys between restart points for delta encoding of keys in the index")
int getIndexBlockRestartInterval();
@MBeanInfo("Do aggressive checking of store data")
boolean getParanoidChecks();
@MBeanInfo("Amount of data to build up in memory for the index before converting to a sorted on-disk file.")
int getIndexWriteBufferSize();
@MBeanInfo("Approximate size of user data packed per block for the index")
int getIndexBlockSize();
@MBeanInfo("The type of compression to use for the index")
String getIndexCompression();
@MBeanInfo("The size of the cache index")
long getIndexCacheSize();
@MBeanInfo("The maximum amount of async writes to buffer up")
int getAsyncBufferSize();
@MBeanInfo("The sync strategy to use.")
String getSync();
@MBeanInfo("The node id of this replication node.")
String getNodeId();
}
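
The interface now centres on replication state, the status string, the connected slaves as CompositeData rows, the replication log position and the node id, while most of the static store-tuning getters move out of it. A rough sketch of how a JMX client could read the new attributes; the ObjectName below is a made-up example, the real one is derived from the broker name and the store directory, with the view=Replication key added in ElectingLevelDBStore further down.

import java.lang.management.ManagementFactory
import javax.management.ObjectName
import javax.management.openmbean.CompositeData

// Sketch of a JMX client reading the new attributes; the ObjectName string is
// an example only.
val mbs  = ManagementFactory.getPlatformMBeanServer
val name = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost," +
                          "service=PersistenceAdapter,instanceName=LevelDB,view=Replication")

println(mbs.getAttribute(name, "Status"))
for (row <- mbs.getAttribute(name, "Slaves").asInstanceOf[Array[CompositeData]])
  println("slave %s at %s attached=%s position=%s".format(
    row.get("nodeId"), row.get("remoteAddress"), row.get("attached"), row.get("position")))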

View File

@@ -32,8 +32,8 @@ import javax.xml.bind.annotation.XmlRootElement;
@JsonIgnoreProperties(ignoreUnknown = true)
public class Login {
@XmlAttribute(name="slave_id")
public String slave_id;
@XmlAttribute(name="node_id")
public String node_id;
@XmlAttribute(name="security_token")
public String security_token;

View File

@@ -33,9 +33,11 @@ import java.io.File
import org.apache.activemq.usage.SystemUsage
import org.apache.activemq.ActiveMQMessageAuditNoSync
import org.fusesource.hawtdispatch
import org.apache.activemq.broker.jmx.{BrokerMBeanSupport, AnnotatedMBean}
import org.apache.activemq.broker.jmx.{OpenTypeSupport, BrokerMBeanSupport, AnnotatedMBean}
import org.apache.activemq.leveldb.LevelDBStore._
import javax.management.ObjectName
import javax.management.openmbean.{CompositeDataSupport, SimpleType, CompositeType, CompositeData}
import java.util
object ElectingLevelDBStore extends Log {
@@ -80,7 +82,7 @@ class ElectingLevelDBStore extends ProxyLevelDBStore {
var bind = "tcp://0.0.0.0:61619"
@BeanProperty
var replicas = 2
var replicas = 3
@BeanProperty
var sync="quorum_mem"
@@ -143,6 +145,8 @@ class ElectingLevelDBStore extends ProxyLevelDBStore {
this.usageManager = usageManager
}
def node_id = ReplicatedLevelDBStoreTrait.node_id(directory)
def init() {
if(brokerService!=null){
@@ -234,9 +238,8 @@ class ElectingLevelDBStore extends ProxyLevelDBStore {
}
def objectName = {
var objectNameStr = brokerService.getBrokerObjectName.toString;
objectNameStr += "," + "Service=PersistenceAdapterReplication";
objectNameStr += "," + "InstanceName=" + JMXSupport.encodeObjectNamePart("LevelDB[" + directory.getAbsolutePath + "]");
var objectNameStr = BrokerMBeanSupport.createPersistenceAdapterName(brokerService.getBrokerObjectName.toString, "LevelDB[" + directory.getAbsolutePath + "]").toString
objectNameStr += "," + "view=Replication";
new ObjectName(objectNameStr);
}
@@ -384,6 +387,39 @@ class ReplicatedLevelDBStoreView(val store:ElectingLevelDBStore) extends Replica
""
}
object SlaveStatusOTF extends OpenTypeSupport.AbstractOpenTypeFactory {
protected def getTypeName: String = classOf[SlaveStatus].getName
protected override def init() = {
super.init();
addItem("nodeId", "nodeId", SimpleType.STRING);
addItem("remoteAddress", "remoteAddress", SimpleType.STRING);
addItem("attached", "attached", SimpleType.BOOLEAN);
addItem("position", "position", SimpleType.LONG);
}
override def getFields(o: Any): util.Map[String, AnyRef] = {
val status = o.asInstanceOf[SlaveStatus]
val rc = super.getFields(o);
rc.put("nodeId", status.nodeId);
rc.put("remoteAddress", status.remoteAddress);
rc.put("attached", status.attached.asInstanceOf[java.lang.Boolean]);
rc.put("position", status.position.asInstanceOf[java.lang.Long]);
rc
}
}
def getSlaves():Array[CompositeData] = {
if( master!=null ) {
master.slaves_status.map { status =>
val fields = SlaveStatusOTF.getFields(status);
new CompositeDataSupport(SlaveStatusOTF.getCompositeType(), fields).asInstanceOf[CompositeData]
}.toArray
} else {
Array()
}
}
def getPosition:java.lang.Long = {
if( slave!=null ) {
return new java.lang.Long(slave.wal_append_position)
@@ -394,18 +430,8 @@ class ReplicatedLevelDBStoreView(val store:ElectingLevelDBStore) extends Replica
null
}
def getAsyncBufferSize = asyncBufferSize
def getDirectory = directory.getCanonicalPath
def getIndexBlockRestartInterval = indexBlockRestartInterval
def getIndexBlockSize = indexBlockSize
def getIndexCacheSize = indexCacheSize
def getIndexCompression = indexCompression
def getIndexFactory = indexFactory
def getIndexMaxOpenFiles = indexMaxOpenFiles
def getIndexWriteBufferSize = indexWriteBufferSize
def getLogSize = logSize
def getParanoidChecks = paranoidChecks
def getSync = sync
def getVerifyChecksums = verifyChecksums
def getNodeId: String = node_id
}
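
The replication view is now registered under the standard persistence-adapter ObjectName built by BrokerMBeanSupport.createPersistenceAdapterName, with an extra view=Replication key, and each SlaveStatus is turned into a CompositeData row through the newly public OpenTypeSupport factory machinery. For orientation, this is roughly what SlaveStatusOTF assembles, spelled out against the raw OpenMBean API; the field values are made-up examples.

import javax.management.openmbean._
import scala.collection.JavaConverters._

// A CompositeType describing the four SlaveStatus fields...
val itemNames = Array("nodeId", "remoteAddress", "attached", "position")
val slaveType = new CompositeType(
  "SlaveStatus", "status of one replication slave",
  itemNames, itemNames,
  Array[OpenType[_]](SimpleType.STRING, SimpleType.STRING, SimpleType.BOOLEAN, SimpleType.LONG))

// ...and one row carrying example values for a single slave.
val row: CompositeData = new CompositeDataSupport(slaveType,
  Map[String, AnyRef](
    "nodeId"        -> "example-node-id",
    "remoteAddress" -> "/10.0.0.12:61619",
    "attached"      -> java.lang.Boolean.TRUE,
    "position"      -> java.lang.Long.valueOf(0L)).asJava)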

View File

@@ -25,7 +25,7 @@ import org.apache.activemq.leveldb.replicated.dto._
import org.fusesource.hawtdispatch.transport._
import java.util.concurrent._
import java.io.{IOException, File}
import java.net.{InetSocketAddress, URI}
import java.net.{SocketAddress, InetSocketAddress, URI}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import scala.reflect.BeanProperty
@@ -40,6 +40,8 @@ object MasterLevelDBStore extends Log {
}
case class SlaveStatus(nodeId:String, remoteAddress:String, attached:Boolean, position:Long)
/**
*/
class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
@@ -52,7 +54,7 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
var bind = "tcp://0.0.0.0:61619"
@BeanProperty
var replicas = 2
var replicas = 3
def minSlaveAcks = replicas/2
var _syncTo="quorum_mem"
@@ -80,7 +82,27 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
val slaves = new ConcurrentHashMap[String,SlaveState]()
def status = slaves.values().map(_.status).mkString(", ")
def slaves_status = slaves.values().map(_.status)
def status = {
var caughtUpCounter = 0
var notCaughtUpCounter = 0
for( slave <- slaves.values() ) {
if( slave.isCaughtUp ) {
caughtUpCounter += 1
} else {
notCaughtUpCounter += 1
}
}
var rc = ""
if( notCaughtUpCounter > 0 ) {
rc += "%d slave nodes attaching. ".format(notCaughtUpCounter)
}
if( caughtUpCounter > 0 ) {
rc += "%d slave nodes attached. ".format(caughtUpCounter)
}
rc
}
override def doStart = {
unstash(directory)
@@ -201,10 +223,10 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
return;
}
debug("handle_sync")
slave_state = slaves.get(login.slave_id)
slave_state = slaves.get(login.node_id)
if ( slave_state == null ) {
slave_state = new SlaveState(login.slave_id)
slaves.put(login.slave_id, slave_state)
slave_state = new SlaveState(login.node_id)
slaves.put(login.node_id, slave_state)
}
slave_state.start(Session.this)
}
@@ -253,9 +275,11 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
var session:Session = _
var position = new AtomicLong(0)
var caughtUp = new AtomicBoolean(false)
var socketAddress:SocketAddress = _
def start(session:Session) = {
debug("SlaveState:start")
socketAddress = session.transport.getRemoteAddress
val resp = this.synchronized {
if( this.session!=null ) {
@@ -298,13 +322,7 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
}
def position_update(position:Long) = {
val was = this.position.getAndSet(position)
if( was == 0 ) {
info("Slave has finished state transfer: "+slave_id)
this.synchronized {
this.held_snapshot = None
}
}
this.position.getAndSet(position)
check_position_sync
}
@@ -316,6 +334,9 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
if( position.get >= p.position ) {
if( caughtUp.compareAndSet(false, true) ) {
info("Slave has now caught up: "+slave_id)
this.synchronized {
this.held_snapshot = None
}
}
p.countDown
last_position_sync = p
@@ -323,9 +344,9 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
}
}
def status = {
"{slave: "+slave_id+", position: "+position.get()+"}"
}
def isCaughtUp = caughtUp.get()
def status = SlaveStatus(slave_id, socketAddress.toString, isCaughtUp, position.get())
}
@volatile
@@ -350,8 +371,7 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
if( isStopped ) {
throw new IllegalStateException("Store replication stopped")
}
val status = slaves.values().map(_.status).mkString(", ")
warn("Store update waiting on %d replica(s) to catch up to log position %d. Connected slaves: [%s]", minSlaveAcks, position, status)
warn("Store update waiting on %d replica(s) to catch up to log position %d. %s", minSlaveAcks, position, status)
}
}
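
Both the electing store and the master store change their default replicas from 2 to 3, and a store update is only acknowledged after minSlaveAcks = replicas/2 slaves confirm it. A small sketch of that arithmetic, assuming that replicas counts every node including the master and that a majority of them must be online:

// Quorum arithmetic sketch, under the assumptions stated above.
def minSlaveAcks(replicas: Int): Int = replicas / 2
def majority(replicas: Int): Int = replicas / 2 + 1

for (n <- Seq(2, 3, 5))
  println("replicas=%d: %d node(s) must be up, %d slave ack(s) per write".format(
    n, majority(n), minSlaveAcks(n)))
// replicas=2: 2 node(s) must be up, 1 slave ack(s) per write  -> no node may fail
// replicas=3: 2 node(s) must be up, 1 slave ack(s) per write  -> one node may fail
// replicas=5: 3 node(s) must be up, 2 slave ack(s) per write  -> two nodes may fail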

View File

@@ -92,7 +92,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
transport.setDispatchQueue(queue)
transport.connecting(new URI(connect), null)
status = "Connecting to master: "+connect
status = "Attaching to master: "+connect
info(status)
wal_session = new Session(transport, (session)=>{
// lets stash away our current state so that we can unstash it
@@ -100,8 +100,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
// the stashed data might be the best option to become the master.
stash(directory)
delete_store(directory)
status = "Connected to master. Syncing"
debug(status)
debug("Log replicaiton session connected")
session.request_then(SYNC_ACTION, null) { body =>
val response = JsonCodec.decode(body, classOf[SyncResponse])
transfer_missing(response)
@@ -206,7 +205,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
super.onTransportConnected
val login = new Login
login.security_token = securityToken
login.slave_id = replicaId
login.node_id = node_id
request_then(LOGIN_ACTION, login) { body =>
on_login(Session.this)
}
@@ -270,8 +269,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
transport.setDispatchQueue(queue)
transport.connecting(new URI(connect), null)
status = "Connecting catchup session."
info(status)
debug("Connecting download session.")
transfer_session = new Session(transport, (session)=> {
var total_files = 0
@@ -280,12 +278,11 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
var downloaded_files = 0
def update_download_status = {
status = "Slave catching up. Downloaded %.2f/%.2f kb and %d/%d files".format(downloaded_size/1024f, total_size/1024f, downloaded_files, total_files)
status = "Attaching... Downloaded %.2f/%.2f kb and %d/%d files".format(downloaded_size/1024f, total_size/1024f, downloaded_files, total_files)
info(status)
}
status = "Catchup session connected..."
info(status)
debug("Download session connected...")
// Transfer the log files..
var append_offset = 0L
@@ -378,7 +375,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
session.request_then(DISCONNECT_ACTION, null) { body =>
// Ok we are now caught up.
status = "Synchronized"
status = "Attached"
info(status)
stash_clear(directory) // we don't need the stash anymore.
transport.stop(NOOP)

View File

@@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.apache.activemq.leveldb.test.ReplicationTestSupport.addMessage;
@@ -161,9 +160,9 @@ public class ReplicatedLevelDBStoreTest {
LOG.info("Checking master state");
assertEquals(expected_list, getMessages(ms));
LOG.info("Stopping master: " + master.replicaId());
LOG.info("Stopping master: " + master.node_id());
master.stop();
LOG.info("Stopping slave: " + slave1.replicaId());
LOG.info("Stopping slave: " + slave1.node_id());
slave1.stop();
// Rotate the dir order so that slave1 becomes the master next.