diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java index a6b6dd8dd6e..9ecba11c3ad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.replication.master; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -47,12 +48,11 @@ import org.apache.zookeeper.KeeperException; * replication before deleting it when its TTL is over. */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) -public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abortable { +public class ReplicationLogCleaner extends BaseLogCleanerDelegate { private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class); private ZooKeeperWatcher zkw; private ReplicationQueuesClient replicationQueues; private boolean stopped = false; - private boolean aborted; @Override @@ -136,15 +136,23 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo // Make my own Configuration. Then I'll have my own connection to zk that // I can close myself when comes time. Configuration conf = new Configuration(config); + try { + setConf(conf, new ZooKeeperWatcher(conf, "replicationLogCleaner", null)); + } catch (IOException e) { + LOG.error("Error while configuring " + this.getClass().getName(), e); + } + } + + @VisibleForTesting + public void setConf(Configuration conf, ZooKeeperWatcher zk) { super.setConf(conf); try { - this.zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null); - this.replicationQueues = ReplicationFactory.getReplicationQueuesClient(zkw, conf, this); + this.zkw = zk; + this.replicationQueues = ReplicationFactory.getReplicationQueuesClient(zkw, conf, + new WarnOnlyAbortable()); this.replicationQueues.init(); } catch (ReplicationException e) { LOG.error("Error while configuring " + this.getClass().getName(), e); - } catch (IOException e) { - LOG.error("Error while configuring " + this.getClass().getName(), e); } } @@ -163,15 +171,19 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo return this.stopped; } - @Override - public void abort(String why, Throwable e) { - LOG.warn("Aborting ReplicationLogCleaner because " + why, e); - this.aborted = true; - stop(why); - } + private static class WarnOnlyAbortable implements Abortable { - @Override - public boolean isAborted() { - return this.aborted; + @Override + public void abort(String why, Throwable e) { + LOG.warn("ReplicationLogCleaner received abort, ignoring. Reason: " + why); + if (LOG.isDebugEnabled()) { + LOG.debug(e); + } + } + + @Override + public boolean isAborted() { + return false; + } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index ebf3699a212..47db32b0c16 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -18,16 +18,24 @@ package org.apache.hadoop.hbase.master.cleaner; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; import java.io.IOException; import java.lang.reflect.Field; import java.net.URLEncoder; +import java.util.Iterator; import java.util.LinkedList; +import java.util.List; +import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -35,6 +43,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationQueues; @@ -44,7 +53,10 @@ import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; +import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -166,6 +178,50 @@ public class TestLogsCleaner { cleaner.getDeletableFiles(new LinkedList()); } + /** + * ReplicationLogCleaner should be able to ride over ZooKeeper errors without + * aborting. + */ + @Test + public void testZooKeeperAbort() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + ReplicationLogCleaner cleaner = new ReplicationLogCleaner(); + + List dummyFiles = Lists.newArrayList( + new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")), + new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2")) + ); + + FaultyZooKeeperWatcher faultyZK = + new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null); + try { + faultyZK.init(); + cleaner.setConf(conf, faultyZK); + // should keep all files due to a ConnectionLossException getting the queues znodes + Iterable toDelete = cleaner.getDeletableFiles(dummyFiles); + assertFalse(toDelete.iterator().hasNext()); + assertFalse(cleaner.isStopped()); + } finally { + faultyZK.close(); + } + + // when zk is working both files should be returned + cleaner = new ReplicationLogCleaner(); + ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "testZooKeeperAbort-normal", null); + try { + cleaner.setConf(conf, zkw); + Iterable filesToDelete = cleaner.getDeletableFiles(dummyFiles); + Iterator iter = filesToDelete.iterator(); + assertTrue(iter.hasNext()); + assertEquals(new Path("log1"), iter.next().getPath()); + assertTrue(iter.hasNext()); + assertEquals(new Path("log2"), iter.next().getPath()); + assertFalse(iter.hasNext()); + } finally { + zkw.close(); + } + } + static class DummyServer implements Server { @Override @@ -230,4 +286,23 @@ public class TestLogsCleaner { return null; } } + + static class FaultyZooKeeperWatcher extends ZooKeeperWatcher { + private RecoverableZooKeeper zk; + + public FaultyZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable) + throws ZooKeeperConnectionException, IOException { + super(conf, identifier, abortable); + } + + public void init() throws Exception { + this.zk = spy(super.getRecoverableZooKeeper()); + doThrow(new KeeperException.ConnectionLossException()) + .when(zk).getData("/hbase/replication/rs", null, new Stat()); + } + + public RecoverableZooKeeper getRecoverableZooKeeper() { + return zk; + } + } } \ No newline at end of file