HBASE-15234 Don't abort ReplicationLogCleaner on ZooKeeper errors

This commit is contained in:
Gary Helmling 2016-02-16 14:19:19 -08:00
parent c8190d7e2e
commit d7d12aedd4
2 changed files with 102 additions and 15 deletions

View File

@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.replication.master;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -47,12 +48,11 @@ import org.apache.zookeeper.KeeperException;
* replication before deleting it when its TTL is over.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abortable {
public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class);
private ZooKeeperWatcher zkw;
private ReplicationQueuesClient replicationQueues;
private boolean stopped = false;
private boolean aborted;
@Override
@ -136,15 +136,23 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
// Make my own Configuration. Then I'll have my own connection to zk that
// I can close myself when comes time.
Configuration conf = new Configuration(config);
try {
setConf(conf, new ZooKeeperWatcher(conf, "replicationLogCleaner", null));
} catch (IOException e) {
LOG.error("Error while configuring " + this.getClass().getName(), e);
}
}
@VisibleForTesting
public void setConf(Configuration conf, ZooKeeperWatcher zk) {
super.setConf(conf);
try {
this.zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null);
this.replicationQueues = ReplicationFactory.getReplicationQueuesClient(zkw, conf, this);
this.zkw = zk;
this.replicationQueues = ReplicationFactory.getReplicationQueuesClient(zkw, conf,
new WarnOnlyAbortable());
this.replicationQueues.init();
} catch (ReplicationException e) {
LOG.error("Error while configuring " + this.getClass().getName(), e);
} catch (IOException e) {
LOG.error("Error while configuring " + this.getClass().getName(), e);
}
}
@ -163,15 +171,19 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
return this.stopped;
}
@Override
public void abort(String why, Throwable e) {
LOG.warn("Aborting ReplicationLogCleaner because " + why, e);
this.aborted = true;
stop(why);
}
private static class WarnOnlyAbortable implements Abortable {
@Override
public boolean isAborted() {
return this.aborted;
@Override
public void abort(String why, Throwable e) {
LOG.warn("ReplicationLogCleaner received abort, ignoring. Reason: " + why);
if (LOG.isDebugEnabled()) {
LOG.debug(e);
}
}
@Override
public boolean isAborted() {
return false;
}
}
}

View File

@ -18,16 +18,24 @@
package org.apache.hadoop.hbase.master.cleaner;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URLEncoder;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@ -36,6 +44,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
@ -43,7 +52,10 @@ import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -165,6 +177,50 @@ public class TestLogsCleaner {
cleaner.getDeletableFiles(new LinkedList<FileStatus>());
}
/**
* ReplicationLogCleaner should be able to ride over ZooKeeper errors without
* aborting.
*/
@Test
public void testZooKeeperAbort() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
List<FileStatus> dummyFiles = Lists.newArrayList(
new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")),
new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2"))
);
FaultyZooKeeperWatcher faultyZK =
new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null);
try {
faultyZK.init();
cleaner.setConf(conf, faultyZK);
// should keep all files due to a ConnectionLossException getting the queues znodes
Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles);
assertFalse(toDelete.iterator().hasNext());
assertFalse(cleaner.isStopped());
} finally {
faultyZK.close();
}
// when zk is working both files should be returned
cleaner = new ReplicationLogCleaner();
ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "testZooKeeperAbort-normal", null);
try {
cleaner.setConf(conf, zkw);
Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
Iterator<FileStatus> iter = filesToDelete.iterator();
assertTrue(iter.hasNext());
assertEquals(new Path("log1"), iter.next().getPath());
assertTrue(iter.hasNext());
assertEquals(new Path("log2"), iter.next().getPath());
assertFalse(iter.hasNext());
} finally {
zkw.close();
}
}
static class DummyServer implements Server {
@Override
@ -223,4 +279,23 @@ public class TestLogsCleaner {
return null;
}
}
static class FaultyZooKeeperWatcher extends ZooKeeperWatcher {
private RecoverableZooKeeper zk;
public FaultyZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable)
throws ZooKeeperConnectionException, IOException {
super(conf, identifier, abortable);
}
public void init() throws Exception {
this.zk = spy(super.getRecoverableZooKeeper());
doThrow(new KeeperException.ConnectionLossException())
.when(zk).getData("/hbase/replication/rs", null, new Stat());
}
public RecoverableZooKeeper getRecoverableZooKeeper() {
return zk;
}
}
}