HBASE-21155 Save on a few log strings and some churn in wal splitter by skipping out early if no logs in dir
This commit is contained in:
parent
f8b12805bb
commit
3ac3249423
|
@ -83,6 +83,7 @@ public final class ProcedureWALFormat {
|
|||
// Ignore the last log which is current active log.
|
||||
while (logs.hasNext()) {
|
||||
ProcedureWALFile log = logs.next();
|
||||
LOG.debug("Loading WAL id={}", log.getLogId());
|
||||
log.open();
|
||||
try {
|
||||
reader.read(log);
|
||||
|
|
|
@ -424,6 +424,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
|||
it.next(); // Skip the current log
|
||||
|
||||
ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() {
|
||||
long count = 0;
|
||||
|
||||
@Override
|
||||
public void setMaxProcId(long maxProcId) {
|
||||
loader.setMaxProcId(maxProcId);
|
||||
|
@ -431,6 +433,11 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
|||
|
||||
@Override
|
||||
public void load(ProcedureIterator procIter) throws IOException {
|
||||
if ((++count % 1000) == 0) {
|
||||
// Log every 1000 procedures otherwise it looks like Master is dead if loads of WALs
|
||||
// and procedures to load.
|
||||
LOG.debug("Loaded {} procedures", this.count);
|
||||
}
|
||||
loader.load(procIter);
|
||||
}
|
||||
|
||||
|
|
|
@ -585,7 +585,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
abort(error, t);
|
||||
}
|
||||
}
|
||||
}));
|
||||
}), getName() + ":becomeActiveMaster");
|
||||
}
|
||||
// Fall in here even if we have been aborted. Need to run the shutdown services and
|
||||
// the super run call will do this for us.
|
||||
|
@ -924,7 +924,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
HBaseFsck.createLockRetryCounterFactory(this.conf).create());
|
||||
}
|
||||
|
||||
status.setStatus("Initialze ServerManager and schedule SCP for crash servers");
|
||||
status.setStatus("Initialize ServerManager and schedule SCP for crash servers");
|
||||
this.serverManager = createServerManager(this);
|
||||
createProcedureExecutor();
|
||||
@SuppressWarnings("rawtypes")
|
||||
|
@ -946,6 +946,9 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
.collect(Collectors.toList());
|
||||
this.assignmentManager.setupRIT(ritList);
|
||||
|
||||
// Start RegionServerTracker with listing of servers found with exiting SCPs -- these should
|
||||
// be registered in the deadServers set -- and with the list of servernames out on the
|
||||
// filesystem that COULD BE 'alive' (we'll schedule SCPs for each and let SCP figure it out).
|
||||
this.regionServerTracker = new RegionServerTracker(zooKeeper, this, this.serverManager);
|
||||
this.regionServerTracker.start(
|
||||
procsByType.getOrDefault(ServerCrashProcedure.class, Collections.emptyList()).stream()
|
||||
|
|
|
@ -143,6 +143,11 @@ public class MasterWalManager {
|
|||
return this.fsOk;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Return listing of ServerNames found in the filesystem under the WAL directory
|
||||
* that COULD BE 'alive'; excludes those that have a '-splitting' suffix as these are already
|
||||
* being split -- they cannot be 'alive'.
|
||||
*/
|
||||
public Set<ServerName> getLiveServersFromWALDir() throws IOException {
|
||||
Path walDirPath = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
|
||||
FileStatus[] walDirForLiveServers = FSUtils.listStatus(fs, walDirPath,
|
||||
|
|
|
@ -111,13 +111,15 @@ public class RegionServerTracker extends ZKListener {
|
|||
* In this method, we will also construct the region server sets in {@link ServerManager}. If a
|
||||
* region server is dead between the crash of the previous master instance and the start of the
|
||||
* current master instance, we will schedule a SCP for it. This is done in
|
||||
* {@link ServerManager#findOutDeadServersAndProcess(Set, Set)}, we call it here under the lock
|
||||
* {@link ServerManager#findDeadServersAndProcess(Set, Set)}, we call it here under the lock
|
||||
* protection to prevent concurrency issues with server expiration operation.
|
||||
* @param deadServersFromPE the region servers which already have SCP associated.
|
||||
* @param liveServersFromWALDir the live region servers from wal directory.
|
||||
*/
|
||||
public void start(Set<ServerName> deadServersFromPE, Set<ServerName> liveServersFromWALDir)
|
||||
throws KeeperException, IOException {
|
||||
LOG.info("Starting RegionServerTracker; {} have existing ServerCrashProcedures, {} " +
|
||||
"possibly 'live' servers.", deadServersFromPE.size(), liveServersFromWALDir.size());
|
||||
watcher.registerListener(this);
|
||||
synchronized (this) {
|
||||
List<String> servers =
|
||||
|
@ -132,7 +134,7 @@ public class RegionServerTracker extends ZKListener {
|
|||
info.getVersionInfo().getVersion()) : ServerMetricsBuilder.of(serverName);
|
||||
serverManager.checkAndRecordNewServer(serverName, serverMetrics);
|
||||
}
|
||||
serverManager.findOutDeadServersAndProcess(deadServersFromPE, liveServersFromWALDir);
|
||||
serverManager.findDeadServersAndProcess(deadServersFromPE, liveServersFromWALDir);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -350,10 +350,10 @@ public class ServerManager {
|
|||
* <p/>
|
||||
* Must be called inside the initialization method of {@code RegionServerTracker} to avoid
|
||||
* concurrency issue.
|
||||
* @param deadServersFromPE the region servers which already have SCP associated.
|
||||
* @param deadServersFromPE the region servers which already have a SCP associated.
|
||||
* @param liveServersFromWALDir the live region servers from wal directory.
|
||||
*/
|
||||
void findOutDeadServersAndProcess(Set<ServerName> deadServersFromPE,
|
||||
void findDeadServersAndProcess(Set<ServerName> deadServersFromPE,
|
||||
Set<ServerName> liveServersFromWALDir) {
|
||||
deadServersFromPE.forEach(deadservers::add);
|
||||
liveServersFromWALDir.stream().filter(sn -> !onlineServers.containsKey(sn))
|
||||
|
@ -621,7 +621,7 @@ public class ServerManager {
|
|||
public void moveFromOnlineToDeadServers(final ServerName sn) {
|
||||
synchronized (onlineServers) {
|
||||
if (!this.onlineServers.containsKey(sn)) {
|
||||
LOG.warn("Expiration of " + sn + " but server not online");
|
||||
LOG.trace("Expiration of {} but server not online", sn);
|
||||
}
|
||||
// Remove the server from the known servers lists and update load info BUT
|
||||
// add to deadservers first; do this so it'll show in dead servers list if
|
||||
|
|
|
@ -174,7 +174,7 @@ public class SplitLogManager {
|
|||
}
|
||||
FileStatus[] logfiles = FSUtils.listStatus(fs, logDir, filter);
|
||||
if (logfiles == null || logfiles.length == 0) {
|
||||
LOG.info(logDir + " is empty dir, no logs to split");
|
||||
LOG.info(logDir + " dir is empty, no logs to split.");
|
||||
} else {
|
||||
Collections.addAll(fileStatus, logfiles);
|
||||
}
|
||||
|
@ -235,29 +235,33 @@ public class SplitLogManager {
|
|||
PathFilter filter) throws IOException {
|
||||
MonitoredTask status = TaskMonitor.get().createStatus("Doing distributed log split in " +
|
||||
logDirs + " for serverName=" + serverNames);
|
||||
FileStatus[] logfiles = getFileList(logDirs, filter);
|
||||
status.setStatus("Checking directory contents...");
|
||||
SplitLogCounters.tot_mgr_log_split_batch_start.increment();
|
||||
LOG.info("Started splitting " + logfiles.length + " logs in " + logDirs +
|
||||
" for " + serverNames);
|
||||
long t = EnvironmentEdgeManager.currentTime();
|
||||
long totalSize = 0;
|
||||
TaskBatch batch = new TaskBatch();
|
||||
for (FileStatus lf : logfiles) {
|
||||
// TODO If the log file is still being written to - which is most likely
|
||||
// the case for the last log file - then its length will show up here
|
||||
// as zero. The size of such a file can only be retrieved after
|
||||
// recover-lease is done. totalSize will be under in most cases and the
|
||||
// metrics that it drives will also be under-reported.
|
||||
totalSize += lf.getLen();
|
||||
String pathToLog = FSUtils.removeWALRootPath(lf.getPath(), conf);
|
||||
if (!enqueueSplitTask(pathToLog, batch)) {
|
||||
throw new IOException("duplicate log split scheduled for " + lf.getPath());
|
||||
TaskBatch batch = null;
|
||||
long startTime = 0;
|
||||
FileStatus[] logfiles = getFileList(logDirs, filter);
|
||||
if (logfiles.length != 0) {
|
||||
status.setStatus("Checking directory contents...");
|
||||
SplitLogCounters.tot_mgr_log_split_batch_start.increment();
|
||||
LOG.info("Started splitting " + logfiles.length + " logs in " + logDirs +
|
||||
" for " + serverNames);
|
||||
startTime = EnvironmentEdgeManager.currentTime();
|
||||
batch = new TaskBatch();
|
||||
for (FileStatus lf : logfiles) {
|
||||
// TODO If the log file is still being written to - which is most likely
|
||||
// the case for the last log file - then its length will show up here
|
||||
// as zero. The size of such a file can only be retrieved after
|
||||
// recover-lease is done. totalSize will be under in most cases and the
|
||||
// metrics that it drives will also be under-reported.
|
||||
totalSize += lf.getLen();
|
||||
String pathToLog = FSUtils.removeWALRootPath(lf.getPath(), conf);
|
||||
if (!enqueueSplitTask(pathToLog, batch)) {
|
||||
throw new IOException("duplicate log split scheduled for " + lf.getPath());
|
||||
}
|
||||
}
|
||||
waitForSplittingCompletion(batch, status);
|
||||
}
|
||||
waitForSplittingCompletion(batch, status);
|
||||
|
||||
if (batch.done != batch.installed) {
|
||||
if (batch != null && batch.done != batch.installed) {
|
||||
batch.isDead = true;
|
||||
SplitLogCounters.tot_mgr_log_split_batch_err.increment();
|
||||
LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed
|
||||
|
@ -285,10 +289,10 @@ public class SplitLogManager {
|
|||
}
|
||||
SplitLogCounters.tot_mgr_log_split_batch_success.increment();
|
||||
}
|
||||
String msg =
|
||||
"finished splitting (more than or equal to) " + totalSize + " bytes in " + batch.installed
|
||||
+ " log files in " + logDirs + " in "
|
||||
+ (EnvironmentEdgeManager.currentTime() - t) + "ms";
|
||||
String msg = "Finished splitting (more than or equal to) " + totalSize +
|
||||
" bytes in " + ((batch == null)? 0: batch.installed) +
|
||||
" log files in " + logDirs + " in " +
|
||||
((startTime == -1)? startTime: (EnvironmentEdgeManager.currentTime() - startTime)) + "ms";
|
||||
status.markComplete(msg);
|
||||
LOG.info(msg);
|
||||
return totalSize;
|
||||
|
@ -448,7 +452,7 @@ public class SplitLogManager {
|
|||
}
|
||||
deadWorkers.add(workerName);
|
||||
}
|
||||
LOG.info("dead splitlog worker " + workerName);
|
||||
LOG.info("Dead splitlog worker {}", workerName);
|
||||
}
|
||||
|
||||
void handleDeadWorkers(Set<ServerName> serverNames) {
|
||||
|
|
Loading…
Reference in New Issue