HBASE-15578 Handle HBASE-15234 for ReplicationHFileCleaner
This commit is contained in:
parent
79868bd394
commit
33396c3629
|
@ -10,6 +10,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.replication.master;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Iterables;
|
||||
|
@ -41,12 +42,11 @@ import org.apache.zookeeper.KeeperException;
|
|||
* deleting it from hfile archive directory.
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
|
||||
public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate implements Abortable {
|
||||
public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate {
|
||||
private static final Log LOG = LogFactory.getLog(ReplicationHFileCleaner.class);
|
||||
private ZooKeeperWatcher zkw;
|
||||
private ReplicationQueuesClient rqc;
|
||||
private boolean stopped = false;
|
||||
private boolean aborted;
|
||||
|
||||
@Override
|
||||
public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
|
||||
|
@ -129,18 +129,27 @@ public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate implements
|
|||
// Make my own Configuration. Then I'll have my own connection to zk that
|
||||
// I can close myself when time comes.
|
||||
Configuration conf = new Configuration(config);
|
||||
super.setConf(conf);
|
||||
try {
|
||||
initReplicationQueuesClient(conf);
|
||||
setConf(conf, new ZooKeeperWatcher(conf, "replicationHFileCleaner", null));
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error while configuring " + this.getClass().getName(), e);
|
||||
}
|
||||
}
|
||||
|
||||
private void initReplicationQueuesClient(Configuration conf)
|
||||
@VisibleForTesting
|
||||
public void setConf(Configuration conf, ZooKeeperWatcher zk) {
|
||||
super.setConf(conf);
|
||||
try {
|
||||
initReplicationQueuesClient(conf, zk);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error while configuring " + this.getClass().getName(), e);
|
||||
}
|
||||
}
|
||||
|
||||
private void initReplicationQueuesClient(Configuration conf, ZooKeeperWatcher zk)
|
||||
throws ZooKeeperConnectionException, IOException {
|
||||
this.zkw = new ZooKeeperWatcher(conf, "replicationHFileCleaner", null);
|
||||
this.rqc = ReplicationFactory.getReplicationQueuesClient(zkw, conf, this);
|
||||
this.zkw = zk;
|
||||
this.rqc = ReplicationFactory.getReplicationQueuesClient(zkw, conf, new WarnOnlyAbortable());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -160,18 +169,6 @@ public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate implements
|
|||
return this.stopped;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void abort(String why, Throwable e) {
|
||||
LOG.warn("Aborting ReplicationHFileCleaner because " + why, e);
|
||||
this.aborted = true;
|
||||
stop(why);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAborted() {
|
||||
return this.aborted;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFileDeletable(FileStatus fStat) {
|
||||
Set<String> hfileRefsFromQueue;
|
||||
|
@ -190,4 +187,19 @@ public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate implements
|
|||
}
|
||||
return !hfileRefsFromQueue.contains(fStat.getPath().getName());
|
||||
}
|
||||
|
||||
private static class WarnOnlyAbortable implements Abortable {
|
||||
@Override
|
||||
public void abort(String why, Throwable e) {
|
||||
LOG.warn("ReplicationHFileCleaner received abort, ignoring. Reason: " + why);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAborted() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -10,9 +10,14 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master.cleaner;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
|
@ -26,12 +31,14 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.ChoreService;
|
||||
import org.apache.hadoop.hbase.CoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
||||
|
@ -45,7 +52,10 @@ import org.apache.hadoop.hbase.replication.regionserver.Replication;
|
|||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
|
||||
import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
|
@ -201,6 +211,48 @@ public class TestReplicationHFileCleaner {
|
|||
cleaner.isFileDeletable(fs.getFileStatus(file));
|
||||
}
|
||||
|
||||
/**
|
||||
* ReplicationHFileCleaner should be able to ride over ZooKeeper errors without aborting.
|
||||
*/
|
||||
@Test
|
||||
public void testZooKeeperAbort() throws Exception {
|
||||
ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
|
||||
|
||||
List<FileStatus> dummyFiles =
|
||||
Lists.newArrayList(new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path(
|
||||
"hfile1")), new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path(
|
||||
"hfile2")));
|
||||
|
||||
FaultyZooKeeperWatcher faultyZK =
|
||||
new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null);
|
||||
try {
|
||||
faultyZK.init();
|
||||
cleaner.setConf(conf, faultyZK);
|
||||
// should keep all files due to a ConnectionLossException getting the queues znodes
|
||||
Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles);
|
||||
assertFalse(toDelete.iterator().hasNext());
|
||||
assertFalse(cleaner.isStopped());
|
||||
} finally {
|
||||
faultyZK.close();
|
||||
}
|
||||
|
||||
// when zk is working both files should be returned
|
||||
cleaner = new ReplicationHFileCleaner();
|
||||
ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "testZooKeeperAbort-normal", null);
|
||||
try {
|
||||
cleaner.setConf(conf, zkw);
|
||||
Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
|
||||
Iterator<FileStatus> iter = filesToDelete.iterator();
|
||||
assertTrue(iter.hasNext());
|
||||
assertEquals(new Path("hfile1"), iter.next().getPath());
|
||||
assertTrue(iter.hasNext());
|
||||
assertEquals(new Path("hfile2"), iter.next().getPath());
|
||||
assertFalse(iter.hasNext());
|
||||
} finally {
|
||||
zkw.close();
|
||||
}
|
||||
}
|
||||
|
||||
static class DummyServer implements Server {
|
||||
|
||||
@Override
|
||||
|
@ -267,4 +319,22 @@ public class TestReplicationHFileCleaner {
|
|||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
static class FaultyZooKeeperWatcher extends ZooKeeperWatcher {
|
||||
private RecoverableZooKeeper zk;
|
||||
public FaultyZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable)
|
||||
throws ZooKeeperConnectionException, IOException {
|
||||
super(conf, identifier, abortable);
|
||||
}
|
||||
|
||||
public void init() throws Exception {
|
||||
this.zk = spy(super.getRecoverableZooKeeper());
|
||||
doThrow(new KeeperException.ConnectionLossException())
|
||||
.when(zk).getData("/hbase/replication/hfile-refs", null, new Stat());
|
||||
}
|
||||
|
||||
public RecoverableZooKeeper getRecoverableZooKeeper() {
|
||||
return zk;
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue