From 078e2619ea01889e19b4419ac3eab66c4ac1d3ae Mon Sep 17 00:00:00 2001 From: Bo Cui Date: Thu, 14 Jan 2021 11:36:07 +0800 Subject: [PATCH] HubSpot Backport: HBASE-23340 hmaster /hbase/replication/rs session expired (hbase replication default value is true, we don't use ) causes logcleaner can not clean oldWALs, which resulits in oldWALs too large (more than 2TB) (#2779) Signed-off-by: Duo Zhang Signed-off-by: Pankaj Kumar --- .../apache/hadoop/hbase/master/HMaster.java | 6 ++--- .../hbase/master/cleaner/LogCleaner.java | 5 ++-- .../master/ReplicationLogCleaner.java | 26 ++++++++++++++----- .../hbase/master/cleaner/TestLogsCleaner.java | 4 +-- .../region/TestMasterRegionWALCleaner.java | 2 +- 5 files changed, 28 insertions(+), 15 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index ab765f74177..354c8feaa1e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -1417,17 +1417,17 @@ public class HMaster extends HRegionServer implements MasterServices { // Create cleaner thread pool cleanerPool = new DirScanPool(conf); + Map params = new HashMap<>(); + params.put(MASTER, this); // Start log cleaner thread int cleanerInterval = conf.getInt(HBASE_MASTER_CLEANER_INTERVAL, DEFAULT_HBASE_MASTER_CLEANER_INTERVAL); this.logCleaner = new LogCleaner(cleanerInterval, this, conf, - getMasterWalManager().getFileSystem(), getMasterWalManager().getOldLogDir(), cleanerPool); + getMasterWalManager().getFileSystem(), getMasterWalManager().getOldLogDir(), cleanerPool, params); getChoreService().scheduleChore(logCleaner); // start the hfile archive cleaner thread Path archiveDir = HFileArchiveUtil.getArchivePath(conf); - Map params = new HashMap<>(); - params.put(MASTER, this); this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem().getFileSystem(), archiveDir, cleanerPool, params); getChoreService().scheduleChore(hfileCleaner); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java index f65713ebf26..d8993b38ffe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java @@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -72,9 +73,9 @@ public class LogCleaner extends CleanerChore * @param pool the thread pool used to scan directories */ public LogCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs, - Path oldLogDir, DirScanPool pool) { + Path oldLogDir, DirScanPool pool, Map params) { super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS, - pool); + pool, params); this.pendingDelete = new LinkedBlockingQueue<>(); int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE); this.oldWALsCleaner = createOldWalsCleaner(size); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java index 148b33037cd..705302efcd2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java @@ -19,16 +19,19 @@ package org.apache.hadoop.hbase.replication.master; import java.io.IOException; import java.util.Collections; +import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +46,8 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) public class ReplicationLogCleaner extends BaseLogCleanerDelegate { private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogCleaner.class); - private ZKWatcher zkw; + private ZKWatcher zkw = null; + private boolean shareZK = false; private ReplicationQueueStorage queueStorage; private boolean stopped = false; private Set wals; @@ -92,12 +96,20 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate { } @Override - public void setConf(Configuration config) { - // Make my own Configuration. Then I'll have my own connection to zk that - // I can close myself when comes time. - Configuration conf = new Configuration(config); + public void init(Map params) { + super.init(params); try { - setConf(conf, new ZKWatcher(conf, "replicationLogCleaner", null)); + if (MapUtils.isNotEmpty(params)) { + Object master = params.get(HMaster.MASTER); + if (master != null && master instanceof HMaster) { + zkw = ((HMaster) master).getZooKeeper(); + shareZK = true; + } + } + if (zkw == null) { + zkw = new ZKWatcher(getConf(), "replicationLogCleaner", null); + } + this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf()); } catch (IOException e) { LOG.error("Error while configuring " + this.getClass().getName(), e); } @@ -118,7 +130,7 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate { public void stop(String why) { if (this.stopped) return; this.stopped = true; - if (this.zkw != null) { + if (!shareZK && this.zkw != null) { LOG.info("Stopping " + this.zkw); this.zkw.close(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index b3d78fce096..ac29fee0a1a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -200,7 +200,7 @@ public class TestLogsCleaner { // 10 procedure WALs assertEquals(10, fs.listStatus(OLD_PROCEDURE_WALS_DIR).length); - LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL); + LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL, null); cleaner.chore(); // In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which @@ -297,7 +297,7 @@ public class TestLogsCleaner { Server server = new DummyServer(); FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem(); - LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL); + LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL, null); int size = cleaner.getSizeOfCleaners(); assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, cleaner.getCleanerThreadTimeoutMsec()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionWALCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionWALCleaner.java index 18afd3c9eb0..5da77379c43 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionWALCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionWALCleaner.java @@ -72,7 +72,7 @@ public class TestMasterRegionWALCleaner extends MasterRegionTestBase { public boolean isStopped() { return stopped; } - }, conf, fs, globalWALArchiveDir, cleanerPool); + }, conf, fs, globalWALArchiveDir, cleanerPool, null); choreService.scheduleChore(logCleaner); }