HBASE-20054 Forward port HBASE-18282 ReplicationLogCleaner can delete WALs not yet replicated in case of KeeperException

This commit is contained in:
tedyu 2018-02-23 19:14:52 -08:00
parent 216d2d4648
commit 9a7645360b
2 changed files with 67 additions and 19 deletions

View File

@ -110,6 +110,13 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
LOG.error("Error while configuring " + this.getClass().getName(), e); LOG.error("Error while configuring " + this.getClass().getName(), e);
} }
} }
@VisibleForTesting
public void setConf(Configuration conf, ZKWatcher zk,
ReplicationQueueStorage replicationQueueStorage) {
super.setConf(conf);
this.zkw = zk;
this.queueStorage = replicationQueueStorage;
}
@Override @Override
public void stop(String why) { public void stop(String why) {

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master.cleaner;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
@ -28,6 +29,8 @@ import java.net.URLEncoder;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
@ -46,6 +49,7 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
@ -55,12 +59,13 @@ import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper; import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -195,11 +200,57 @@ public class TestLogsCleaner {
} }
} }
@Test(timeout=10000)
public void testZooKeeperAbortDuringGetListOfReplicators() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
List<FileStatus> dummyFiles = Lists.newArrayList(
new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")),
new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2"))
);
FaultyZooKeeperWatcher faultyZK =
new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null);
final AtomicBoolean getListOfReplicatorsFailed = new AtomicBoolean(false);
try {
faultyZK.init();
ReplicationQueueStorage queueStorage = spy(ReplicationStorageFactory
.getReplicationQueueStorage(faultyZK, conf));
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
try {
return invocation.callRealMethod();
} catch (ReplicationException e) {
LOG.debug("caught " + e);
getListOfReplicatorsFailed.set(true);
throw e;
}
}
}).when(queueStorage).getAllWALs();
cleaner.setConf(conf, faultyZK, queueStorage);
// should keep all files due to a ConnectionLossException getting the queues znodes
cleaner.preClean();
Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles);
assertTrue(getListOfReplicatorsFailed.get());
assertFalse(toDelete.iterator().hasNext());
assertFalse(cleaner.isStopped());
} finally {
faultyZK.close();
}
}
/** /**
* ReplicationLogCleaner should be able to ride over ZooKeeper errors without aborting. * When zk is working both files should be returned
* @throws Exception from ZK watcher
*/ */
@Test @Test(timeout=10000)
public void testZooKeeperAbort() throws Exception { public void testZooKeeperNormal() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration(); Configuration conf = TEST_UTIL.getConfiguration();
ReplicationLogCleaner cleaner = new ReplicationLogCleaner(); ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
@ -208,20 +259,8 @@ public class TestLogsCleaner {
new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2")) new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2"))
); );
try (FaultyZooKeeperWatcher faultyZK = new FaultyZooKeeperWatcher(conf, ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null);
"testZooKeeperAbort-faulty", null)) { try {
faultyZK.init();
cleaner.setConf(conf, faultyZK);
cleaner.preClean();
// should keep all files due to a ConnectionLossException getting the queues znodes
Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles);
assertFalse(toDelete.iterator().hasNext());
assertFalse(cleaner.isStopped());
}
// when zk is working both files should be returned
cleaner = new ReplicationLogCleaner();
try (ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null)) {
cleaner.setConf(conf, zkw); cleaner.setConf(conf, zkw);
cleaner.preClean(); cleaner.preClean();
Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles); Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
@ -231,6 +270,8 @@ public class TestLogsCleaner {
assertTrue(iter.hasNext()); assertTrue(iter.hasNext());
assertEquals(new Path("log2"), iter.next().getPath()); assertEquals(new Path("log2"), iter.next().getPath());
assertFalse(iter.hasNext()); assertFalse(iter.hasNext());
} finally {
zkw.close();
} }
} }
@ -371,7 +412,7 @@ public class TestLogsCleaner {
public void init() throws Exception { public void init() throws Exception {
this.zk = spy(super.getRecoverableZooKeeper()); this.zk = spy(super.getRecoverableZooKeeper());
doThrow(new KeeperException.ConnectionLossException()) doThrow(new KeeperException.ConnectionLossException())
.when(zk).getData("/hbase/replication/rs", null, new Stat()); .when(zk).getChildren("/hbase/replication/rs", null);
} }
@Override @Override