leveldb replication Master was failing to give up being master after it's process is suspended by using ctrl-z.

This commit is contained in:
Hiram Chirino 2013-09-03 10:38:30 -04:00
parent 8d4fef8af2
commit 1eca031356
6 changed files with 66 additions and 14 deletions

View File

@ -511,12 +511,20 @@ class LevelDBClient(store: LevelDBStore) {
def might_fail[T](func : =>T):T = {
def handleFailure(e:IOException) = {
var failure:Throwable = e;
if( store.broker_service !=null ) {
// This should start stopping the broker but it might block,
// so do it on another thread...
new Thread("LevelDB IOException handler.") {
override def run() {
store.broker_service.handleIOException(e);
try {
store.broker_service.handleIOException(e)
} catch {
case e:RuntimeException =>
failure = e
} finally {
store.stop()
}
}
}.start()
// Lets wait until the broker service has started stopping. Once the
@ -526,8 +534,7 @@ class LevelDBClient(store: LevelDBStore) {
Thread.sleep(100);
}
}
store.stop()
throw e;
throw failure;
}
try {
func

View File

@ -85,7 +85,9 @@ class MasterElector(store: ElectingLevelDBStore) extends ClusteredSingleton[Leve
object change_listener extends ChangeListener {
def connected = changed
def disconnected = changed
def disconnected = {
changed
}
def changed:Unit = elector.synchronized {
// info(eid+" cluster state changed: "+members)

View File

@ -357,7 +357,7 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
return
}
if( isStopped ) {
if( isStoppedOrStopping ) {
throw new IllegalStateException("Store replication stopped")
}
@ -368,13 +368,23 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
}
while( !position_sync.await(1, TimeUnit.SECONDS) ) {
if( isStopped ) {
if( isStoppedOrStopping ) {
throw new IllegalStateException("Store replication stopped")
}
warn("Store update waiting on %d replica(s) to catch up to log position %d. %s", minSlaveAcks, position, status)
}
}
def isStoppedOrStopping: Boolean = {
if( isStopped || isStopping )
return true
if( broker_service!=null && broker_service.isStopping )
return true
false
}
def replicate_wal(file:File, position:Long, offset:Long, length:Long):Unit = {
if( length > 0 ) {
val value = new LogWrite

View File

@ -27,6 +27,7 @@ import java.util.LinkedHashMap
import java.lang.{IllegalStateException, String}
import reflect.BeanProperty
import org.codehaus.jackson.annotate.JsonProperty
import org.apache.zookeeper.KeeperException.NoNodeException
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@ -108,16 +109,20 @@ class ClusteredSingletonWatcher[T <: NodeState](val stateClass:Class[T]) extends
}
def connected = {
onConnected
changed
ClusteredSingletonWatcher.this.fireConnected
}
def disconnected = {
onDisconnected
changed
ClusteredSingletonWatcher.this.fireDisconnected
}
}
protected def onConnected = {}
protected def onDisconnected = {}
def start(group:Group) = this.synchronized {
if(_group !=null )
@ -223,8 +228,26 @@ class ClusteredSingleton[T <: NodeState ](stateClass:Class[T]) extends Clustered
if(_group==null)
throw new IllegalStateException("Not started.")
this._state = state
_group.update(_eid, encode(state, mapper))
try {
_group.update(_eid, encode(state, mapper))
} catch {
case e:NoNodeException =>
this._state = null.asInstanceOf[T]
join(state)
}
}
override protected def onDisconnected {
this._eid = null
}
override protected def onConnected {
if( this.eid==null && this._state!=null ) {
this._state = null.asInstanceOf[T]
join(this._state)
}
}
def isMaster:Boolean = this.synchronized {

View File

@ -51,7 +51,7 @@ trait ChangeListenerSupport {
}
def fireConnected() = {
val listener = this.synchronized { this.listeners }
val listeners = this.synchronized { this.listeners }
check_elapsed_time {
for (listener <- listeners) {
listener.connected
@ -60,7 +60,7 @@ trait ChangeListenerSupport {
}
def fireDisconnected() = {
val listener = this.synchronized { this.listeners }
val listeners = this.synchronized { this.listeners }
check_elapsed_time {
for (listener <- listeners) {
listener.disconnected
@ -69,7 +69,7 @@ trait ChangeListenerSupport {
}
def fireChanged() = {
val listener = this.synchronized { this.listeners }
val listeners = this.synchronized { this.listeners }
val start = System.nanoTime()
check_elapsed_time {
for (listener <- listeners) {

View File

@ -91,7 +91,10 @@ class ZooKeeperGroup(val zk: ZKClient, val root: String) extends Group with Life
def connected = zk.isConnected
def onConnected() = fireConnected()
def onDisconnected() = fireDisconnected()
def onDisconnected() = {
this.members = new LinkedHashMap()
fireDisconnected()
}
def join(data:Array[Byte]=null): String = this.synchronized {
val id = zk.createWithParents(member_path_prefix, data, CreateMode.EPHEMERAL_SEQUENTIAL).stripPrefix(member_path_prefix)
@ -102,9 +105,16 @@ class ZooKeeperGroup(val zk: ZKClient, val root: String) extends Group with Life
def update(path:String, data:Array[Byte]=null): Unit = this.synchronized {
joins.get(path) match {
case Some(ver) =>
val stat = zk.setData(member_path_prefix+path, data, ver)
joins.put(path, stat.getVersion)
case None => throw new IllegalArgumentException("Has not joined locally: "+path)
try {
val stat = zk.setData(member_path_prefix + path, data, ver)
joins.put(path, stat.getVersion)
}
catch {
case e:NoNodeException =>
joins.remove(path)
throw e;
}
case None => throw new NoNodeException("Has not joined locally: "+path)
}
}