HBASE-15234 Don't abort ReplicationLogCleaner on ZooKeeper errors
This commit is contained in:
parent
e1d5c3d269
commit
2d8e0a0477
|
@ -18,6 +18,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.replication.master;
|
package org.apache.hadoop.hbase.replication.master;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
@ -47,12 +48,11 @@ import org.apache.zookeeper.KeeperException;
|
||||||
* replication before deleting it when its TTL is over.
|
* replication before deleting it when its TTL is over.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
|
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
|
||||||
public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abortable {
|
public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
|
||||||
private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class);
|
private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class);
|
||||||
private ZooKeeperWatcher zkw;
|
private ZooKeeperWatcher zkw;
|
||||||
private ReplicationQueuesClient replicationQueues;
|
private ReplicationQueuesClient replicationQueues;
|
||||||
private boolean stopped = false;
|
private boolean stopped = false;
|
||||||
private boolean aborted;
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -136,15 +136,23 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
|
||||||
// Make my own Configuration. Then I'll have my own connection to zk that
|
// Make my own Configuration. Then I'll have my own connection to zk that
|
||||||
// I can close myself when comes time.
|
// I can close myself when comes time.
|
||||||
Configuration conf = new Configuration(config);
|
Configuration conf = new Configuration(config);
|
||||||
|
try {
|
||||||
|
setConf(conf, new ZooKeeperWatcher(conf, "replicationLogCleaner", null));
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Error while configuring " + this.getClass().getName(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public void setConf(Configuration conf, ZooKeeperWatcher zk) {
|
||||||
super.setConf(conf);
|
super.setConf(conf);
|
||||||
try {
|
try {
|
||||||
this.zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null);
|
this.zkw = zk;
|
||||||
this.replicationQueues = ReplicationFactory.getReplicationQueuesClient(zkw, conf, this);
|
this.replicationQueues = ReplicationFactory.getReplicationQueuesClient(zkw, conf,
|
||||||
|
new WarnOnlyAbortable());
|
||||||
this.replicationQueues.init();
|
this.replicationQueues.init();
|
||||||
} catch (ReplicationException e) {
|
} catch (ReplicationException e) {
|
||||||
LOG.error("Error while configuring " + this.getClass().getName(), e);
|
LOG.error("Error while configuring " + this.getClass().getName(), e);
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.error("Error while configuring " + this.getClass().getName(), e);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -163,15 +171,19 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
|
||||||
return this.stopped;
|
return this.stopped;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class WarnOnlyAbortable implements Abortable {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void abort(String why, Throwable e) {
|
public void abort(String why, Throwable e) {
|
||||||
LOG.warn("Aborting ReplicationLogCleaner because " + why, e);
|
LOG.warn("ReplicationLogCleaner received abort, ignoring. Reason: " + why);
|
||||||
this.aborted = true;
|
if (LOG.isDebugEnabled()) {
|
||||||
stop(why);
|
LOG.debug(e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isAborted() {
|
public boolean isAborted() {
|
||||||
return this.aborted;
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,16 +18,24 @@
|
||||||
package org.apache.hadoop.hbase.master.cleaner;
|
package org.apache.hadoop.hbase.master.cleaner;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.Mockito.doThrow;
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.reflect.Field;
|
import java.lang.reflect.Field;
|
||||||
import java.net.URLEncoder;
|
import java.net.URLEncoder;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.Abortable;
|
||||||
import org.apache.hadoop.hbase.ChoreService;
|
import org.apache.hadoop.hbase.ChoreService;
|
||||||
import org.apache.hadoop.hbase.CoordinatedStateManager;
|
import org.apache.hadoop.hbase.CoordinatedStateManager;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
@ -35,6 +43,7 @@ import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.Server;
|
import org.apache.hadoop.hbase.Server;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.Waiter;
|
import org.apache.hadoop.hbase.Waiter;
|
||||||
|
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
||||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationQueues;
|
import org.apache.hadoop.hbase.replication.ReplicationQueues;
|
||||||
|
@ -44,7 +53,10 @@ import org.apache.hadoop.hbase.replication.regionserver.Replication;
|
||||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
|
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
import org.apache.zookeeper.data.Stat;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -166,6 +178,50 @@ public class TestLogsCleaner {
|
||||||
cleaner.getDeletableFiles(new LinkedList<FileStatus>());
|
cleaner.getDeletableFiles(new LinkedList<FileStatus>());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ReplicationLogCleaner should be able to ride over ZooKeeper errors without
|
||||||
|
* aborting.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testZooKeeperAbort() throws Exception {
|
||||||
|
Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
|
ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
|
||||||
|
|
||||||
|
List<FileStatus> dummyFiles = Lists.newArrayList(
|
||||||
|
new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")),
|
||||||
|
new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2"))
|
||||||
|
);
|
||||||
|
|
||||||
|
FaultyZooKeeperWatcher faultyZK =
|
||||||
|
new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null);
|
||||||
|
try {
|
||||||
|
faultyZK.init();
|
||||||
|
cleaner.setConf(conf, faultyZK);
|
||||||
|
// should keep all files due to a ConnectionLossException getting the queues znodes
|
||||||
|
Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles);
|
||||||
|
assertFalse(toDelete.iterator().hasNext());
|
||||||
|
assertFalse(cleaner.isStopped());
|
||||||
|
} finally {
|
||||||
|
faultyZK.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
// when zk is working both files should be returned
|
||||||
|
cleaner = new ReplicationLogCleaner();
|
||||||
|
ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "testZooKeeperAbort-normal", null);
|
||||||
|
try {
|
||||||
|
cleaner.setConf(conf, zkw);
|
||||||
|
Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
|
||||||
|
Iterator<FileStatus> iter = filesToDelete.iterator();
|
||||||
|
assertTrue(iter.hasNext());
|
||||||
|
assertEquals(new Path("log1"), iter.next().getPath());
|
||||||
|
assertTrue(iter.hasNext());
|
||||||
|
assertEquals(new Path("log2"), iter.next().getPath());
|
||||||
|
assertFalse(iter.hasNext());
|
||||||
|
} finally {
|
||||||
|
zkw.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static class DummyServer implements Server {
|
static class DummyServer implements Server {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -230,4 +286,23 @@ public class TestLogsCleaner {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static class FaultyZooKeeperWatcher extends ZooKeeperWatcher {
|
||||||
|
private RecoverableZooKeeper zk;
|
||||||
|
|
||||||
|
public FaultyZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable)
|
||||||
|
throws ZooKeeperConnectionException, IOException {
|
||||||
|
super(conf, identifier, abortable);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void init() throws Exception {
|
||||||
|
this.zk = spy(super.getRecoverableZooKeeper());
|
||||||
|
doThrow(new KeeperException.ConnectionLossException())
|
||||||
|
.when(zk).getData("/hbase/replication/rs", null, new Stat());
|
||||||
|
}
|
||||||
|
|
||||||
|
public RecoverableZooKeeper getRecoverableZooKeeper() {
|
||||||
|
return zk;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue