Fixing bug which caused replicated leveldb nodes to not recover from a ZooKeeper failure.

This commit is contained in:
Hiram Chirino 2013-10-09 10:56:20 -04:00
parent 9ee65d3218
commit 5e63ddd337
7 changed files with 160 additions and 164 deletions

View File

@ -18,8 +18,8 @@ package org.apache.activemq.leveldb.replicated
import org.linkedin.util.clock.Timespan
import scala.reflect.BeanProperty
import org.apache.activemq.util.{JMXSupport, ServiceStopper, ServiceSupport}
import org.apache.activemq.leveldb.{LevelDBStoreViewMBean, LevelDBClient, RecordLog, LevelDBStore}
import org.apache.activemq.util.ServiceStopper
import org.apache.activemq.leveldb.{LevelDBClient, RecordLog, LevelDBStore}
import java.net.{NetworkInterface, InetAddress}
import org.fusesource.hawtdispatch._
import org.apache.activemq.broker.{LockableServiceSupport, Locker}
@ -30,11 +30,9 @@ import org.apache.activemq.leveldb.util.Log
import java.io.File
import org.apache.activemq.usage.SystemUsage
import org.apache.activemq.ActiveMQMessageAuditNoSync
import org.fusesource.hawtdispatch
import org.apache.activemq.broker.jmx.{OpenTypeSupport, BrokerMBeanSupport, AnnotatedMBean}
import org.apache.activemq.leveldb.LevelDBStore._
import javax.management.ObjectName
import javax.management.openmbean.{CompositeDataSupport, SimpleType, CompositeType, CompositeData}
import javax.management.openmbean.{CompositeDataSupport, SimpleType, CompositeData}
import java.util
import org.apache.activemq.leveldb.replicated.groups._
@ -138,7 +136,7 @@ class ElectingLevelDBStore extends ProxyLevelDBStore {
var slave: SlaveLevelDBStore = _
var zk_client: ZKClient = _
var zk_group: Group = _
var zk_group: ZooKeeperGroup = _
var position: Long = -1L
@ -270,6 +268,22 @@ class ElectingLevelDBStore extends ProxyLevelDBStore {
zk_group.close
zk_client.close()
zk_client = null
if( master!=null ) {
val latch = new CountDownLatch(1)
stop_master {
latch.countDown()
}
latch.await()
}
if( slave !=null ) {
val latch = new CountDownLatch(1)
stop_slave {
latch.countDown()
}
latch.await()
}
if( master_started.get() ) {
stopped_latch.countDown()
}

View File

@ -3,6 +3,7 @@ package org.apache.activemq.leveldb.replicated
import org.apache.activemq.leveldb.replicated.groups._
import org.codehaus.jackson.annotate.JsonProperty
import org.apache.activemq.leveldb.util.{Log, JsonCodec}
import java.io.IOException
class LevelDBNodeState extends NodeState {
@ -67,7 +68,7 @@ class MasterElector(store: ElectingLevelDBStore) extends ClusteredSingleton[Leve
var next = create_state
if (next != last_state) {
last_state = next
update(next)
join(next)
}
}
@ -89,6 +90,7 @@ class MasterElector(store: ElectingLevelDBStore) extends ClusteredSingleton[Leve
changed
}
var stopped = false;
def changed:Unit = elector.synchronized {
debug("ZooKeeper group changed: %s", members)
@ -139,7 +141,7 @@ class MasterElector(store: ElectingLevelDBStore) extends ClusteredSingleton[Leve
elected = null
}
val master_elected = master.map(_.elected).getOrElse(null)
val master_elected = if(eid==null) null else master.map(_.elected).getOrElse(null)
// If no master is currently elected, we need to report our current store position.
// Since that will be used to select the master.
@ -155,7 +157,7 @@ class MasterElector(store: ElectingLevelDBStore) extends ClusteredSingleton[Leve
}
// Do we need to stop the running master?
if (master_elected != eid && address != null && !updating_store) {
if ((eid==null || master_elected != eid) && address!=null && !updating_store) {
info("Demoted to slave")
updating_store = true
store.stop_master {
@ -169,7 +171,7 @@ class MasterElector(store: ElectingLevelDBStore) extends ClusteredSingleton[Leve
}
// Have we been promoted to being the master?
if (master_elected == eid && address==null && !updating_store ) {
if (eid!=null && master_elected == eid && address==null && !updating_store ) {
info("Promoted to master")
updating_store = true
store.start_master { port =>
@ -183,7 +185,7 @@ class MasterElector(store: ElectingLevelDBStore) extends ClusteredSingleton[Leve
}
// Can we become a slave?
if (master_elected != eid && address == null) {
if ( (eid==null || master_elected != eid) && address == null) {
// Did the master address change?
if (connect_target != connected_address) {
@ -214,8 +216,9 @@ class MasterElector(store: ElectingLevelDBStore) extends ClusteredSingleton[Leve
}
}
}
update
if( group.zk.isConnected ) {
update
}
}
}
}

View File

@ -14,13 +14,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.leveldb.replicated.groups.internal
package org.apache.activemq.leveldb.replicated.groups
import org.apache.activemq.leveldb.replicated.groups.ChangeListener
import org.slf4j.{Logger, LoggerFactory}
import java.util.concurrent.TimeUnit
/**
* <p>
* Callback interface used to get notifications of changes
* to a cluster group.
* </p>
* <p>
* All three callbacks take no arguments and return Unit, so a listener
* receives only the notification itself, not the new state.
* </p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
trait ChangeListener {
// Notification that the group changed.
// NOTE(review): presumably covers both membership and member-data changes -- confirm against the notifier.
def changed:Unit
// NOTE(review): presumably fired when the connection to the cluster is established -- confirm.
def connected:Unit
// NOTE(review): presumably fired when the connection to the cluster is lost -- confirm.
def disconnected:Unit
}
// Holds the SLF4J logger created for the ChangeListenerSupport type.
object ChangeListenerSupport {
// Logger named after the ChangeListenerSupport class.
val LOG: Logger = LoggerFactory.getLogger(classOf[ChangeListenerSupport])
}

View File

@ -18,7 +18,6 @@ package org.apache.activemq.leveldb.replicated.groups
import collection.mutable.{ListBuffer, HashMap}
import internal.ChangeListenerSupport
import java.io._
import org.codehaus.jackson.map.ObjectMapper
@ -83,7 +82,7 @@ object ClusteredSupport {
class ClusteredSingletonWatcher[T <: NodeState](val stateClass:Class[T]) extends ChangeListenerSupport {
import ClusteredSupport._
protected var _group:Group = _
protected var _group:ZooKeeperGroup = _
def group = _group
/**
@ -124,7 +123,7 @@ class ClusteredSingletonWatcher[T <: NodeState](val stateClass:Class[T]) extends
protected def onConnected = {}
protected def onDisconnected = {}
def start(group:Group) = this.synchronized {
def start(group:ZooKeeperGroup) = this.synchronized {
if(_group !=null )
throw new IllegalStateException("Already started.")
_group = group
@ -186,7 +185,7 @@ class ClusteredSingleton[T <: NodeState ](stateClass:Class[T]) extends Clustered
override def stop = {
this.synchronized {
if(_eid != null) {
if(_state != null) {
leave
}
super.stop
@ -200,10 +199,22 @@ class ClusteredSingleton[T <: NodeState ](stateClass:Class[T]) extends Clustered
throw new IllegalArgumentException("The state id cannot be null")
if(_group==null)
throw new IllegalStateException("Not started.")
if(this._state!=null)
throw new IllegalStateException("Already joined")
this._state = state
_eid = group.join(encode(state, mapper))
while( connected ) {
if( _eid == null ) {
_eid = group.join(encode(state, mapper))
return;
} else {
try {
_group.update(_eid, encode(state, mapper))
return;
} catch {
case e:NoNodeException =>
this._eid = null;
}
}
}
}
def leave:Unit = this.synchronized {
@ -211,41 +222,19 @@ class ClusteredSingleton[T <: NodeState ](stateClass:Class[T]) extends Clustered
throw new IllegalStateException("Not joined")
if(_group==null)
throw new IllegalStateException("Not started.")
_group.leave(_eid)
_eid = null
this._state = null.asInstanceOf[T]
}
def update(state:T) = this.synchronized {
if(this._state==null)
throw new IllegalStateException("Not joined")
if(state==null)
throw new IllegalArgumentException("State cannot be null")
if(state.id==null)
throw new IllegalArgumentException("The state id cannot be null")
if(state.id!=this._state.id)
throw new IllegalArgumentException("The state id cannot change")
if(_group==null)
throw new IllegalStateException("Not started.")
this._state = state
try {
_group.update(_eid, encode(state, mapper))
} catch {
case e:NoNodeException =>
this._state = null.asInstanceOf[T]
join(state)
if( _eid!=null && connected ) {
_group.leave(_eid)
_eid = null
}
}
// Disconnect hook: forgets the member id (_eid) held by this singleton.
// NOTE(review): presumably because the group membership does not survive the
// disconnect and has to be re-created on reconnect -- confirm.
override protected def onDisconnected {
this._eid = null
}
override protected def onConnected {
if( this.eid==null && this._state!=null ) {
this._state = null.asInstanceOf[T]
if( this._state!=null ) {
join(this._state)
}
}

View File

@ -1,109 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.leveldb.replicated.groups
import internal.ZooKeeperGroup
import org.apache.zookeeper.data.ACL
import org.apache.zookeeper.ZooDefs.Ids
import java.util.LinkedHashMap
/**
* <p>
* Factory helpers for ZooKeeper backed groups.
* </p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
object ZooKeeperGroupFactory {
// Creates a new ZooKeeperGroup for the given client and path, exposed to the caller as a Group.
def create(zk: ZKClient, path: String):Group = new ZooKeeperGroup(zk, path)
// Delegates to ZooKeeperGroup.members(zk, path) and returns its result unchanged.
def members(zk: ZKClient, path: String):LinkedHashMap[String, Array[Byte]] = ZooKeeperGroup.members(zk, path)
}
/**
* <p>
* Used to join a cluster group and to monitor the memberships
* of that group.
* </p>
* <p>
* This object is not thread safe. You are responsible for
* synchronizing access to it across threads.
* </p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
trait Group {
/**
* Adds a member to the group with some associated data.
*/
def join(data:Array[Byte]):String
/**
* Updates the data associated with joined member.
*/
def update(id:String, data:Array[Byte]):Unit
/**
* Removes a previously added member.
*/
def leave(id:String):Unit
/**
* Lists all the members currently in the group.
*/
def members:java.util.LinkedHashMap[String, Array[Byte]]
/**
* Registers a change listener which will be called
* when the cluster membership changes.
*/
def add(listener:ChangeListener)
/**
* Removes a previously added change listener.
*/
def remove(listener:ChangeListener)
/**
* A group should be closed to release acquired resources used
* to monitor the group membership.
*
* When the Group is closed, any memberships registered via this
* Group will be removed from the group.
*/
def close:Unit
/**
* Are we connected with the cluster?
*/
def connected:Boolean
}
/**
* <p>
* Callback interface used to get notifications of changes
* to a cluster group.
* </p>
* <p>
* All three callbacks take no arguments and return Unit, so a listener
* receives only the notification itself, not the new state.
* </p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
trait ChangeListener {
// Notification that the group changed.
// NOTE(review): presumably covers both membership and member-data changes -- confirm against the notifier.
def changed:Unit
// NOTE(review): presumably fired when the connection to the cluster is established -- confirm.
def connected:Unit
// NOTE(review): presumably fired when the connection to the cluster is lost -- confirm.
def disconnected:Unit
}

View File

@ -14,17 +14,31 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.leveldb.replicated.groups.internal
package org.apache.activemq.leveldb.replicated.groups
import org.apache.zookeeper._
import java.lang.String
import org.linkedin.zookeeper.tracker._
import org.apache.activemq.leveldb.replicated.groups.{ZKClient, ChangeListener, Group}
import scala.collection.mutable.HashMap
import org.linkedin.zookeeper.client.LifecycleListener
import collection.JavaConversions._
import java.util.{LinkedHashMap, Collection}
import org.apache.zookeeper.KeeperException.{ConnectionLossException, NoNodeException, Code}
import org.apache.zookeeper.KeeperException.{ConnectionLossException, NoNodeException}
import scala.Predef._
import scala.Some
/**
* <p>
* Factory helpers for ZooKeeper backed groups.
* </p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
object ZooKeeperGroupFactory {
// Creates a new ZooKeeperGroup for the given client and path.
def create(zk: ZKClient, path: String):ZooKeeperGroup = new ZooKeeperGroup(zk, path)
// Delegates to ZooKeeperGroup.members(zk, path) and returns its result unchanged.
def members(zk: ZKClient, path: String):LinkedHashMap[String, Array[Byte]] = ZooKeeperGroup.members(zk, path)
}
/**
*
@ -56,7 +70,7 @@ object ZooKeeperGroup {
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
class ZooKeeperGroup(val zk: ZKClient, val root: String) extends Group with LifecycleListener with ChangeListenerSupport {
class ZooKeeperGroup(val zk: ZKClient, val root: String) extends LifecycleListener with ChangeListenerSupport {
val tree = new ZooKeeperTreeTracker[Array[Byte]](zk, new ZKByteArrayDataReader, root, 1)
val joins = HashMap[String, Int]()
@ -83,7 +97,9 @@ class ZooKeeperGroup(val zk: ZKClient, val root: String) extends Group with Life
closed = true
joins.foreach { case (path, version) =>
try {
zk.delete(member_path_prefix + path, version)
if( zk.isConnected ) {
zk.delete(member_path_prefix + path, version)
}
} catch {
case x:NoNodeException => // Already deleted.
}

View File

@ -167,6 +167,76 @@ public class ElectingLevelDBStoreTest {
}
}
/**
* Starts three store nodes, waits for one of them to finish starting, then
* shuts down {@code connector} (logged as the ZooKeeper shutdown) and checks
* that within 30 seconds no store reports itself as master any more.
* All stores are stopped at the end. The JUnit timeout is one hour.
*/
@Test(timeout = 1000 * 60 * 60)
public void testZooKeeperServerFailure() throws Exception {
final ArrayList<ElectingLevelDBStore> stores = new ArrayList<ElectingLevelDBStore>();
ArrayList<CountDownFuture> pending_starts = new ArrayList<CountDownFuture>();
// Each node gets its own data directory and is started asynchronously.
for (String dir : new String[]{"leveldb-node1", "leveldb-node2", "leveldb-node3"}) {
ElectingLevelDBStore store = createStoreNode();
store.setDirectory(new File(data_dir(), dir));
stores.add(store);
pending_starts.add(asyncStart(store));
}
// At least one of the stores should have started.
CountDownFuture f = waitFor(30 * 1000, pending_starts.toArray(new CountDownFuture[pending_starts.size()]));
assertTrue(f != null);
// Drop the completed start so only the still-pending ones are checked below.
pending_starts.remove(f);
// The other stores should not start..
LOG.info("Making sure the other stores don't start");
Thread.sleep(5000);
for (CountDownFuture start : pending_starts) {
assertFalse(start.completed());
}
// Stop ZooKeeper..
LOG.info("SHUTTING DOWN ZooKeeper!");
connector.shutdown();
// None of the stores should still be master once ZooKeeper is gone...
// The check is retried for up to 30 seconds until it passes for every store.
within( 30, TimeUnit.SECONDS, new Task(){
public void run() throws Exception {
for (ElectingLevelDBStore store : stores) {
assertFalse(store.isMaster());
}
}
});
for (ElectingLevelDBStore store : stores) {
store.stop();
}
}
/**
* A unit of work that is allowed to throw; used as the retried body passed to
* {@code within(int, TimeUnit, Task)}.
*/
static interface Task {
public void run() throws Exception;
}
/**
* Runs {@code task} repeatedly until it completes without throwing or the
* given time budget is used up.
*
* @param time length of the time budget, in {@code unit}
* @param unit unit of {@code time}
* @param task work to retry; any Throwable it raises counts as a failed attempt
* @throws InterruptedException if the sleep between attempts is interrupted
* @throws RuntimeException the last failure once the deadline has passed
*         (rethrown as-is if it already is a RuntimeException, otherwise wrapped)
* @throws Error the last failure once the deadline has passed, if it is an Error
*/
private void within(int time, TimeUnit unit, Task task) throws InterruptedException {
long timeMS = unit.toMillis(time);
long deadline = System.currentTimeMillis() + timeMS;
while (true) {
try {
task.run();
// First successful run ends the retry loop.
return;
} catch (Throwable e) {
// Throwable (not just Exception) is caught so that Errors raised by the task are retried too.
long remaining = deadline - System.currentTimeMillis();
if( remaining <=0 ) {
// Out of time: surface the most recent failure, wrapping only checked throwables.
if( e instanceof RuntimeException ) {
throw (RuntimeException)e;
}
if( e instanceof Error ) {
throw (Error)e;
}
throw new RuntimeException(e);
}
// Wait a tenth of the total budget between attempts, but never past the deadline.
Thread.sleep(Math.min(timeMS/10, remaining));
}
}
}
private CountDownFuture waitFor(int timeout, CountDownFuture... futures) throws InterruptedException {
long deadline = System.currentTimeMillis()+timeout;
while( true ) {