HBASE-23340 hmaster /hbase/replication/rs session expired (hbase repl… (#2739)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Bo Cui, 2020-12-12 21:10:33 +08:00 (committed by GitHub)
parent f098461a55
commit d50816fe44
5 changed files with 47 additions and 20 deletions
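
Before this change, ReplicationLogCleaner built a private ZooKeeper connection in setConf(); once that session expired, every scan of /hbase/replication/rs failed, getDeletableFiles() returned nothing deletable, and oldWALs grew without bound. The fix threads a params map from HMaster through LogCleaner into the cleaner delegates, so ReplicationLogCleaner can reuse the master's long-lived ZKWatcher and falls back to a private watcher only when no master is supplied (e.g. in tests). A condensed sketch of the delegate-side logic, boiled down from the ReplicationLogCleaner hunk below (the instanceof check also covers null, so the two conditions are folded into one; not a drop-in implementation):

    @Override
    public void init(Map<String, Object> params) {
      super.init(params);
      try {
        if (MapUtils.isNotEmpty(params) && params.get(HMaster.MASTER) instanceof HMaster) {
          // Borrow the master's watcher; shareZK reminds stop() not to close it.
          zkw = ((HMaster) params.get(HMaster.MASTER)).getZooKeeper();
          shareZK = true;
        }
        if (zkw == null) {
          // No master supplied: fall back to a watcher we own and must close.
          zkw = new ZKWatcher(getConf(), "replicationLogCleaner", null);
        }
        this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
      } catch (IOException e) {
        LOG.error("Error while configuring " + this.getClass().getName(), e);
      }
    }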

HMaster.java

@@ -1336,17 +1336,17 @@ public class HMaster extends HRegionServer implements MasterServices {
     // Create cleaner thread pool
     cleanerPool = new DirScanPool(conf);
+    Map<String, Object> params = new HashMap<>();
+    params.put(MASTER, this);
     // Start log cleaner thread
     int cleanerInterval =
       conf.getInt(HBASE_MASTER_CLEANER_INTERVAL, DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
     this.logCleaner = new LogCleaner(cleanerInterval, this, conf,
-      getMasterWalManager().getFileSystem(), getMasterWalManager().getOldLogDir(), cleanerPool);
+      getMasterWalManager().getFileSystem(), getMasterWalManager().getOldLogDir(), cleanerPool, params);
     getChoreService().scheduleChore(logCleaner);
     // start the hfile archive cleaner thread
     Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
-    Map<String, Object> params = new HashMap<>();
-    params.put(MASTER, this);
     this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf,
       getMasterFileSystem().getFileSystem(), archiveDir, cleanerPool, params);
     getChoreService().scheduleChore(hfileCleaner);

LogCleaner.java

@@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -72,9 +73,9 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate>
    * @param pool the thread pool used to scan directories
    */
   public LogCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
-      Path oldLogDir, DirScanPool pool) {
+      Path oldLogDir, DirScanPool pool, Map<String, Object> params) {
     super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS,
-        pool);
+        pool, params);
     this.pendingDelete = new LinkedBlockingQueue<>();
     int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
     this.oldWALsCleaner = createOldWalsCleaner(size);

ReplicationLogCleaner.java

@@ -19,16 +19,19 @@ package org.apache.hadoop.hbase.replication.master;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,7 +46,8 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
 public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
   private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogCleaner.class);
-  private ZKWatcher zkw;
+  private ZKWatcher zkw = null;
+  private boolean shareZK = false;
   private ReplicationQueueStorage queueStorage;
   private boolean stopped = false;
   private Set<String> wals;
@ -92,12 +96,20 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
} }
@Override @Override
public void setConf(Configuration config) { public void init(Map<String, Object> params) {
// Make my own Configuration. Then I'll have my own connection to zk that super.init(params);
// I can close myself when comes time.
Configuration conf = new Configuration(config);
try { try {
setConf(conf, new ZKWatcher(conf, "replicationLogCleaner", null)); if (MapUtils.isNotEmpty(params)) {
Object master = params.get(HMaster.MASTER);
if (master != null && master instanceof HMaster) {
zkw = ((HMaster) master).getZooKeeper();
shareZK = true;
}
}
if (zkw == null) {
zkw = new ZKWatcher(getConf(), "replicationLogCleaner", null);
}
this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
} catch (IOException e) { } catch (IOException e) {
LOG.error("Error while configuring " + this.getClass().getName(), e); LOG.error("Error while configuring " + this.getClass().getName(), e);
} }
@@ -126,7 +138,7 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
   public void stop(String why) {
     if (this.stopped) return;
     this.stopped = true;
-    if (this.zkw != null) {
+    if (!shareZK && this.zkw != null) {
       LOG.info("Stopping " + this.zkw);
       this.zkw.close();
     }
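
Note that stop() now closes the watcher only when shareZK is false: the cleaner owns, and must release, a watcher it created itself, but it must never close the watcher borrowed from the HMaster, since that would tear down the master's own ZooKeeper session.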

TestLogsCleaner.java

@@ -205,7 +205,7 @@ public class TestLogsCleaner {
     // 10 procedure WALs
     assertEquals(10, fs.listStatus(OLD_PROCEDURE_WALS_DIR).length);
 
-    LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL);
+    LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL, null);
     cleaner.chore();
 
     // In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which
@ -226,7 +226,7 @@ public class TestLogsCleaner {
} }
@Test @Test
public void testZooKeeperAbortDuringGetListOfReplicators() throws Exception { public void testZooKeeperRecoveryDuringGetListOfReplicators() throws Exception {
ReplicationLogCleaner cleaner = new ReplicationLogCleaner(); ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
List<FileStatus> dummyFiles = Arrays.asList( List<FileStatus> dummyFiles = Arrays.asList(
@@ -239,7 +239,7 @@ public class TestLogsCleaner {
     final AtomicBoolean getListOfReplicatorsFailed = new AtomicBoolean(false);
 
     try {
-      faultyZK.init();
+      faultyZK.init(false);
       ReplicationQueueStorage queueStorage = spy(ReplicationStorageFactory
           .getReplicationQueueStorage(faultyZK, conf));
       doAnswer(new Answer<Object>() {
@@ -263,6 +263,18 @@ public class TestLogsCleaner {
       assertTrue(getListOfReplicatorsFailed.get());
       assertFalse(toDelete.iterator().hasNext());
       assertFalse(cleaner.isStopped());
+
+      // zk recovery.
+      faultyZK.init(true);
+      cleaner.preClean();
+      Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
+      Iterator<FileStatus> iter = filesToDelete.iterator();
+      assertTrue(iter.hasNext());
+      assertEquals(new Path("log1"), iter.next().getPath());
+      assertTrue(iter.hasNext());
+      assertEquals(new Path("log2"), iter.next().getPath());
+      assertFalse(iter.hasNext());
     } finally {
       faultyZK.close();
     }
@@ -306,7 +318,7 @@ public class TestLogsCleaner {
     Server server = new DummyServer();
     FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
 
-    LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL);
+    LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL, null);
     int size = cleaner.getSizeOfCleaners();
     assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
         cleaner.getCleanerThreadTimeoutMsec());
@@ -426,10 +438,12 @@ public class TestLogsCleaner {
       super(conf, identifier, abortable);
     }
 
-    public void init() throws Exception {
+    public void init(boolean autoRecovery) throws Exception {
       this.zk = spy(super.getRecoverableZooKeeper());
-      doThrow(new KeeperException.ConnectionLossException())
-          .when(zk).getChildren("/hbase/replication/rs", null);
+      if (!autoRecovery) {
+        doThrow(new KeeperException.ConnectionLossException())
+            .when(zk).getChildren("/hbase/replication/rs", null);
+      }
     }
 
     @Override

TestMasterRegionWALCleaner.java

@@ -72,7 +72,7 @@ public class TestMasterRegionWALCleaner extends MasterRegionTestBase {
       public boolean isStopped() {
         return stopped;
       }
-    }, conf, fs, globalWALArchiveDir, cleanerPool);
+    }, conf, fs, globalWALArchiveDir, cleanerPool, null);
     choreService.scheduleChore(logCleaner);
   }