HBASE-27556 Reuse Zookeeper session of Master in LogCleaner (#4946)
Backport of HBASE-23340 hmaster /hbase/replication/rs session expired (hbase replication default value is true, we don't use ) causes logcleaner can not clean oldWALs, which resulits in oldWALs too large (more than 2TB)) Signed-off-by: Wellington Ramos Chevreuil <wchevreuil@apache.org> Signed-off-by: Pankaj Kumar<pankajkumar@apache.org>
This commit is contained in:
parent
9cae2bb70e
commit
695dce5bd5
|
@ -1427,8 +1427,9 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
// Start log cleaner thread
|
||||
int cleanerInterval =
|
||||
conf.getInt(HBASE_MASTER_CLEANER_INTERVAL, DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
|
||||
this.logCleaner = new LogCleaner(cleanerInterval, this, conf,
|
||||
getMasterWalManager().getFileSystem(), getMasterWalManager().getOldLogDir(), logCleanerPool);
|
||||
this.logCleaner =
|
||||
new LogCleaner(cleanerInterval, this, conf, getMasterWalManager().getFileSystem(),
|
||||
getMasterWalManager().getOldLogDir(), logCleanerPool, params);
|
||||
getChoreService().scheduleChore(logCleaner);
|
||||
|
||||
// start the hfile archive cleaner thread
|
||||
|
|
|
@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -72,9 +73,9 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate>
|
|||
* @param pool the thread pool used to scan directories
|
||||
*/
|
||||
public LogCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
|
||||
Path oldLogDir, DirScanPool pool) {
|
||||
Path oldLogDir, DirScanPool pool, Map<String, Object> params) {
|
||||
super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS,
|
||||
pool);
|
||||
pool, params);
|
||||
this.pendingDelete = new LinkedBlockingQueue<>();
|
||||
int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
|
||||
this.oldWALsCleaner = createOldWalsCleaner(size);
|
||||
|
|
|
@ -19,10 +19,12 @@ package org.apache.hadoop.hbase.replication.master;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
|
||||
|
@ -35,6 +37,7 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
|
||||
import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
|
||||
|
||||
/**
|
||||
* Implementation of a log cleaner that checks if a log is still scheduled for replication before
|
||||
|
@ -43,7 +46,8 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
|
|||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
|
||||
public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogCleaner.class);
|
||||
private ZKWatcher zkw;
|
||||
private ZKWatcher zkw = null;
|
||||
private boolean shareZK = false;
|
||||
private ReplicationQueueStorage queueStorage;
|
||||
private boolean stopped = false;
|
||||
private Set<String> wals;
|
||||
|
@ -92,12 +96,20 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void setConf(Configuration config) {
|
||||
// Make my own Configuration. Then I'll have my own connection to zk that
|
||||
// I can close myself when comes time.
|
||||
Configuration conf = new Configuration(config);
|
||||
public void init(Map<String, Object> params) {
|
||||
super.init(params);
|
||||
try {
|
||||
setConf(conf, new ZKWatcher(conf, "replicationLogCleaner", null));
|
||||
if (MapUtils.isNotEmpty(params)) {
|
||||
Object master = params.get(HMaster.MASTER);
|
||||
if (master != null && master instanceof HMaster) {
|
||||
zkw = ((HMaster) master).getZooKeeper();
|
||||
shareZK = true;
|
||||
}
|
||||
}
|
||||
if (zkw == null) {
|
||||
zkw = new ZKWatcher(getConf(), "replicationLogCleaner", null);
|
||||
}
|
||||
this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error while configuring " + this.getClass().getName(), e);
|
||||
}
|
||||
|
@ -118,7 +130,7 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
|
|||
public void stop(String why) {
|
||||
if (this.stopped) return;
|
||||
this.stopped = true;
|
||||
if (this.zkw != null) {
|
||||
if (!shareZK && this.zkw != null) {
|
||||
LOG.info("Stopping " + this.zkw);
|
||||
this.zkw.close();
|
||||
}
|
||||
|
|
|
@ -187,7 +187,7 @@ public class TestLogsCleaner {
|
|||
// 10 procedure WALs
|
||||
assertEquals(10, fs.listStatus(OLD_PROCEDURE_WALS_DIR).length);
|
||||
|
||||
LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL);
|
||||
LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL, null);
|
||||
cleaner.chore();
|
||||
|
||||
// In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which
|
||||
|
@ -281,7 +281,7 @@ public class TestLogsCleaner {
|
|||
Server server = new DummyServer();
|
||||
|
||||
FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
|
||||
LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL);
|
||||
LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL, null);
|
||||
int size = cleaner.getSizeOfCleaners();
|
||||
assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
|
||||
cleaner.getCleanerThreadTimeoutMsec());
|
||||
|
|
|
@ -72,7 +72,7 @@ public class TestMasterRegionWALCleaner extends MasterRegionTestBase {
|
|||
public boolean isStopped() {
|
||||
return stopped;
|
||||
}
|
||||
}, conf, fs, globalWALArchiveDir, logCleanerPool);
|
||||
}, conf, fs, globalWALArchiveDir, logCleanerPool, null);
|
||||
choreService.scheduleChore(logCleaner);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue