Expose the replicated store status via JMX.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1497843 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2013-06-28 16:57:29 +00:00
parent 4834cf15ec
commit 16cd8c3954
4 changed files with 206 additions and 9 deletions

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.leveldb.replicated;
import org.apache.activemq.broker.jmx.MBeanInfo;
/**
* <p>
* </p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public interface ReplicatedLevelDBStoreViewMBean {
@MBeanInfo("The address of the ZooKeeper server.")
String getZkAddress();
@MBeanInfo("The path in ZooKeeper to hold master elections.")
String getZkPath();
@MBeanInfo("The ZooKeeper session timeout.")
String getZkSessionTmeout();
@MBeanInfo("The address and port the master will bind for the replication protocol.")
String getBind();
@MBeanInfo("The number of replication nodes that will be part of the replication cluster.")
int getReplicas();
@MBeanInfo("The role of this node in the replication cluster.")
String getNodeRole();
@MBeanInfo("The replication status.")
String getStatus();
@MBeanInfo("The current position of the replication log.")
Long getPosition();
@MBeanInfo("The directory holding the data.")
String getDirectory();
@MBeanInfo("The size the log files are allowed to grow to.")
long getLogSize();
@MBeanInfo("The implementation of the LevelDB index being used.")
String getIndexFactory();
@MBeanInfo("Is data verified against checksums as it's loaded back from disk.")
boolean getVerifyChecksums();
@MBeanInfo("The maximum number of open files the index will open at one time.")
int getIndexMaxOpenFiles();
@MBeanInfo("Number of keys between restart points for delta encoding of keys in the index")
int getIndexBlockRestartInterval();
@MBeanInfo("Do aggressive checking of store data")
boolean getParanoidChecks();
@MBeanInfo("Amount of data to build up in memory for the index before converting to a sorted on-disk file.")
int getIndexWriteBufferSize();
@MBeanInfo("Approximate size of user data packed per block for the index")
int getIndexBlockSize();
@MBeanInfo("The type of compression to use for the index")
String getIndexCompression();
@MBeanInfo("The size of the cache index")
long getIndexCacheSize();
@MBeanInfo("The maximum amount of async writes to buffer up")
int getAsyncBufferSize();
@MBeanInfo("The sync strategy to use.")
String getSync();
}

View File

@ -20,8 +20,8 @@ import org.fusesource.fabric.groups._
import org.fusesource.fabric.zookeeper.internal.ZKClient
import org.linkedin.util.clock.Timespan
import scala.reflect.BeanProperty
import org.apache.activemq.util.{ServiceStopper, ServiceSupport}
import org.apache.activemq.leveldb.{LevelDBClient, RecordLog, LevelDBStore}
import org.apache.activemq.util.{JMXSupport, ServiceStopper, ServiceSupport}
import org.apache.activemq.leveldb.{LevelDBStoreViewMBean, LevelDBClient, RecordLog, LevelDBStore}
import java.net.{NetworkInterface, InetAddress}
import org.fusesource.hawtdispatch._
import org.apache.activemq.broker.Locker
@ -32,6 +32,10 @@ import org.apache.activemq.leveldb.util.Log
import java.io.File
import org.apache.activemq.usage.SystemUsage
import org.apache.activemq.ActiveMQMessageAuditNoSync
import org.fusesource.hawtdispatch
import org.apache.activemq.broker.jmx.{BrokerMBeanSupport, AnnotatedMBean}
import org.apache.activemq.leveldb.LevelDBStore._
import javax.management.ObjectName
object ElectingLevelDBStore extends Log {
@ -141,6 +145,16 @@ class ElectingLevelDBStore extends ProxyLevelDBStore {
def init() {
if(brokerService!=null){
try {
AnnotatedMBean.registerMBean(brokerService.getManagementContext, new ReplicatedLevelDBStoreView(this), objectName)
} catch {
case e: Throwable => {
warn(e, "PersistenceAdapterReplication could not be registered in JMX: " + e.getMessage)
}
}
}
// Figure out our position in the store.
directory.mkdirs()
val log = new RecordLog(directory, LevelDBClient.LOG_SUFFIX)
@ -217,11 +231,21 @@ class ElectingLevelDBStore extends ProxyLevelDBStore {
})
}
def objectName = {
var objectNameStr = brokerService.getBrokerObjectName.toString;
objectNameStr += "," + "Service=PersistenceAdapterReplication";
objectNameStr += "," + "InstanceName=" + JMXSupport.encodeObjectNamePart("LevelDB[" + directory.getAbsolutePath + "]");
new ObjectName(objectNameStr);
}
protected def doStart() = {
master_started_latch.await()
}
protected def doStop(stopper: ServiceStopper) {
if(brokerService!=null){
brokerService.getManagementContext().unregisterMBean(objectName);
}
zk_client.close()
zk_client = null
if( master_started.get() ) {
@ -327,3 +351,59 @@ class ElectingLevelDBStore extends ProxyLevelDBStore {
}
}
}
class ReplicatedLevelDBStoreView(val store:ElectingLevelDBStore) extends ReplicatedLevelDBStoreViewMBean {
import store._
def getZkAddress = zkAddress
def getZkPath = zkPath
def getZkSessionTmeout = zkSessionTmeout
def getBind = bind
def getReplicas = replicas
def getNodeRole:String = {
if( slave!=null ) {
return "slave"
}
if( master!=null ) {
return "master"
}
"electing"
}
def getStatus:String = {
if( slave!=null ) {
return slave.status
}
if( master!=null ) {
return master.status
}
""
}
def getPosition:java.lang.Long = {
if( slave!=null ) {
return new java.lang.Long(slave.wal_append_position)
}
if( master!=null ) {
return new java.lang.Long(master.wal_append_position)
}
null
}
def getAsyncBufferSize = asyncBufferSize
def getDirectory = directory.getCanonicalPath
def getIndexBlockRestartInterval = indexBlockRestartInterval
def getIndexBlockSize = indexBlockSize
def getIndexCacheSize = indexCacheSize
def getIndexCompression = indexCompression
def getIndexFactory = indexFactory
def getIndexMaxOpenFiles = indexMaxOpenFiles
def getIndexWriteBufferSize = indexWriteBufferSize
def getLogSize = logSize
def getParanoidChecks = paranoidChecks
def getSync = sync
def getVerifyChecksums = verifyChecksums
}

View File

@ -80,6 +80,8 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
val slaves = new ConcurrentHashMap[String,SlaveState]()
def status = slaves.values().map(_.status).mkString(", ")
override def doStart = {
unstash(directory)
super.doStart

View File

@ -51,6 +51,8 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
var wal_session:Session = _
var transfer_session:Session = _
var status = "initialized"
override def doStart() = {
client.init()
if (purgeOnStatup) {
@ -90,14 +92,16 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
transport.setDispatchQueue(queue)
transport.connecting(new URI(connect), null)
info("Connecting to master: "+connect)
status = "Connecting to master: "+connect
info(status)
wal_session = new Session(transport, (session)=>{
// lets stash away our current state so that we can unstash it
// in case we don't get caught up.. If the master dies,
// the stashed data might be the best option to become the master.
stash(directory)
delete_store(directory)
debug("Connected to master. Syncing")
status = "Connected to master. Syncing"
debug(status)
session.request_then(SYNC_ACTION, null) { body =>
val response = JsonCodec.decode(body, classOf[SyncResponse])
transfer_missing(response)
@ -266,9 +270,22 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
transport.setDispatchQueue(queue)
transport.connecting(new URI(connect), null)
info("Connecting catchup session...")
status = "Connecting catchup session."
info(status)
transfer_session = new Session(transport, (session)=> {
info("Catchup session connected...")
var total_files = 0
var total_size = 0L
var downloaded_size = 0L
var downloaded_files = 0
def update_download_status = {
status = "Slave catching up. Downloaded %.2f/%.2f kb and %d/%d files".format(downloaded_size/1024f, total_size/1024f, downloaded_files, total_files)
info(status)
}
status = "Catchup session connected..."
info(status)
// Transfer the log files..
var append_offset = 0L
@ -322,11 +339,15 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
transfer.offset = 0
transfer.length = x.length
debug("Slave requested: "+transfer.file)
total_size += x.length
total_files += 1
session.request_then(GET_ACTION, transfer) { body =>
val buffer = map(target_file, 0, x.length, false)
session.codec.readData(buffer, ^{
unmap(buffer)
info("Slave downloaded: "+transfer.file+" ("+x.length+" bytes)")
downloaded_size += x.length
downloaded_files += 1
update_download_status
})
}
}
@ -342,18 +363,23 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
transfer.offset = 0
transfer.length = x.length
info("Slave requested: "+transfer.file)
total_size += x.length
total_files += 1
session.request_then(GET_ACTION, transfer) { body =>
val buffer = map(dirty_index / x.file, 0, x.length, false)
session.codec.readData(buffer, ^{
unmap(buffer)
info("Slave downloaded: "+transfer.file+" ("+x.length+" bytes)")
downloaded_size += x.length
downloaded_files += 1
update_download_status
})
}
}
session.request_then(DISCONNECT_ACTION, null) { body =>
// Ok we are now caught up.
info("Slave has now caught up")
status = "Synchronize"
info(status)
stash_clear(directory) // we don't need the stash anymore.
transport.stop(NOOP)
transfer_session = null