From d50816fe448971b8e586792f0584aaf601e31780 Mon Sep 17 00:00:00 2001 From: Bo Cui Date: Sat, 12 Dec 2020 21:10:33 +0800 Subject: [PATCH] =?UTF-8?q?HBASE-23340=20hmaster=20/hbase/replication/rs?= =?UTF-8?q?=20session=20expired=20(hbase=20repl=E2=80=A6=20(#2739)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Duo Zhang --- .../apache/hadoop/hbase/master/HMaster.java | 6 ++-- .../hbase/master/cleaner/LogCleaner.java | 5 ++-- .../master/ReplicationLogCleaner.java | 26 ++++++++++++----- .../hbase/master/cleaner/TestLogsCleaner.java | 28 ++++++++++++++----- .../region/TestMasterRegionWALCleaner.java | 2 +- 5 files changed, 47 insertions(+), 20 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 7d29ed66882..a61254f5610 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -1336,17 +1336,17 @@ public class HMaster extends HRegionServer implements MasterServices { // Create cleaner thread pool cleanerPool = new DirScanPool(conf); + Map params = new HashMap<>(); + params.put(MASTER, this); // Start log cleaner thread int cleanerInterval = conf.getInt(HBASE_MASTER_CLEANER_INTERVAL, DEFAULT_HBASE_MASTER_CLEANER_INTERVAL); this.logCleaner = new LogCleaner(cleanerInterval, this, conf, - getMasterWalManager().getFileSystem(), getMasterWalManager().getOldLogDir(), cleanerPool); + getMasterWalManager().getFileSystem(), getMasterWalManager().getOldLogDir(), cleanerPool, params); getChoreService().scheduleChore(logCleaner); // start the hfile archive cleaner thread Path archiveDir = HFileArchiveUtil.getArchivePath(conf); - Map params = new HashMap<>(); - params.put(MASTER, this); this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem().getFileSystem(), archiveDir, cleanerPool, params); getChoreService().scheduleChore(hfileCleaner); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java index f65713ebf26..d8993b38ffe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java @@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -72,9 +73,9 @@ public class LogCleaner extends CleanerChore * @param pool the thread pool used to scan directories */ public LogCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs, - Path oldLogDir, DirScanPool pool) { + Path oldLogDir, DirScanPool pool, Map params) { super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS, - pool); + pool, params); this.pendingDelete = new LinkedBlockingQueue<>(); int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE); this.oldWALsCleaner = createOldWalsCleaner(size); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java index 8f016bcb912..a7821f1894a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java @@ -19,16 +19,19 @@ package org.apache.hadoop.hbase.replication.master; import java.io.IOException; import java.util.Collections; +import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +46,8 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) public class ReplicationLogCleaner extends BaseLogCleanerDelegate { private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogCleaner.class); - private ZKWatcher zkw; + private ZKWatcher zkw = null; + private boolean shareZK = false; private ReplicationQueueStorage queueStorage; private boolean stopped = false; private Set wals; @@ -92,12 +96,20 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate { } @Override - public void setConf(Configuration config) { - // Make my own Configuration. Then I'll have my own connection to zk that - // I can close myself when comes time. - Configuration conf = new Configuration(config); + public void init(Map params) { + super.init(params); try { - setConf(conf, new ZKWatcher(conf, "replicationLogCleaner", null)); + if (MapUtils.isNotEmpty(params)) { + Object master = params.get(HMaster.MASTER); + if (master != null && master instanceof HMaster) { + zkw = ((HMaster) master).getZooKeeper(); + shareZK = true; + } + } + if (zkw == null) { + zkw = new ZKWatcher(getConf(), "replicationLogCleaner", null); + } + this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf()); } catch (IOException e) { LOG.error("Error while configuring " + this.getClass().getName(), e); } @@ -126,7 +138,7 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate { public void stop(String why) { if (this.stopped) return; this.stopped = true; - if (this.zkw != null) { + if (!shareZK && this.zkw != null) { LOG.info("Stopping " + this.zkw); this.zkw.close(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index 8ed31d009fb..064f9a65762 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -205,7 +205,7 @@ public class TestLogsCleaner { // 10 procedure WALs assertEquals(10, fs.listStatus(OLD_PROCEDURE_WALS_DIR).length); - LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL); + LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL, null); cleaner.chore(); // In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which @@ -226,7 +226,7 @@ public class TestLogsCleaner { } @Test - public void testZooKeeperAbortDuringGetListOfReplicators() throws Exception { + public void testZooKeeperRecoveryDuringGetListOfReplicators() throws Exception { ReplicationLogCleaner cleaner = new ReplicationLogCleaner(); List dummyFiles = Arrays.asList( @@ -239,7 +239,7 @@ public class TestLogsCleaner { final AtomicBoolean getListOfReplicatorsFailed = new AtomicBoolean(false); try { - faultyZK.init(); + faultyZK.init(false); ReplicationQueueStorage queueStorage = spy(ReplicationStorageFactory .getReplicationQueueStorage(faultyZK, conf)); doAnswer(new Answer() { @@ -263,6 +263,18 @@ public class TestLogsCleaner { assertTrue(getListOfReplicatorsFailed.get()); assertFalse(toDelete.iterator().hasNext()); assertFalse(cleaner.isStopped()); + + //zk recovery. + faultyZK.init(true); + cleaner.preClean(); + Iterable filesToDelete = cleaner.getDeletableFiles(dummyFiles); + Iterator iter = filesToDelete.iterator(); + assertTrue(iter.hasNext()); + assertEquals(new Path("log1"), iter.next().getPath()); + assertTrue(iter.hasNext()); + assertEquals(new Path("log2"), iter.next().getPath()); + assertFalse(iter.hasNext()); + } finally { faultyZK.close(); } @@ -306,7 +318,7 @@ public class TestLogsCleaner { Server server = new DummyServer(); FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem(); - LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL); + LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL, null); int size = cleaner.getSizeOfCleaners(); assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, cleaner.getCleanerThreadTimeoutMsec()); @@ -426,10 +438,12 @@ public class TestLogsCleaner { super(conf, identifier, abortable); } - public void init() throws Exception { + public void init(boolean autoRecovery) throws Exception { this.zk = spy(super.getRecoverableZooKeeper()); - doThrow(new KeeperException.ConnectionLossException()) - .when(zk).getChildren("/hbase/replication/rs", null); + if (!autoRecovery) { + doThrow(new KeeperException.ConnectionLossException()) + .when(zk).getChildren("/hbase/replication/rs", null); + } } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionWALCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionWALCleaner.java index 08b5f995190..39497b07e52 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionWALCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionWALCleaner.java @@ -72,7 +72,7 @@ public class TestMasterRegionWALCleaner extends MasterRegionTestBase { public boolean isStopped() { return stopped; } - }, conf, fs, globalWALArchiveDir, cleanerPool); + }, conf, fs, globalWALArchiveDir, cleanerPool, null); choreService.scheduleChore(logCleaner); }