HBASE-3963 Schedule all log-splitting at startup all at once
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1135317 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent 006b4b189c
commit 1e5deae9e9
@@ -122,6 +122,7 @@ Release 0.91.0 - Unreleased
    HBASE-3923  HBASE-1502 Broke Shell's status 'simple' and 'detailed'
    HBASE-3978  Rowlock lease renew doesn't work when custom coprocessor
                indicates to bypass default action (Ming Ma)
+   HBASE-3963  Schedule all log-splitting at startup all at once (mingjian)

 IMPROVEMENTS
    HBASE-3290  Max Compaction Size (Nicolas Spiegelberg via Stack)
@@ -1183,7 +1183,7 @@ HTable table2 = new HTable(conf2, "myTable");</programlisting>
         is an experimental feature).
       </para>
     </section>
-    <section><title>HFile Tool</title>
+    <section xml:id="hfile_tool2"><title>HFile Tool</title>
       <para>See <xref linkend="hfile_tool" />.</para>
     </section>
     <section xml:id="wal_tools">
@@ -20,6 +20,8 @@
 package org.apache.hadoop.hbase.master;

 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.locks.Lock;
@@ -175,7 +177,7 @@ public class MasterFileSystem {
   /**
    * Inspect the log directory to recover any log file without
    * an active region server.
-   * @param onlineServers Map of online servers keyed by
+   * @param onlineServers Set of online servers keyed by
    *   {@link ServerName}
    */
   void splitLogAfterStartup(final Set<ServerName> onlineServers) {
@@ -197,64 +199,78 @@ public class MasterFileSystem {
       LOG.debug("No log files to split, proceeding...");
       return;
     }
+    List<ServerName> serverNames = new ArrayList<ServerName>();
     for (FileStatus status : logFolders) {
       ServerName serverName = new ServerName(status.getPath().getName());
-      if (!onlineServers.contains(serverName)) {
+      if (onlineServers.contains(serverName)) {
         LOG.info("Log folder " + status.getPath() + " doesn't belong " +
             "to a known region server, splitting");
-        splitLog(serverName);
+        serverNames.add(serverName);
       } else {
         LOG.info("Log folder " + status.getPath() +
             " belongs to an existing region server");
       }
     }
+    splitLog(serverNames);
   }

-  public void splitLog(final ServerName serverName) {
+  public void splitLog(final ServerName serverName){
+    List<ServerName> serverNames = new ArrayList<ServerName>();
+    serverNames.add(serverName);
+    splitLog(serverNames);
+  }
+
+  public void splitLog(final List<ServerName> serverNames) {
     long splitTime = 0, splitLogSize = 0;
-    Path logDir = new Path(this.rootdir, HLog.getHLogDirectoryName(serverName.toString()));
+    List<Path> logDirs = new ArrayList<Path>();
+    for(ServerName serverName: serverNames){
+      Path logDir = new Path(this.rootdir, HLog.getHLogDirectoryName(serverName.toString()));
+      logDirs.add(logDir);
+    }
+
     if (distributedLogSplitting) {
       splitTime = EnvironmentEdgeManager.currentTimeMillis();
       try {
         try {
-          splitLogSize = splitLogManager.splitLogDistributed(logDir);
+          splitLogSize = splitLogManager.splitLogDistributed(logDirs);
         } catch (OrphanHLogAfterSplitException e) {
           LOG.warn("Retrying distributed splitting for " +
-              serverName + "because of:", e);
-          splitLogManager.splitLogDistributed(logDir);
+              serverNames + "because of:", e);
+          splitLogManager.splitLogDistributed(logDirs);
         }
       } catch (IOException e) {
-        LOG.error("Failed distributed splitting " + serverName, e);
+        LOG.error("Failed distributed splitting " + serverNames, e);
       }
       splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime;
     } else {
-      // splitLogLock ensures that dead region servers' logs are processed
-      // one at a time
-      this.splitLogLock.lock();
-      try {
-        HLogSplitter splitter = HLogSplitter.createLogSplitter(
+      for(Path logDir: logDirs){
+        // splitLogLock ensures that dead region servers' logs are processed
+        // one at a time
+        this.splitLogLock.lock();
+        try {
+          HLogSplitter splitter = HLogSplitter.createLogSplitter(
             conf, rootdir, logDir, oldLogDir, this.fs);
-        try {
-          // If FS is in safe mode, just wait till out of it.
-          FSUtils.waitOnSafeMode(conf,
-            conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 1000));
-          splitter.splitLog();
-        } catch (OrphanHLogAfterSplitException e) {
-          LOG.warn("Retrying splitting because of:", e);
-          // An HLogSplitter instance can only be used once. Get new instance.
-          splitter = HLogSplitter.createLogSplitter(conf, rootdir, logDir,
+          try {
+            // If FS is in safe mode, just wait till out of it.
+            FSUtils.waitOnSafeMode(conf, conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 1000));
+            splitter.splitLog();
+          } catch (OrphanHLogAfterSplitException e) {
+            LOG.warn("Retrying splitting because of:", e);
+            //An HLogSplitter instance can only be used once. Get new instance.
+            splitter = HLogSplitter.createLogSplitter(conf, rootdir, logDir,
               oldLogDir, this.fs);
-          splitter.splitLog();
+            splitter.splitLog();
           }
-        splitTime = splitter.getTime();
-        splitLogSize = splitter.getSize();
-      } catch (IOException e) {
-        LOG.error("Failed splitting " + logDir.toString(), e);
-      } finally {
-        this.splitLogLock.unlock();
-      }
+          splitTime = splitter.getTime();
+          splitLogSize = splitter.getSize();
+        } catch (IOException e) {
+          LOG.error("Failed splitting " + logDir.toString(), e);
+        } finally {
+          this.splitLogLock.unlock();
+        }
+      }
     }

     if (this.metrics != null) {
       this.metrics.addSplit(splitTime, splitLogSize);
     }
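
For context, a minimal sketch (not part of the commit; the caller and accessor names are assumptions) of how the reworked entry points chain together: splitLogAfterStartup() now gathers every dead server into one batched call, and the old single-server splitLog(ServerName) overload simply wraps its argument in a one-element list.

    // Hypothetical startup-time caller; only splitLogAfterStartup/splitLog come from this patch.
    Set<ServerName> online = serverManager.getOnlineServers().keySet(); // assumed accessor
    masterFileSystem.splitLogAfterStartup(online); // schedules all dead servers' logs at once

    // Manual batching through the new overload; server names are hypothetical:
    List<ServerName> batch = new ArrayList<ServerName>();
    batch.add(new ServerName("host1.example.org,60020,1307000000000"));
    batch.add(new ServerName("host2.example.org,60020,1307000000000"));
    masterFileSystem.splitLog(batch); // one SplitLogManager batch instead of N separate ones
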
@@ -20,7 +20,9 @@
 package org.apache.hadoop.hbase.master;

 import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*;

 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -167,6 +169,40 @@ public class SplitLogManager extends ZooKeeperListener {
     }
   }

+  private FileStatus[] getFileList(List<Path> logDirs) throws IOException {
+    List<FileStatus> fileStatus = new ArrayList<FileStatus>();
+    for (Path hLogDir : logDirs) {
+      this.fs = hLogDir.getFileSystem(conf);
+      if (!fs.exists(hLogDir)) {
+        LOG.warn(hLogDir + " doesn't exist. Nothing to do!");
+        continue;
+      }
+      FileStatus[] logfiles = fs.listStatus(hLogDir); // TODO filter filenames?
+      if (logfiles == null || logfiles.length == 0) {
+        LOG.info(hLogDir + " is empty dir, no logs to split");
+      } else {
+        for (FileStatus status : logfiles)
+          fileStatus.add(status);
+      }
+    }
+    if (fileStatus.isEmpty())
+      return null;
+    FileStatus[] a = new FileStatus[fileStatus.size()];
+    return fileStatus.toArray(a);
+  }
+
+  /**
+   * @param logDir
+   *            one region server hlog dir path in .logs
+   * @throws IOException
+   *             if there was an error while splitting any log file
+   * @return cumulative size of the logfiles split
+   */
+  public long splitLogDistributed(final Path logDir) throws IOException {
+    List<Path> logDirs = new ArrayList<Path>();
+    logDirs.add(logDir);
+    return splitLogDistributed(logDirs);
+  }
   /**
    * The caller will block until all the log files of the given region server
    * have been processed - successfully split or an error is encountered - by an
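
A note on the new getFileList() helper above: it flattens the log files from every directory in the batch into a single array and returns null, not an empty array, when there is nothing to split; splitLogDistributed() relies on that to bail out early. A hedged illustration of the contract (server names are hypothetical; HLog.getHLogDirectoryName() is used exactly as in the patch):

    // Assumed context: rootdir and conf exist, as inside SplitLogManager.
    List<Path> logDirs = new ArrayList<Path>();
    logDirs.add(new Path(rootdir, HLog.getHLogDirectoryName("host1.example.org,60020,1307000000000")));
    logDirs.add(new Path(rootdir, HLog.getHLogDirectoryName("host2.example.org,60020,1307000000000")));
    FileStatus[] files = getFileList(logDirs); // files from both dirs, or null if none exist
    if (files == null) {
      return 0; // nothing to split anywhere in the batch, report zero bytes
    }
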
@@ -179,28 +215,18 @@ public class SplitLogManager extends ZooKeeperListener {
    * if there was an error while splitting any log file
    * @return cumulative size of the logfiles split
    */
-  public long splitLogDistributed(final Path logDir) throws IOException {
-    this.fs = logDir.getFileSystem(conf);
-    if (!fs.exists(logDir)) {
-      LOG.warn(logDir + " doesn't exist. Nothing to do!");
-      return 0;
-    }
-
+  public long splitLogDistributed(final List<Path> logDirs) throws IOException {
     MonitoredTask status = TaskMonitor.get().createStatus(
-          "Doing distributed log split in " + logDir);
-
-    status.setStatus("Checking directory contents...");
-    FileStatus[] logfiles = fs.listStatus(logDir); // TODO filter filenames?
-    if (logfiles == null || logfiles.length == 0) {
-      LOG.info(logDir + " is empty dir, no logs to split");
+          "Doing distributed log split in " + logDirs);
+    FileStatus[] logfiles = getFileList(logDirs);
+    if(logfiles == null)
       return 0;
-    }
-
-    status.setStatus("Scheduling batch of logs to split");
+    status.setStatus("Checking directory contents...");
+    LOG.debug("Scheduling batch of logs to split");
     tot_mgr_log_split_batch_start.incrementAndGet();
-    LOG.info("started splitting logs in " + logDir);
+    LOG.info("started splitting logs in " + logDirs);
     long t = EnvironmentEdgeManager.currentTimeMillis();
-    long totalSize = 0;
+    long totalSize = 0;
     TaskBatch batch = new TaskBatch();
     for (FileStatus lf : logfiles) {
       // TODO If the log file is still being written to - which is most likely
@@ -218,32 +244,29 @@ public class SplitLogManager extends ZooKeeperListener {
     if (batch.done != batch.installed) {
       stopTrackingTasks(batch);
       tot_mgr_log_split_batch_err.incrementAndGet();
-      LOG.warn("error while splitting logs in " + logDir +
+      LOG.warn("error while splitting logs in " + logDirs +
       " installed = " + batch.installed + " but only " + batch.done + " done");
       throw new IOException("error or interrupt while splitting logs in "
-          + logDir + " Task = " + batch);
+          + logDirs + " Task = " + batch);
     }

     status.setStatus("Checking for orphaned logs in log directory...");
-    if (anyNewLogFiles(logDir, logfiles)) {
-      tot_mgr_new_unexpected_hlogs.incrementAndGet();
-      LOG.warn("new hlogs were produced while logs in " + logDir +
-          " were being split");
-      throw new OrphanHLogAfterSplitException();
-    }
-    tot_mgr_log_split_batch_success.incrementAndGet();
-
-    status.setStatus("Cleaning up log directory...");
-    if (!fs.delete(logDir, true)) {
-      throw new IOException("Unable to delete src dir: " + logDir);
-    }
-
+    for(Path logDir: logDirs){
+      if (anyNewLogFiles(logDir, logfiles)) {
+        tot_mgr_new_unexpected_hlogs.incrementAndGet();
+        LOG.warn("new hlogs were produced while logs in " + logDir +
+            " were being split");
+        throw new OrphanHLogAfterSplitException();
+      }
+      status.setStatus("Cleaning up log directory...");
+      if (!fs.delete(logDir, true)) {
+        throw new IOException("Unable to delete src dir: " + logDir);
+      }
+    }
+    tot_mgr_log_split_batch_success.incrementAndGet();
+
     String msg = "finished splitting (more than or equal to) " + totalSize +
-      " bytes in " + batch.installed + " log files in " + logDir + " in " +
+      " bytes in " + batch.installed + " log files in " + logDirs + " in " +
       (EnvironmentEdgeManager.currentTimeMillis() - t) + "ms";
     status.markComplete(msg);
     LOG.info(msg);

     return totalSize;
   }
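
Taken together, splitLogDistributed(List<Path>) now blocks on one ZooKeeper-coordinated batch covering every dead server, instead of one batch per server. A usage sketch under the same assumptions as above (illustrative only, not code from this commit); the retry-once-on-orphan pattern mirrors what MasterFileSystem.splitLog() does earlier in this diff:

    // deadServers is an assumed List<ServerName>; rootdir and splitLogManager as in the patch.
    List<Path> logDirs = new ArrayList<Path>();
    for (ServerName sn : deadServers) {
      logDirs.add(new Path(rootdir, HLog.getHLogDirectoryName(sn.toString())));
    }
    try {
      long bytes = splitLogManager.splitLogDistributed(logDirs); // blocks until the batch is done
      LOG.info("finished splitting " + bytes + " bytes of logs");
    } catch (OrphanHLogAfterSplitException e) {
      // New hlogs appeared under a dir while it was being split; retry the whole batch once.
      splitLogManager.splitLogDistributed(logDirs);
    }
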
@@ -965,4 +988,4 @@ public class SplitLogManager extends ZooKeeperListener {
    */
   public Status finish(String workerName, String taskname);
 }
-}
+}