Added an new ElectingLevelDBStore which handles the M/S election bits using ZooKeeper.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1477387 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2013-04-29 22:33:26 +00:00
parent ca814802ca
commit 1eeae77339
13 changed files with 1012 additions and 154 deletions

View File

@ -124,6 +124,18 @@
<artifactId>fabric-zookeeper</artifactId>
<version>7.2.0.redhat-024</version>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.core</artifactId>
<version>4.3.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.compendium</artifactId>
<version>4.3.1</version>
<scope>provided</scope>
</dependency>
<!-- For Optional Snappy Compression -->
<dependency>

View File

@ -0,0 +1,44 @@
package org.apache.activemq.leveldb.replicated
import scala.reflect.BeanProperty
import java.util.UUID
import org.apache.activemq.leveldb.LevelDBStore
import org.apache.activemq.leveldb.util.FileSupport._
/**
*/
trait ReplicatedLevelDBStoreTrait extends LevelDBStore {
@BeanProperty
var securityToken = ""
def replicaId:String = {
val replicaid_file = directory / "replicaid.txt"
if( replicaid_file.exists() ) {
replicaid_file.readText()
} else {
val rc = create_uuid
replicaid_file.getParentFile.mkdirs()
replicaid_file.writeText(rc)
rc
}
}
def create_uuid = UUID.randomUUID().toString
def storeId:String = {
val storeid_file = directory / "storeid.txt"
if( storeid_file.exists() ) {
storeid_file.readText()
} else {
null
}
}
def storeId_=(value:String) {
val storeid_file = directory / "storeid.txt"
storeid_file.writeText(value)
}
}

View File

@ -91,25 +91,39 @@ object UowCompleted extends UowState {
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
case class CountDownFuture(completed:CountDownLatch=new CountDownLatch(1)) extends java.util.concurrent.Future[Object] {
def countDown = completed.countDown()
class CountDownFuture[T <: AnyRef]() extends java.util.concurrent.Future[T] {
private val latch:CountDownLatch=new CountDownLatch(1)
@volatile
var value:T = _
def cancel(mayInterruptIfRunning: Boolean) = false
def isCancelled = false
def completed = latch.getCount()==0
def await() = latch.await()
def await(p1: Long, p2: TimeUnit) = latch.await(p1, p2)
def set(v:T) = {
value = v
latch.countDown()
}
def get() = {
completed.await()
null
latch.await()
value
}
def get(p1: Long, p2: TimeUnit) = {
if(completed.await(p1, p2)) {
null
if(latch.await(p1, p2)) {
value
} else {
throw new TimeoutException
}
}
def isDone = completed.await(0, TimeUnit.SECONDS);
def isDone = latch.await(0, TimeUnit.SECONDS);
}
object UowManagerConstants {
@ -125,7 +139,7 @@ object UowManagerConstants {
import UowManagerConstants._
class DelayableUOW(val manager:DBManager) extends BaseRetained {
val countDownFuture = CountDownFuture()
val countDownFuture = new CountDownFuture[AnyRef]()
var canceled = false;
val uowId:Int = manager.lastUowId.incrementAndGet()
@ -310,7 +324,7 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained {
val s = size
if( manager.asyncCapacityRemaining.addAndGet(-s) > 0 ) {
asyncCapacityUsed = s
countDownFuture.countDown
countDownFuture.set(null)
manager.parent.blocking_executor.execute(^{
complete_listeners.foreach(_())
})
@ -332,7 +346,7 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained {
asyncCapacityUsed = 0
} else {
manager.uow_complete_latency.add(System.nanoTime() - disposed_at)
countDownFuture.countDown
countDownFuture.set(null)
manager.parent.blocking_executor.execute(^{
complete_listeners.foreach(_())
})

View File

@ -748,7 +748,9 @@ class LevelDBClient(store: LevelDBStore) {
loadMap(LOG_REF_INDEX_KEY, logRefs)
loadMap(COLLECTION_META_KEY, collectionMeta)
}
var wal_append_position = 0L
def stop() = {
if( writeExecutor!=null ) {
writeExecutor.shutdown
@ -765,6 +767,7 @@ class LevelDBClient(store: LevelDBStore) {
if (log.isOpen) {
log.close
copyDirtyIndexToSnapshot
wal_append_position = log.appender_limit
}
if( plist!=null ) {
plist.close

View File

@ -17,41 +17,24 @@
package org.apache.activemq.leveldb
import org.apache.activemq.broker.{LockableServiceSupport, BrokerService, BrokerServiceAware, ConnectionContext}
import org.apache.activemq.broker.{LockableServiceSupport, BrokerServiceAware, ConnectionContext}
import org.apache.activemq.command._
import org.apache.activemq.openwire.OpenWireFormat
import org.apache.activemq.usage.SystemUsage
import java.io.File
import java.io.IOException
import java.util.concurrent._
import java.util.concurrent.atomic.{AtomicLong, AtomicInteger}
import java.util.concurrent.atomic.AtomicLong
import reflect.BeanProperty
import org.apache.activemq.store._
import java.util._
import collection.mutable.ListBuffer
import javax.management.ObjectName
import org.apache.activemq.broker.jmx.{BrokerMBeanSupport, AnnotatedMBean}
import org.apache.activemq.util._
import org.apache.activemq.leveldb.util.{RetrySupport, FileSupport, Log}
import org.apache.activemq.leveldb.util.{RetrySupport, Log}
import org.apache.activemq.store.PList.PListIterator
import java.lang
import org.fusesource.hawtbuf.{UTF8Buffer, DataByteArrayOutputStream, Buffer}
import scala.Some
import org.apache.activemq.leveldb.CountDownFuture
import org.apache.activemq.leveldb.XaAckRecord
import org.apache.activemq.leveldb.DurableSubscription
import scala.Some
import org.apache.activemq.leveldb.CountDownFuture
import org.apache.activemq.leveldb.XaAckRecord
import org.apache.activemq.leveldb.DurableSubscription
import scala.Some
import org.apache.activemq.leveldb.CountDownFuture
import org.apache.activemq.leveldb.XaAckRecord
import org.apache.activemq.leveldb.DurableSubscription
import scala.Some
import org.apache.activemq.leveldb.CountDownFuture
import org.apache.activemq.leveldb.XaAckRecord
import org.apache.activemq.leveldb.DurableSubscription
import org.fusesource.hawtbuf.{UTF8Buffer, DataByteArrayOutputStream}
object LevelDBStore extends Log {
val DEFAULT_DIRECTORY = new File("LevelDB");
@ -64,8 +47,8 @@ object LevelDBStore extends Log {
}
})
val DONE = new CountDownFuture();
DONE.countDown
val DONE = new CountDownFuture[AnyRef]();
DONE.set(null)
def toIOException(e: Throwable): IOException = {
if (e.isInstanceOf[ExecutionException]) {
@ -208,7 +191,6 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
var snappyCompressLogs = false
def doStart: Unit = {
import FileSupport._
snappyCompressLogs = logCompression.toLowerCase == "snappy" && Snappy != null
debug("starting")
@ -583,7 +565,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
lastSeq.set(db.getLastQueueEntrySeq(key))
def doAdd(uow: DelayableUOW, message: Message, delay:Boolean): CountDownFuture = {
def doAdd(uow: DelayableUOW, message: Message, delay:Boolean): CountDownFuture[AnyRef] = {
uow.enqueue(key, lastSeq.incrementAndGet, message, delay)
}
@ -606,7 +588,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
waitOn(asyncAddQueueMessage(context, message, delay))
}
def doRemove(uow: DelayableUOW, id: MessageId): CountDownFuture = {
def doRemove(uow: DelayableUOW, id: MessageId): CountDownFuture[AnyRef] = {
uow.dequeue(key, id)
}

View File

@ -0,0 +1,290 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.leveldb.replicated
import org.fusesource.fabric.groups._
import org.fusesource.fabric.zookeeper.internal.ZKClient
import org.linkedin.util.clock.Timespan
import scala.reflect.BeanProperty
import org.apache.activemq.util.{ServiceStopper, ServiceSupport}
import org.apache.activemq.leveldb.{LevelDBClient, RecordLog, LevelDBStore}
import java.net.{NetworkInterface, InetAddress}
import org.fusesource.hawtdispatch._
import org.apache.activemq.broker.Locker
import org.apache.activemq.store.PersistenceAdapter
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean
import org.apache.activemq.leveldb.util.Log
import java.io.File
object ElectingLevelDBStore extends Log {
def machine_hostname: String = {
import collection.JavaConversions._
// Get the host name of the first non loop-back interface..
for (interface <- NetworkInterface.getNetworkInterfaces; if (!interface.isLoopback); inet <- interface.getInetAddresses) {
var address = inet.getHostAddress
var name = inet.getCanonicalHostName
if( address!= name ) {
return name
}
}
// Or else just go the simple route.
return InetAddress.getLocalHost.getCanonicalHostName;
}
}
/**
*
*/
class ElectingLevelDBStore extends ProxyLevelDBStore {
import ElectingLevelDBStore._
def proxy_target = master
@BeanProperty
var zkAddress = "tcp://127.0.0.1:2888"
@BeanProperty
var zkPassword:String = _
@BeanProperty
var zkPath = "/default"
@BeanProperty
var zkSessionTmeout = "2s"
var brokerName: String = _
@BeanProperty
var hostname: String = _
@BeanProperty
var bind = "tcp://0.0.0.0:61619"
@BeanProperty
var minReplica = 1
@BeanProperty
var securityToken = ""
var directory = LevelDBStore.DEFAULT_DIRECTORY;
override def setDirectory(dir: File) {
directory = dir
}
override def getDirectory: File = {
return directory
}
@BeanProperty
var logSize: Long = 1024 * 1024 * 100
@BeanProperty
var indexFactory: String = "org.fusesource.leveldbjni.JniDBFactory, org.iq80.leveldb.impl.Iq80DBFactory"
@BeanProperty
var sync: Boolean = true
@BeanProperty
var verifyChecksums: Boolean = false
@BeanProperty
var indexMaxOpenFiles: Int = 1000
@BeanProperty
var indexBlockRestartInterval: Int = 16
@BeanProperty
var paranoidChecks: Boolean = false
@BeanProperty
var indexWriteBufferSize: Int = 1024 * 1024 * 6
@BeanProperty
var indexBlockSize: Int = 4 * 1024
@BeanProperty
var indexCompression: String = "snappy"
@BeanProperty
var logCompression: String = "none"
@BeanProperty
var indexCacheSize: Long = 1024 * 1024 * 256L
@BeanProperty
var flushDelay = 1000 * 5
@BeanProperty
var asyncBufferSize = 1024 * 1024 * 4
@BeanProperty
var monitorStats = false
def cluster_size_quorum = minReplica + 1
def cluster_size_max = (minReplica << 2) + 1
var master: MasterLevelDBStore = _
var slave: SlaveLevelDBStore = _
var zk_client: ZKClient = _
var zk_group: Group = _
var master_elector: MasterElector = _
var position: Long = -1L
def init() {
// Figure out our position in the store.
directory.mkdirs()
val log = new RecordLog(directory, LevelDBClient.LOG_SUFFIX)
log.logSize = logSize
log.open()
position = try {
log.current_appender.append_position
} finally {
log.close
}
zk_client = new ZKClient(zkAddress, Timespan.parse(zkSessionTmeout), null)
if( zkPassword!=null ) {
zk_client.setPassword(zkPassword)
}
zk_client.start
zk_client.waitForConnected(Timespan.parse("30s"))
val zk_group = ZooKeeperGroupFactory.create(zk_client, zkPath)
val master_elector = new MasterElector(this)
master_elector.start(zk_group)
master_elector.join
this.setUseLock(true)
this.setLocker(createDefaultLocker())
}
def createDefaultLocker(): Locker = new Locker {
def configure(persistenceAdapter: PersistenceAdapter) {}
def setFailIfLocked(failIfLocked: Boolean) {}
def setLockAcquireSleepInterval(lockAcquireSleepInterval: Long) {}
def setName(name: String) {}
def start() = {
master_started_latch.await()
}
def keepAlive(): Boolean = {
master_started.get()
}
def stop() {}
}
val master_started_latch = new CountDownLatch(1)
val master_started = new AtomicBoolean(false)
def start_master(func: (Int) => Unit) = {
assert(master==null)
master = create_master()
master.blocking_executor.execute(^{
master_started.set(true)
master.start();
master_started_latch.countDown()
func(master.getPort)
})
}
def isMaster = master_started.get() && !master_stopped.get()
val stopped_latch = new CountDownLatch(1)
val master_stopped = new AtomicBoolean(false)
def stop_master(func: => Unit) = {
assert(master!=null)
master.blocking_executor.execute(^{
master.stop();
master_stopped.set(true)
position = master.wal_append_position
stopped_latch.countDown()
func
})
}
protected def doStart() = {
master_started_latch.await()
}
protected def doStop(stopper: ServiceStopper) {
zk_client.close()
zk_client = null
if( master_started.get() ) {
stopped_latch.countDown()
}
}
def start_slave(address: String)(func: => Unit) = {
assert(master==null)
slave = create_slave()
slave.connect = address
slave.blocking_executor.execute(^{
slave.start();
func
})
}
def stop_slave(func: => Unit) = {
if( slave!=null ) {
val s = slave
slave = null
s.blocking_executor.execute(^{
s.stop();
position = s.wal_append_position
func
})
}
}
def create_slave() = {
val slave = new SlaveLevelDBStore();
configure(slave)
slave
}
def create_master() = {
val master = new MasterLevelDBStore
configure(master)
master.minReplica = minReplica
master.bind = bind
master
}
override def setBrokerName(brokerName: String): Unit = {
this.brokerName = brokerName
}
def configure(store: ReplicatedLevelDBStoreTrait) {
store.directory = directory
store.indexFactory = indexFactory
store.sync = sync
store.verifyChecksums = verifyChecksums
store.indexMaxOpenFiles = indexMaxOpenFiles
store.indexBlockRestartInterval = indexBlockRestartInterval
store.paranoidChecks = paranoidChecks
store.indexWriteBufferSize = indexWriteBufferSize
store.indexBlockSize = indexBlockSize
store.indexCompression = indexCompression
store.logCompression = logCompression
store.indexCacheSize = indexCacheSize
store.flushDelay = flushDelay
store.asyncBufferSize = asyncBufferSize
store.monitorStats = monitorStats
store.securityToken = securityToken
store.setBrokerName(brokerName)
store.setBrokerService(brokerService)
}
def address(port: Int) = {
if (hostname == null) {
hostname = machine_hostname
}
"tcp://" + hostname + ":" + port
}
}

View File

@ -0,0 +1,205 @@
package org.apache.activemq.leveldb.replicated
import org.fusesource.fabric.groups._
import org.codehaus.jackson.annotate.JsonProperty
import org.apache.activemq.leveldb.util.{Log, JsonCodec}
class LevelDBNodeState extends NodeState {
@JsonProperty
var id: String = _
@JsonProperty
var address: String = _
@JsonProperty
var position: Long = -1
@JsonProperty
var elected: String = _
override def equals(obj: Any): Boolean = {
obj match {
case x:LevelDBNodeState =>
x.id == id &&
x.address == address &&
x.position == position &&
x.elected == elected
case _ => false
}
}
override
def toString = JsonCodec.encode(this).ascii().toString
}
object MasterElector extends Log
/**
*/
class MasterElector(store: ElectingLevelDBStore) extends ClusteredSingleton[LevelDBNodeState](classOf[LevelDBNodeState]) {
import MasterElector._
var last_state: LevelDBNodeState = _
var elected: String = _
var position: Long = -1
var address: String = _
var updating_store = false
var next_connect: String = _
var connected_address: String = _
def join: Unit = this.synchronized {
last_state = create_state
join(last_state)
add(changle_listener)
}
def elector = this
def update: Unit = elector.synchronized {
var next = create_state
if (next != last_state) {
last_state = next
update(next)
}
}
def create_state = {
val rc = new LevelDBNodeState
rc.id = store.brokerName
rc.elected = elected
rc.position = position
rc.address = address
rc
}
object changle_listener extends ChangeListener {
def connected = changed
def disconnected = changed
def changed:Unit = elector.synchronized {
// info(eid+" cluster state changed: "+members)
if (isMaster) {
// We are the master elector, we will choose which node will startup the MasterLevelDBStore
members.get(store.brokerName) match {
case None =>
info("Not enough cluster members connected to elect a new master.")
case Some(members) =>
if (members.size < store.cluster_size_quorum) {
info("Not enough cluster members connected to elect a new master.")
} else {
// If we already elected a master, lets make sure he is still online..
if (elected != null) {
val by_eid = Map(members: _*)
if (by_eid.get(elected).isEmpty) {
info("Previously elected master is not online, staring new election")
elected = null
}
}
// Do we need to elect a new master?
if (elected == null) {
// Find the member with the most updates.
val sortedMembers = members.filter(_._2.position >= 0).sortWith {
(a, b) => a._2.position > b._2.position
}
if (sortedMembers.size != members.size) {
info("Not enough cluster members have reported their update positions yet.")
} else {
// We now have an election.
elected = sortedMembers.head._1
}
}
// Sort by the positions in the cluster..
}
}
} else {
// Only the master sets the elected field.
elected = null
}
val master_elected = master.map(_.elected).getOrElse(null)
// If no master is currently elected, we need to report our current store position.
// Since that will be used to select the master.
val connect_target = if (master_elected != null) {
position = -1
members.get(store.brokerName).get.find(_._1 == master_elected).map(_._2.address).getOrElse(null)
} else {
// Once we are not running a master or server, report the position..
if( connected_address==null && address==null && !updating_store ) {
position = store.position
}
null
}
// Do we need to stop the running master?
if (master_elected != eid && address != null && !updating_store) {
info("Demoted to slave")
updating_store = true
store.stop_master {
elector.synchronized {
info("Master stopped")
address = null
changed
}
}
}
// Have we been promoted to being the master?
if (master_elected == eid && address==null && !updating_store ) {
info("Promoted to master")
updating_store = true
store.start_master { port =>
elector.synchronized {
updating_store = false
address = store.address(port)
info("Master started: "+address)
changed
}
}
}
// Can we become a slave?
if (master_elected != eid && address == null) {
// Did the master address change?
if (connect_target != connected_address) {
// Do we need to setup a new slave.
if (connect_target != null && !updating_store) {
updating_store = true
store.start_slave(connect_target) {
elector.synchronized {
updating_store=false
info("Slave started")
connected_address = connect_target
changed
}
}
}
// Lets stop the slave..
if (connect_target == null && !updating_store) {
updating_store = true
store.stop_slave {
elector.synchronized {
updating_store=false
info("Slave stopped")
connected_address = null
changed
}
}
}
}
}
update
}
}
}

View File

@ -28,7 +28,6 @@ import java.io.{IOException, File}
import java.net.{InetSocketAddress, URI}
import java.util.concurrent.atomic.AtomicLong
import scala.reflect.BeanProperty
import java.util.UUID
class PositionSync(val position:Long, count:Int) extends CountDownLatch(count)
@ -36,7 +35,7 @@ object MasterLevelDBStore extends Log
/**
*/
class MasterLevelDBStore extends LevelDBStore {
class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
import MasterLevelDBStore._
import collection.JavaConversions._
@ -45,24 +44,10 @@ class MasterLevelDBStore extends LevelDBStore {
@BeanProperty
var bind = "tcp://0.0.0.0:61619"
@BeanProperty
var securityToken = ""
@BeanProperty
var minReplica = 1
val slaves = new ConcurrentHashMap[String,SlaveState]()
def replicaId:String = {
val replicaid_file = directory / "replicaid.txt"
if( replicaid_file.exists() ) {
replicaid_file.readText()
} else {
val rc = UUID.randomUUID().toString
replicaid_file.getParentFile.mkdirs()
replicaid_file.writeText(rc)
rc
}
}
override def doStart = {
super.doStart
start_protocol_server
@ -79,7 +64,6 @@ class MasterLevelDBStore extends LevelDBStore {
override def createClient = new MasterLevelDBClient(this)
def master_client = client.asInstanceOf[MasterLevelDBClient]
//////////////////////////////////////
// Replication Protocol Stuff
//////////////////////////////////////
@ -112,20 +96,6 @@ class MasterLevelDBStore extends LevelDBStore {
transport_server.stop(NOOP)
}
case class HawtCallback[T](cb:(T)=>Unit) extends Function1[T, Unit] {
val queue = getCurrentQueue
def apply(value:T) = {
if( queue==null || queue.isExecuting ) {
cb(value)
} else {
queue {
cb(value)
}
}
}
}
class Session(transport: Transport) extends TransportHandler(transport) {
var login:Login = _
@ -347,4 +317,6 @@ class MasterLevelDBStore extends LevelDBStore {
}
}
def wal_append_position = client.wal_append_position
}

View File

@ -0,0 +1,114 @@
package org.apache.activemq.leveldb.replicated
import org.apache.activemq.broker.{LockableServiceSupport, BrokerService, BrokerServiceAware, ConnectionContext}
import org.apache.activemq.command._
import org.apache.activemq.leveldb.LevelDBStore
import org.apache.activemq.store._
import org.apache.activemq.usage.SystemUsage
import java.io.File
import java.io.IOException
import java.util.Set
import org.apache.activemq.util.{ServiceStopper, ServiceSupport}
/**
*/
abstract class ProxyLevelDBStore extends LockableServiceSupport with BrokerServiceAware with PersistenceAdapter with TransactionStore with PListStore {
def proxy_target: LevelDBStore
def beginTransaction(context: ConnectionContext) {
proxy_target.beginTransaction(context)
}
def getLastProducerSequenceId(id: ProducerId): Long = {
return proxy_target.getLastProducerSequenceId(id)
}
def createTopicMessageStore(destination: ActiveMQTopic): TopicMessageStore = {
return proxy_target.createTopicMessageStore(destination)
}
def setDirectory(dir: File) {
proxy_target.setDirectory(dir)
}
def checkpoint(sync: Boolean) {
proxy_target.checkpoint(sync)
}
def createTransactionStore: TransactionStore = {
return proxy_target.createTransactionStore
}
def setUsageManager(usageManager: SystemUsage) {
proxy_target.setUsageManager(usageManager)
}
def commitTransaction(context: ConnectionContext) {
proxy_target.commitTransaction(context)
}
def getLastMessageBrokerSequenceId: Long = {
return proxy_target.getLastMessageBrokerSequenceId
}
def setBrokerName(brokerName: String) {
proxy_target.setBrokerName(brokerName)
}
def rollbackTransaction(context: ConnectionContext) {
proxy_target.rollbackTransaction(context)
}
def removeTopicMessageStore(destination: ActiveMQTopic) {
proxy_target.removeTopicMessageStore(destination)
}
def getDirectory: File = {
return proxy_target.getDirectory
}
def size: Long = {
return proxy_target.size
}
def removeQueueMessageStore(destination: ActiveMQQueue) {
proxy_target.removeQueueMessageStore(destination)
}
def createQueueMessageStore(destination: ActiveMQQueue): MessageStore = {
return proxy_target.createQueueMessageStore(destination)
}
def deleteAllMessages {
proxy_target.deleteAllMessages
}
def getDestinations: Set[ActiveMQDestination] = {
return proxy_target.getDestinations
}
def rollback(txid: TransactionId) {
proxy_target.rollback(txid)
}
def recover(listener: TransactionRecoveryListener) {
proxy_target.recover(listener)
}
def prepare(txid: TransactionId) {
proxy_target.prepare(txid)
}
def commit(txid: TransactionId, wasPrepared: Boolean, preCommit: Runnable, postCommit: Runnable) {
proxy_target.commit(txid, wasPrepared, preCommit, postCommit)
}
def getPList(name: String): PList = {
return proxy_target.getPList(name)
}
def removePList(name: String): Boolean = {
return proxy_target.removePList(name)
}
}

View File

@ -16,7 +16,7 @@
*/
package org.apache.activemq.leveldb.replicated
import org.apache.activemq.leveldb.{LevelDBClient, LevelDBStore}
import org.apache.activemq.leveldb.LevelDBStore
import org.apache.activemq.util.ServiceStopper
import java.util
import org.fusesource.hawtdispatch._
@ -29,13 +29,12 @@ import org.apache.activemq.leveldb.util._
import FileSupport._
import java.io.{IOException, RandomAccessFile, File}
import scala.reflect.BeanProperty
import java.util.UUID
object SlaveLevelDBStore extends Log
/**
*/
class SlaveLevelDBStore extends LevelDBStore {
class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
import SlaveLevelDBStore._
import ReplicationSupport._
@ -43,34 +42,14 @@ class SlaveLevelDBStore extends LevelDBStore {
@BeanProperty
var connect = "tcp://0.0.0.0:61619"
@BeanProperty
var securityToken = ""
val queue = createQueue("leveldb replication slave")
var replay_from = 0L
var caughtUp = false
override def createClient = new SlaveLevelDBClient(this)
def slave_client = client.asInstanceOf[SlaveLevelDBClient]
class SlaveLevelDBClient(val store:SlaveLevelDBStore) extends LevelDBClient(store) {
}
var wal_session:Session = _
var transfer_session:Session = _
def replicaId:String = {
val replicaid_file = directory / "replicaid.txt"
if( replicaid_file.exists() ) {
replicaid_file.readText()
} else {
val rc = UUID.randomUUID().toString
replicaid_file.getParentFile.mkdirs()
replicaid_file.writeText(rc)
rc
}
}
override def doStart() = {
client.init()

View File

@ -0,0 +1,226 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.leveldb.test;
import junit.framework.TestCase;
import org.apache.activemq.Service;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.leveldb.CountDownFuture;
import org.apache.activemq.leveldb.LevelDBStore;
import org.apache.activemq.leveldb.replicated.ElectingLevelDBStore;
import org.apache.activemq.leveldb.util.FileSupport;
import org.apache.activemq.store.MessageStore;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import static org.apache.activemq.leveldb.test.ReplicationTestSupport.*;
/**
*/
public class ElectingLevelDBStoreTest extends TestCase {
protected static final Logger LOG = LoggerFactory.getLogger(ElectingLevelDBStoreTest.class);
NIOServerCnxnFactory connector;
static File data_dir() {
return new File("target/activemq-data/leveldb-elections");
}
@Override
protected void setUp() throws Exception {
FileSupport.toRichFile(data_dir()).recursiveDelete();
System.out.println("Starting ZooKeeper");
ZooKeeperServer zk_server = new ZooKeeperServer();
zk_server.setTickTime(500);
zk_server.setTxnLogFactory(new FileTxnSnapLog(new File(data_dir(), "zk-log"), new File(data_dir(), "zk-data")));
connector = new NIOServerCnxnFactory();
connector.configure(new InetSocketAddress(0), 100);
connector.startup(zk_server);
System.out.println("ZooKeeper Started");
}
@Override
protected void tearDown() throws Exception {
if( connector!=null ) {
connector.shutdown();
connector = null;
}
}
public void testElection() throws Exception {
ArrayList<ElectingLevelDBStore> stores = new ArrayList<ElectingLevelDBStore>();
ArrayList<CountDownFuture> pending_starts = new ArrayList<CountDownFuture>();
for(String dir: new String[]{"leveldb-node1", "leveldb-node2", "leveldb-node3"}) {
ElectingLevelDBStore store = createStoreNode();
store.setDirectory(new File(data_dir(), dir));
stores.add(store);
pending_starts.add(asyncStart(store));
}
// At least one of the stores should have started.
CountDownFuture f = waitFor(30 * 1000, pending_starts.toArray(new CountDownFuture[pending_starts.size()]));
assertTrue(f!=null);
pending_starts.remove(f);
// The other stores should not start..
LOG.info("Making sure the other stores don't start");
Thread.sleep(5000);
for(CountDownFuture start: pending_starts) {
assertFalse(start.completed());
}
// Make sure only of the stores is reporting to be the master.
ElectingLevelDBStore master = null;
for(ElectingLevelDBStore store: stores) {
if( store.isMaster() ) {
assertNull(master);
master = store;
}
}
assertNotNull(master);
// We can work out who the slaves are...
HashSet<ElectingLevelDBStore> slaves = new HashSet<ElectingLevelDBStore>(stores);
slaves.remove(master);
// Start sending messages to the master.
ArrayList<String> expected_list = new ArrayList<String>();
MessageStore ms = master.createQueueMessageStore(new ActiveMQQueue("TEST"));
final int TOTAL = 500;
for (int i = 0; i < TOTAL; i++) {
if (i % ((int) (TOTAL * 0.10)) == 0) {
LOG.info("" + (100 * i / TOTAL) + "% done");
}
if( i == 250 ) {
LOG.info("Checking master state");
assertEquals(expected_list, getMessages(ms));
// mid way, lets kill the master..
LOG.info("Killing Master.");
master.stop();
// At least one of the remaining stores should complete starting.
LOG.info("Waiting for slave takeover...");
f = waitFor(60 * 1000, pending_starts.toArray(new CountDownFuture[pending_starts.size()]));
assertTrue(f!=null);
pending_starts.remove(f);
// Make sure one and only one of the slaves becomes the master..
master = null;
for(ElectingLevelDBStore store: slaves) {
if( store.isMaster() ) {
assertNull(master);
master = store;
}
}
assertNotNull(master);
slaves.remove(master);
ms = master.createQueueMessageStore(new ActiveMQQueue("TEST"));
}
String msgid = "m:" + i;
addMessage(ms, msgid);
expected_list.add(msgid);
}
LOG.info("Checking master state");
assertEquals(expected_list, getMessages(ms));
master.stop();
for(ElectingLevelDBStore store: stores) {
store.stop();
}
}
private CountDownFuture waitFor(int timeout, CountDownFuture... futures) throws InterruptedException {
long deadline = System.currentTimeMillis()+timeout;
while( true ) {
for (CountDownFuture f:futures) {
if( f.await(1, TimeUnit.MILLISECONDS) ) {
return f;
}
}
long remaining = deadline - System.currentTimeMillis();
if( remaining < 0 ) {
return null;
} else {
Thread.sleep(Math.min(remaining / 10, 100L));
}
}
}
private CountDownFuture asyncStart(final Service service) {
final CountDownFuture<Throwable> f = new CountDownFuture<Throwable>();
LevelDBStore.BLOCKING_EXECUTOR().execute(new Runnable() {
public void run() {
try {
service.start();
f.set(null);
} catch (Throwable e) {
e.printStackTrace();
f.set(e);
}
}
});
return f;
}
private CountDownFuture asyncStop(final Service service) {
final CountDownFuture<Throwable> f = new CountDownFuture<Throwable>();
LevelDBStore.BLOCKING_EXECUTOR().execute(new Runnable() {
public void run() {
try {
service.stop();
f.set(null);
} catch (Throwable e) {
e.printStackTrace();
f.set(e);
}
}
});
return f;
}
private ElectingLevelDBStore createStoreNode() {
ElectingLevelDBStore store = new ElectingLevelDBStore();
store.setSecurityToken("foo");
store.setLogSize(1023 * 200);
store.setMinReplica(1);
store.setZkAddress("localhost:" + connector.getLocalPort());
store.setZkPath("/broker-stores");
store.setBrokerName("foo");
store.setBind("tcp://0.0.0.0:0");
return store;
}
}

View File

@ -37,8 +37,8 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.apache.activemq.leveldb.test.ReplicationTestSupport.*;
/**
*/
@ -61,29 +61,29 @@ public class ReplicatedLevelDBStoreTest extends TestCase {
// Updating the store should not complete since we don't have enough
// replicas.
CountDownFuture f = asyncAddMessage(ms, "m1");
assertFalse(f.completed().await(2, TimeUnit.SECONDS));
assertFalse(f.await(2, TimeUnit.SECONDS));
// Adding a slave should allow that update to complete.
SlaveLevelDBStore slave = createSlave(master, slaveDir);
slave.start();
assertTrue(f.completed().await(2, TimeUnit.SECONDS));
assertTrue(f.await(2, TimeUnit.SECONDS));
// New updates should complete quickly now..
f = asyncAddMessage(ms, "m2");
assertTrue(f.completed().await(1, TimeUnit.SECONDS));
assertTrue(f.await(1, TimeUnit.SECONDS));
// If the slave goes offline, then updates should once again
// not complete.
slave.stop();
f = asyncAddMessage(ms, "m3");
assertFalse(f.completed().await(2, TimeUnit.SECONDS));
assertFalse(f.await(2, TimeUnit.SECONDS));
// Restart and the op should complete.
slave = createSlave(master, slaveDir);
slave.start();
assertTrue(f.completed().await(2, TimeUnit.SECONDS));
assertTrue(f.await(2, TimeUnit.SECONDS));
master.stop();
slave.stop();
@ -91,15 +91,14 @@ public class ReplicatedLevelDBStoreTest extends TestCase {
}
private CountDownFuture asyncAddMessage(final MessageStore ms, final String body) {
final CountDownFuture f = new CountDownFuture(new CountDownLatch(1));
final CountDownFuture<Throwable> f = new CountDownFuture<Throwable>();
LevelDBStore.BLOCKING_EXECUTOR().execute(new Runnable() {
public void run() {
try {
addMessage(ms, body);
} catch (Exception e) {
e.printStackTrace();
} finally {
f.countDown();
f.set(null);
} catch (Throwable e) {
f.set(e);
}
}
});
@ -114,13 +113,13 @@ public class ReplicatedLevelDBStoreTest extends TestCase {
directories.add(new File("target/activemq-data/leveldb-node2"));
directories.add(new File("target/activemq-data/leveldb-node3"));
for( File f: directories) {
for (File f : directories) {
FileSupport.toRichFile(f).recursiveDelete();
}
ArrayList<String> expected_list = new ArrayList<String>();
// We will rotate between 3 nodes the task of being the master.
for( int j=0; j < 10; j++) {
for (int j = 0; j < 10; j++) {
MasterLevelDBStore master = createMaster(directories.get(0));
master.start();
@ -132,11 +131,11 @@ public class ReplicatedLevelDBStoreTest extends TestCase {
MessageStore ms = master.createQueueMessageStore(new ActiveMQQueue("TEST"));
final int TOTAL = 500;
for (int i = 0; i < TOTAL; i++) {
if ( i % ((int) (TOTAL * 0.10)) == 0) {
LOG.info("" + (100*i/TOTAL) + "% done");
if (i % ((int) (TOTAL * 0.10)) == 0) {
LOG.info("" + (100 * i / TOTAL) + "% done");
}
if( i == 250 ) {
if (i == 250) {
slave1.start();
slave2.stop();
}
@ -149,9 +148,9 @@ public class ReplicatedLevelDBStoreTest extends TestCase {
LOG.info("Checking master state");
assertEquals(expected_list, getMessages(ms));
LOG.info("Stopping master: "+master.replicaId());
LOG.info("Stopping master: " + master.replicaId());
master.stop();
LOG.info("Stopping slave: "+slave1.replicaId());
LOG.info("Stopping slave: " + slave1.replicaId());
slave1.stop();
// Rotate the dir order so that slave1 becomes the master next.
@ -164,7 +163,7 @@ public class ReplicatedLevelDBStoreTest extends TestCase {
slave1.setDirectory(directory);
slave1.setConnect("tcp://127.0.0.1:" + master.getPort());
slave1.setSecurityToken("foo");
slave1.setLogSize(1023*200);
slave1.setLogSize(1023 * 200);
return slave1;
}
@ -178,49 +177,5 @@ public class ReplicatedLevelDBStoreTest extends TestCase {
return master;
}
long id_counter = 0L;
String payload = "";
{
for (int i = 0; i < 1024; i++) {
payload += "x";
}
}
public ActiveMQTextMessage addMessage(MessageStore ms, String body) throws JMSException, IOException {
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setPersistent(true);
message.setResponseRequired(true);
message.setStringProperty("id", body);
message.setText(payload);
id_counter += 1;
MessageId messageId = new MessageId("ID:localhost-56913-1254499826208-0:0:1:1:" + id_counter);
messageId.setBrokerSequenceId(id_counter);
message.setMessageId(messageId);
ms.addMessage(new ConnectionContext(), message);
return message;
}
public ArrayList<String> getMessages(MessageStore ms) throws Exception {
final ArrayList<String> rc = new ArrayList<String>();
ms.recover(new MessageRecoveryListener() {
public boolean recoverMessage(Message message) throws Exception {
rc.add(((ActiveMQTextMessage) message).getStringProperty("id"));
return true;
}
public boolean hasSpace() {
return true;
}
public boolean recoverMessageReference(MessageId ref) throws Exception {
return true;
}
public boolean isDuplicate(MessageId ref) {
return false;
}
});
return rc;
}
}

View File

@ -0,0 +1,62 @@
package org.apache.activemq.leveldb.test;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import javax.jms.JMSException;
import java.io.IOException;
import java.util.ArrayList;
/**
*/
public class ReplicationTestSupport {
static long id_counter = 0L;
static String payload = "";
{
for (int i = 0; i < 1024; i++) {
payload += "x";
}
}
static public ActiveMQTextMessage addMessage(MessageStore ms, String body) throws JMSException, IOException {
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setPersistent(true);
message.setResponseRequired(true);
message.setStringProperty("id", body);
message.setText(payload);
id_counter += 1;
MessageId messageId = new MessageId("ID:localhost-56913-1254499826208-0:0:1:1:" + id_counter);
messageId.setBrokerSequenceId(id_counter);
message.setMessageId(messageId);
ms.addMessage(new ConnectionContext(), message);
return message;
}
static public ArrayList<String> getMessages(MessageStore ms) throws Exception {
final ArrayList<String> rc = new ArrayList<String>();
ms.recover(new MessageRecoveryListener() {
public boolean recoverMessage(Message message) throws Exception {
rc.add(((ActiveMQTextMessage) message).getStringProperty("id"));
return true;
}
public boolean hasSpace() {
return true;
}
public boolean recoverMessageReference(MessageId ref) throws Exception {
return true;
}
public boolean isDuplicate(MessageId ref) {
return false;
}
});
return rc;
}
}