HBASE-7213 Have HLog files for .META. edits only
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1431935 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2ade9c0048
commit
641a592b51
|
@ -960,6 +960,7 @@ Server {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
LOG.info("Forcing splitLog and expire of " + sn);
|
LOG.info("Forcing splitLog and expire of " + sn);
|
||||||
|
fileSystemManager.splitMetaLog(sn);
|
||||||
fileSystemManager.splitLog(sn);
|
fileSystemManager.splitLog(sn);
|
||||||
serverManager.expireServer(sn);
|
serverManager.expireServer(sn);
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.PathFilter;
|
||||||
import org.apache.hadoop.hbase.ClusterId;
|
import org.apache.hadoop.hbase.ClusterId;
|
||||||
import org.apache.hadoop.hbase.DeserializationException;
|
import org.apache.hadoop.hbase.DeserializationException;
|
||||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
|
@ -84,6 +85,18 @@ public class MasterFileSystem {
|
||||||
final SplitLogManager splitLogManager;
|
final SplitLogManager splitLogManager;
|
||||||
private final MasterServices services;
|
private final MasterServices services;
|
||||||
|
|
||||||
|
private final static PathFilter META_FILTER = new PathFilter() {
|
||||||
|
public boolean accept(Path p) {
|
||||||
|
return HLogUtil.isMetaFile(p);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
private final static PathFilter NON_META_FILTER = new PathFilter() {
|
||||||
|
public boolean accept(Path p) {
|
||||||
|
return !HLogUtil.isMetaFile(p);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
public MasterFileSystem(Server master, MasterServices services,
|
public MasterFileSystem(Server master, MasterServices services,
|
||||||
MetricsMaster metricsMaster, boolean masterRecovery)
|
MetricsMaster metricsMaster, boolean masterRecovery)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
@ -229,7 +242,8 @@ public class MasterFileSystem {
|
||||||
+ " belongs to an existing region server");
|
+ " belongs to an existing region server");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
splitLog(serverNames);
|
splitLog(serverNames, META_FILTER);
|
||||||
|
splitLog(serverNames, NON_META_FILTER);
|
||||||
retrySplitting = false;
|
retrySplitting = false;
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
LOG.warn("Failed splitting of " + serverNames, ioe);
|
LOG.warn("Failed splitting of " + serverNames, ioe);
|
||||||
|
@ -258,8 +272,30 @@ public class MasterFileSystem {
|
||||||
splitLog(serverNames);
|
splitLog(serverNames);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void splitLog(final List<ServerName> serverNames) throws IOException {
|
/**
|
||||||
|
* Specialized method to handle the splitting for .META. HLog
|
||||||
|
* @param serverName
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public void splitMetaLog(final ServerName serverName) throws IOException {
|
||||||
long splitTime = 0, splitLogSize = 0;
|
long splitTime = 0, splitLogSize = 0;
|
||||||
|
List<ServerName> serverNames = new ArrayList<ServerName>();
|
||||||
|
serverNames.add(serverName);
|
||||||
|
List<Path> logDirs = getLogDirs(serverNames);
|
||||||
|
if (logDirs.isEmpty()) {
|
||||||
|
LOG.info("No .META. logs to split");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
splitLogManager.handleDeadWorkers(serverNames);
|
||||||
|
splitTime = EnvironmentEdgeManager.currentTimeMillis();
|
||||||
|
splitLogSize = splitLogManager.splitLogDistributed(logDirs, META_FILTER);
|
||||||
|
splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime;
|
||||||
|
if (this.metricsMaster != null) {
|
||||||
|
this.metricsMaster.addSplit(splitTime, splitLogSize);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<Path> getLogDirs(final List<ServerName> serverNames) throws IOException {
|
||||||
List<Path> logDirs = new ArrayList<Path>();
|
List<Path> logDirs = new ArrayList<Path>();
|
||||||
for (ServerName serverName: serverNames) {
|
for (ServerName serverName: serverNames) {
|
||||||
Path logDir = new Path(this.rootdir, HLogUtil.getHLogDirectoryName(serverName.toString()));
|
Path logDir = new Path(this.rootdir, HLogUtil.getHLogDirectoryName(serverName.toString()));
|
||||||
|
@ -277,6 +313,23 @@ public class MasterFileSystem {
|
||||||
}
|
}
|
||||||
logDirs.add(splitDir);
|
logDirs.add(splitDir);
|
||||||
}
|
}
|
||||||
|
return logDirs;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void splitLog(final List<ServerName> serverNames) throws IOException {
|
||||||
|
splitLog(serverNames, NON_META_FILTER);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method is the base split method that splits HLog files matching a filter.
|
||||||
|
* Callers should pass the appropriate filter for .META. and non-meta HLogs.
|
||||||
|
* @param serverNames
|
||||||
|
* @param filter
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public void splitLog(final List<ServerName> serverNames, PathFilter filter) throws IOException {
|
||||||
|
long splitTime = 0, splitLogSize = 0;
|
||||||
|
List<Path> logDirs = getLogDirs(serverNames);
|
||||||
|
|
||||||
if (logDirs.isEmpty()) {
|
if (logDirs.isEmpty()) {
|
||||||
LOG.info("No logs to split");
|
LOG.info("No logs to split");
|
||||||
|
@ -286,7 +339,7 @@ public class MasterFileSystem {
|
||||||
if (distributedLogSplitting) {
|
if (distributedLogSplitting) {
|
||||||
splitLogManager.handleDeadWorkers(serverNames);
|
splitLogManager.handleDeadWorkers(serverNames);
|
||||||
splitTime = EnvironmentEdgeManager.currentTimeMillis();
|
splitTime = EnvironmentEdgeManager.currentTimeMillis();
|
||||||
splitLogSize = splitLogManager.splitLogDistributed(logDirs);
|
splitLogSize = splitLogManager.splitLogDistributed(logDirs,filter);
|
||||||
splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime;
|
splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime;
|
||||||
} else {
|
} else {
|
||||||
for(Path logDir: logDirs){
|
for(Path logDir: logDirs){
|
||||||
|
@ -370,7 +423,8 @@ public class MasterFileSystem {
|
||||||
// Make sure cluster ID exists
|
// Make sure cluster ID exists
|
||||||
if (!FSUtils.checkClusterIdExists(fs, rd, c.getInt(
|
if (!FSUtils.checkClusterIdExists(fs, rd, c.getInt(
|
||||||
HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000))) {
|
HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000))) {
|
||||||
FSUtils.setClusterId(fs, rd, new ClusterId(), c.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000));
|
FSUtils.setClusterId(fs, rd, new ClusterId(), c.getInt(HConstants.THREAD_WAKE_FREQUENCY,
|
||||||
|
10 * 1000));
|
||||||
}
|
}
|
||||||
clusterId = FSUtils.getClusterId(fs, rd);
|
clusterId = FSUtils.getClusterId(fs, rd);
|
||||||
|
|
||||||
|
|
|
@ -40,6 +40,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.PathFilter;
|
||||||
import org.apache.hadoop.hbase.Chore;
|
import org.apache.hadoop.hbase.Chore;
|
||||||
import org.apache.hadoop.hbase.SplitLogCounters;
|
import org.apache.hadoop.hbase.SplitLogCounters;
|
||||||
import org.apache.hadoop.hbase.DeserializationException;
|
import org.apache.hadoop.hbase.DeserializationException;
|
||||||
|
@ -51,6 +52,7 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||||
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
|
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
|
||||||
import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
|
import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
|
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.util.FSUtils;
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
|
@ -194,7 +196,7 @@ public class SplitLogManager extends ZooKeeperListener {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private FileStatus[] getFileList(List<Path> logDirs) throws IOException {
|
private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
|
||||||
List<FileStatus> fileStatus = new ArrayList<FileStatus>();
|
List<FileStatus> fileStatus = new ArrayList<FileStatus>();
|
||||||
for (Path hLogDir : logDirs) {
|
for (Path hLogDir : logDirs) {
|
||||||
this.fs = hLogDir.getFileSystem(conf);
|
this.fs = hLogDir.getFileSystem(conf);
|
||||||
|
@ -202,8 +204,7 @@ public class SplitLogManager extends ZooKeeperListener {
|
||||||
LOG.warn(hLogDir + " doesn't exist. Nothing to do!");
|
LOG.warn(hLogDir + " doesn't exist. Nothing to do!");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// TODO filter filenames?
|
FileStatus[] logfiles = FSUtils.listStatus(fs, hLogDir, filter);
|
||||||
FileStatus[] logfiles = FSUtils.listStatus(fs, hLogDir, null);
|
|
||||||
if (logfiles == null || logfiles.length == 0) {
|
if (logfiles == null || logfiles.length == 0) {
|
||||||
LOG.info(hLogDir + " is empty dir, no logs to split");
|
LOG.info(hLogDir + " is empty dir, no logs to split");
|
||||||
} else {
|
} else {
|
||||||
|
@ -228,6 +229,7 @@ public class SplitLogManager extends ZooKeeperListener {
|
||||||
logDirs.add(logDir);
|
logDirs.add(logDir);
|
||||||
return splitLogDistributed(logDirs);
|
return splitLogDistributed(logDirs);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The caller will block until all the log files of the given region server
|
* The caller will block until all the log files of the given region server
|
||||||
* have been processed - successfully split or an error is encountered - by an
|
* have been processed - successfully split or an error is encountered - by an
|
||||||
|
@ -239,9 +241,25 @@ public class SplitLogManager extends ZooKeeperListener {
|
||||||
* @return cumulative size of the logfiles split
|
* @return cumulative size of the logfiles split
|
||||||
*/
|
*/
|
||||||
public long splitLogDistributed(final List<Path> logDirs) throws IOException {
|
public long splitLogDistributed(final List<Path> logDirs) throws IOException {
|
||||||
|
return splitLogDistributed(logDirs, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The caller will block until all the META log files of the given region server
|
||||||
|
* have been processed - successfully split or an error is encountered - by an
|
||||||
|
* available worker region server. This method must only be called after the
|
||||||
|
* region servers have been brought online.
|
||||||
|
*
|
||||||
|
* @param logDirs List of log dirs to split
|
||||||
|
* @param filter the Path filter to select specific files for considering
|
||||||
|
* @throws IOException If there was an error while splitting any log file
|
||||||
|
* @return cumulative size of the logfiles split
|
||||||
|
*/
|
||||||
|
public long splitLogDistributed(final List<Path> logDirs, PathFilter filter)
|
||||||
|
throws IOException {
|
||||||
MonitoredTask status = TaskMonitor.get().createStatus(
|
MonitoredTask status = TaskMonitor.get().createStatus(
|
||||||
"Doing distributed log split in " + logDirs);
|
"Doing distributed log split in " + logDirs);
|
||||||
FileStatus[] logfiles = getFileList(logDirs);
|
FileStatus[] logfiles = getFileList(logDirs, filter);
|
||||||
status.setStatus("Checking directory contents...");
|
status.setStatus("Checking directory contents...");
|
||||||
LOG.debug("Scheduling batch of logs to split");
|
LOG.debug("Scheduling batch of logs to split");
|
||||||
SplitLogCounters.tot_mgr_log_split_batch_start.incrementAndGet();
|
SplitLogCounters.tot_mgr_log_split_batch_start.incrementAndGet();
|
||||||
|
|
|
@ -18,11 +18,17 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.master.handler;
|
package org.apache.hadoop.hbase.master.handler;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.Server;
|
import org.apache.hadoop.hbase.Server;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.master.DeadServer;
|
import org.apache.hadoop.hbase.master.DeadServer;
|
||||||
import org.apache.hadoop.hbase.master.MasterServices;
|
import org.apache.hadoop.hbase.master.MasterServices;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Shutdown handler for the server hosting <code>-ROOT-</code>,
|
* Shutdown handler for the server hosting <code>-ROOT-</code>,
|
||||||
|
@ -32,7 +38,7 @@ import org.apache.hadoop.hbase.master.MasterServices;
|
||||||
public class MetaServerShutdownHandler extends ServerShutdownHandler {
|
public class MetaServerShutdownHandler extends ServerShutdownHandler {
|
||||||
private final boolean carryingRoot;
|
private final boolean carryingRoot;
|
||||||
private final boolean carryingMeta;
|
private final boolean carryingMeta;
|
||||||
|
private static final Log LOG = LogFactory.getLog(MetaServerShutdownHandler.class);
|
||||||
public MetaServerShutdownHandler(final Server server,
|
public MetaServerShutdownHandler(final Server server,
|
||||||
final MasterServices services,
|
final MasterServices services,
|
||||||
final DeadServer deadServers, final ServerName serverName,
|
final DeadServer deadServers, final ServerName serverName,
|
||||||
|
@ -44,11 +50,118 @@ public class MetaServerShutdownHandler extends ServerShutdownHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
public void process() throws IOException {
|
||||||
|
try {
|
||||||
|
LOG.info("Splitting META logs for " + serverName);
|
||||||
|
if (this.shouldSplitHlog) {
|
||||||
|
this.services.getMasterFileSystem().splitMetaLog(serverName);
|
||||||
|
}
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
this.services.getExecutorService().submit(this);
|
||||||
|
this.deadServers.add(serverName);
|
||||||
|
throw new IOException("failed log splitting for " +
|
||||||
|
serverName + ", will retry", ioe);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Assign root and meta if we were carrying them.
|
||||||
|
if (isCarryingRoot()) { // -ROOT-
|
||||||
|
// Check again: region may be assigned to other where because of RIT
|
||||||
|
// timeout
|
||||||
|
if (this.services.getAssignmentManager().isCarryingRoot(serverName)) {
|
||||||
|
LOG.info("Server " + serverName
|
||||||
|
+ " was carrying ROOT. Trying to assign.");
|
||||||
|
this.services.getAssignmentManager().regionOffline(
|
||||||
|
HRegionInfo.ROOT_REGIONINFO);
|
||||||
|
verifyAndAssignRootWithRetries();
|
||||||
|
} else {
|
||||||
|
LOG.info("ROOT has been assigned to otherwhere, skip assigning.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Carrying meta?
|
||||||
|
if (isCarryingMeta()) {
|
||||||
|
// Check again: region may be assigned to other where because of RIT
|
||||||
|
// timeout
|
||||||
|
if (this.services.getAssignmentManager().isCarryingMeta(serverName)) {
|
||||||
|
LOG.info("Server " + serverName
|
||||||
|
+ " was carrying META. Trying to assign.");
|
||||||
|
this.services.getAssignmentManager().regionOffline(
|
||||||
|
HRegionInfo.FIRST_META_REGIONINFO);
|
||||||
|
this.services.getAssignmentManager().assignMeta();
|
||||||
|
} else {
|
||||||
|
LOG.info("META has been assigned to otherwhere, skip assigning.");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
super.process();
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Before assign the ROOT region, ensure it haven't
|
||||||
|
* been assigned by other place
|
||||||
|
* <p>
|
||||||
|
* Under some scenarios, the ROOT region can be opened twice, so it seemed online
|
||||||
|
* in two regionserver at the same time.
|
||||||
|
* If the ROOT region has been assigned, so the operation can be canceled.
|
||||||
|
* @throws InterruptedException
|
||||||
|
* @throws IOException
|
||||||
|
* @throws KeeperException
|
||||||
|
*/
|
||||||
|
private void verifyAndAssignRoot()
|
||||||
|
throws InterruptedException, IOException, KeeperException {
|
||||||
|
long timeout = this.server.getConfiguration().
|
||||||
|
getLong("hbase.catalog.verification.timeout", 1000);
|
||||||
|
if (!this.server.getCatalogTracker().verifyRootRegionLocation(timeout)) {
|
||||||
|
this.services.getAssignmentManager().assignRoot();
|
||||||
|
} else if (serverName.equals(server.getCatalogTracker().getRootLocation())) {
|
||||||
|
throw new IOException("-ROOT- is onlined on the dead server "
|
||||||
|
+ serverName);
|
||||||
|
} else {
|
||||||
|
LOG.info("Skip assigning -ROOT-, because it is online on the "
|
||||||
|
+ server.getCatalogTracker().getRootLocation());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Failed many times, shutdown processing
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private void verifyAndAssignRootWithRetries() throws IOException {
|
||||||
|
int iTimes = this.server.getConfiguration().getInt(
|
||||||
|
"hbase.catalog.verification.retries", 10);
|
||||||
|
|
||||||
|
long waitTime = this.server.getConfiguration().getLong(
|
||||||
|
"hbase.catalog.verification.timeout", 1000);
|
||||||
|
|
||||||
|
int iFlag = 0;
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
verifyAndAssignRoot();
|
||||||
|
break;
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
this.server.abort("In server shutdown processing, assigning root", e);
|
||||||
|
throw new IOException("Aborting", e);
|
||||||
|
} catch (Exception e) {
|
||||||
|
if (iFlag >= iTimes) {
|
||||||
|
this.server.abort("verifyAndAssignRoot failed after" + iTimes
|
||||||
|
+ " times retries, aborting", e);
|
||||||
|
throw new IOException("Aborting", e);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
Thread.sleep(waitTime);
|
||||||
|
} catch (InterruptedException e1) {
|
||||||
|
LOG.warn("Interrupted when is the thread sleep", e1);
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw new IOException("Interrupted", e1);
|
||||||
|
}
|
||||||
|
iFlag++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
boolean isCarryingRoot() {
|
boolean isCarryingRoot() {
|
||||||
return this.carryingRoot;
|
return this.carryingRoot;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
boolean isCarryingMeta() {
|
boolean isCarryingMeta() {
|
||||||
return this.carryingMeta;
|
return this.carryingMeta;
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,10 +55,10 @@ import org.apache.zookeeper.KeeperException;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class ServerShutdownHandler extends EventHandler {
|
public class ServerShutdownHandler extends EventHandler {
|
||||||
private static final Log LOG = LogFactory.getLog(ServerShutdownHandler.class);
|
private static final Log LOG = LogFactory.getLog(ServerShutdownHandler.class);
|
||||||
private final ServerName serverName;
|
protected final ServerName serverName;
|
||||||
private final MasterServices services;
|
protected final MasterServices services;
|
||||||
private final DeadServer deadServers;
|
protected final DeadServer deadServers;
|
||||||
private final boolean shouldSplitHlog; // whether to split HLog or not
|
protected final boolean shouldSplitHlog; // whether to split HLog or not
|
||||||
|
|
||||||
public ServerShutdownHandler(final Server server, final MasterServices services,
|
public ServerShutdownHandler(final Server server, final MasterServices services,
|
||||||
final DeadServer deadServers, final ServerName serverName,
|
final DeadServer deadServers, final ServerName serverName,
|
||||||
|
@ -90,69 +90,6 @@ public class ServerShutdownHandler extends EventHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Before assign the ROOT region, ensure it haven't
|
|
||||||
* been assigned by other place
|
|
||||||
* <p>
|
|
||||||
* Under some scenarios, the ROOT region can be opened twice, so it seemed online
|
|
||||||
* in two regionserver at the same time.
|
|
||||||
* If the ROOT region has been assigned, so the operation can be canceled.
|
|
||||||
* @throws InterruptedException
|
|
||||||
* @throws IOException
|
|
||||||
* @throws KeeperException
|
|
||||||
*/
|
|
||||||
private void verifyAndAssignRoot()
|
|
||||||
throws InterruptedException, IOException, KeeperException {
|
|
||||||
long timeout = this.server.getConfiguration().
|
|
||||||
getLong("hbase.catalog.verification.timeout", 1000);
|
|
||||||
if (!this.server.getCatalogTracker().verifyRootRegionLocation(timeout)) {
|
|
||||||
this.services.getAssignmentManager().assignRoot();
|
|
||||||
} else if (serverName.equals(server.getCatalogTracker().getRootLocation())) {
|
|
||||||
throw new IOException("-ROOT- is onlined on the dead server "
|
|
||||||
+ serverName);
|
|
||||||
} else {
|
|
||||||
LOG.info("Skip assigning -ROOT-, because it is online on the "
|
|
||||||
+ server.getCatalogTracker().getRootLocation());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Failed many times, shutdown processing
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
private void verifyAndAssignRootWithRetries() throws IOException {
|
|
||||||
int iTimes = this.server.getConfiguration().getInt(
|
|
||||||
"hbase.catalog.verification.retries", 10);
|
|
||||||
|
|
||||||
long waitTime = this.server.getConfiguration().getLong(
|
|
||||||
"hbase.catalog.verification.timeout", 1000);
|
|
||||||
|
|
||||||
int iFlag = 0;
|
|
||||||
while (true) {
|
|
||||||
try {
|
|
||||||
verifyAndAssignRoot();
|
|
||||||
break;
|
|
||||||
} catch (KeeperException e) {
|
|
||||||
this.server.abort("In server shutdown processing, assigning root", e);
|
|
||||||
throw new IOException("Aborting", e);
|
|
||||||
} catch (Exception e) {
|
|
||||||
if (iFlag >= iTimes) {
|
|
||||||
this.server.abort("verifyAndAssignRoot failed after" + iTimes
|
|
||||||
+ " times retries, aborting", e);
|
|
||||||
throw new IOException("Aborting", e);
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
Thread.sleep(waitTime);
|
|
||||||
} catch (InterruptedException e1) {
|
|
||||||
LOG.warn("Interrupted when is the thread sleep", e1);
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
throw new IOException("Interrupted", e1);
|
|
||||||
}
|
|
||||||
iFlag++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return True if the server we are processing was carrying <code>-ROOT-</code>
|
* @return True if the server we are processing was carrying <code>-ROOT-</code>
|
||||||
*/
|
*/
|
||||||
|
@ -188,43 +125,13 @@ public class ServerShutdownHandler extends EventHandler {
|
||||||
LOG.info("Skipping log splitting for " + serverName);
|
LOG.info("Skipping log splitting for " + serverName);
|
||||||
}
|
}
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
this.services.getExecutorService().submit(this);
|
//typecast to SSH so that we make sure that it is the SSH instance that
|
||||||
|
//gets submitted as opposed to MSSH or some other derived instance of SSH
|
||||||
|
this.services.getExecutorService().submit((ServerShutdownHandler)this);
|
||||||
this.deadServers.add(serverName);
|
this.deadServers.add(serverName);
|
||||||
throw new IOException("failed log splitting for " +
|
throw new IOException("failed log splitting for " +
|
||||||
serverName + ", will retry", ioe);
|
serverName + ", will retry", ioe);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Assign root and meta if we were carrying them.
|
|
||||||
if (isCarryingRoot()) { // -ROOT-
|
|
||||||
// Check again: region may be assigned to other where because of RIT
|
|
||||||
// timeout
|
|
||||||
if (this.services.getAssignmentManager().isCarryingRoot(serverName)) {
|
|
||||||
LOG.info("Server " + serverName
|
|
||||||
+ " was carrying ROOT. Trying to assign.");
|
|
||||||
this.services.getAssignmentManager().regionOffline(
|
|
||||||
HRegionInfo.ROOT_REGIONINFO);
|
|
||||||
verifyAndAssignRootWithRetries();
|
|
||||||
} else {
|
|
||||||
LOG.info("ROOT has been assigned to otherwhere, skip assigning.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Carrying meta?
|
|
||||||
if (isCarryingMeta()) {
|
|
||||||
// Check again: region may be assigned to other where because of RIT
|
|
||||||
// timeout
|
|
||||||
if (this.services.getAssignmentManager().isCarryingMeta(serverName)) {
|
|
||||||
LOG.info("Server " + serverName
|
|
||||||
+ " was carrying META. Trying to assign.");
|
|
||||||
this.services.getAssignmentManager().regionOffline(
|
|
||||||
HRegionInfo.FIRST_META_REGIONINFO);
|
|
||||||
this.services.getAssignmentManager().assignMeta();
|
|
||||||
} else {
|
|
||||||
LOG.info("META has been assigned to otherwhere, skip assigning.");
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// We don't want worker thread in the MetaServerShutdownHandler
|
// We don't want worker thread in the MetaServerShutdownHandler
|
||||||
// executor pool to block by waiting availability of -ROOT-
|
// executor pool to block by waiting availability of -ROOT-
|
||||||
// and .META. server. Otherwise, it could run into the following issue:
|
// and .META. server. Otherwise, it could run into the following issue:
|
||||||
|
|
|
@ -29,6 +29,7 @@ import java.lang.reflect.Method;
|
||||||
import java.net.BindException;
|
import java.net.BindException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
|
@ -328,6 +329,7 @@ public class HRegionServer implements ClientProtocol,
|
||||||
RpcServer rpcServer;
|
RpcServer rpcServer;
|
||||||
|
|
||||||
private final InetSocketAddress isa;
|
private final InetSocketAddress isa;
|
||||||
|
private UncaughtExceptionHandler uncaughtExceptionHandler;
|
||||||
|
|
||||||
// Info server. Default access so can be used by unit tests. REGIONSERVER
|
// Info server. Default access so can be used by unit tests. REGIONSERVER
|
||||||
// is name of the webapp and the attribute name used stuffing this instance
|
// is name of the webapp and the attribute name used stuffing this instance
|
||||||
|
@ -357,7 +359,12 @@ public class HRegionServer implements ClientProtocol,
|
||||||
// HLog and HLog roller. log is protected rather than private to avoid
|
// HLog and HLog roller. log is protected rather than private to avoid
|
||||||
// eclipse warning when accessed by inner classes
|
// eclipse warning when accessed by inner classes
|
||||||
protected volatile HLog hlog;
|
protected volatile HLog hlog;
|
||||||
|
// The meta updates are written to a different hlog. If this
|
||||||
|
// regionserver holds meta, then this field will be non-null.
|
||||||
|
protected volatile HLog hlogForMeta;
|
||||||
|
|
||||||
LogRoller hlogRoller;
|
LogRoller hlogRoller;
|
||||||
|
LogRoller metaHLogRoller;
|
||||||
|
|
||||||
// flag set after we're done setting up server threads (used for testing)
|
// flag set after we're done setting up server threads (used for testing)
|
||||||
protected volatile boolean isOnline;
|
protected volatile boolean isOnline;
|
||||||
|
@ -518,6 +525,11 @@ public class HRegionServer implements ClientProtocol,
|
||||||
"hbase.regionserver.kerberos.principal", this.isa.getHostName());
|
"hbase.regionserver.kerberos.principal", this.isa.getHostName());
|
||||||
regionServerAccounting = new RegionServerAccounting();
|
regionServerAccounting = new RegionServerAccounting();
|
||||||
cacheConfig = new CacheConfig(conf);
|
cacheConfig = new CacheConfig(conf);
|
||||||
|
uncaughtExceptionHandler = new UncaughtExceptionHandler() {
|
||||||
|
public void uncaughtException(Thread t, Throwable e) {
|
||||||
|
abort("Uncaught exception in service thread " + t.getName(), e);
|
||||||
|
}
|
||||||
|
};
|
||||||
this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
|
this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -931,6 +943,7 @@ public class HRegionServer implements ClientProtocol,
|
||||||
if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
|
if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
|
||||||
if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
|
if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
|
||||||
if (this.hlogRoller != null) this.hlogRoller.interruptIfNecessary();
|
if (this.hlogRoller != null) this.hlogRoller.interruptIfNecessary();
|
||||||
|
if (this.metaHLogRoller != null) this.metaHLogRoller.interruptIfNecessary();
|
||||||
if (this.compactionChecker != null)
|
if (this.compactionChecker != null)
|
||||||
this.compactionChecker.interrupt();
|
this.compactionChecker.interrupt();
|
||||||
if (this.healthCheckChore != null) {
|
if (this.healthCheckChore != null) {
|
||||||
|
@ -1405,6 +1418,21 @@ public class HRegionServer implements ClientProtocol,
|
||||||
return instantiateHLog(rootDir, logName);
|
return instantiateHLog(rootDir, logName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private HLog getMetaWAL() throws IOException {
|
||||||
|
if (this.hlogForMeta == null) {
|
||||||
|
final String logName
|
||||||
|
= HLogUtil.getHLogDirectoryName(this.serverNameFromMasterPOV.toString());
|
||||||
|
|
||||||
|
Path logdir = new Path(rootDir, logName);
|
||||||
|
if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
|
||||||
|
|
||||||
|
this.hlogForMeta = HLogFactory.createMetaHLog(this.fs.getBackingFs(),
|
||||||
|
rootDir, logName, this.conf, getMetaWALActionListeners(),
|
||||||
|
this.serverNameFromMasterPOV.toString());
|
||||||
|
}
|
||||||
|
return this.hlogForMeta;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called by {@link #setupWALAndReplication()} creating WAL instance.
|
* Called by {@link #setupWALAndReplication()} creating WAL instance.
|
||||||
* @param rootdir
|
* @param rootdir
|
||||||
|
@ -1436,6 +1464,17 @@ public class HRegionServer implements ClientProtocol,
|
||||||
return listeners;
|
return listeners;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected List<WALActionsListener> getMetaWALActionListeners() {
|
||||||
|
List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
|
||||||
|
// Log roller.
|
||||||
|
this.metaHLogRoller = new MetaLogRoller(this, this);
|
||||||
|
String n = Thread.currentThread().getName();
|
||||||
|
Threads.setDaemonThreadRunning(this.metaHLogRoller.getThread(),
|
||||||
|
n + ".META.logRoller", uncaughtExceptionHandler);
|
||||||
|
listeners.add(this.metaHLogRoller);
|
||||||
|
return listeners;
|
||||||
|
}
|
||||||
|
|
||||||
protected LogRoller getLogRoller() {
|
protected LogRoller getLogRoller() {
|
||||||
return hlogRoller;
|
return hlogRoller;
|
||||||
}
|
}
|
||||||
|
@ -1465,12 +1504,6 @@ public class HRegionServer implements ClientProtocol,
|
||||||
*/
|
*/
|
||||||
private void startServiceThreads() throws IOException {
|
private void startServiceThreads() throws IOException {
|
||||||
String n = Thread.currentThread().getName();
|
String n = Thread.currentThread().getName();
|
||||||
UncaughtExceptionHandler handler = new UncaughtExceptionHandler() {
|
|
||||||
public void uncaughtException(Thread t, Throwable e) {
|
|
||||||
abort("Uncaught exception in service thread " + t.getName(), e);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// Start executor services
|
// Start executor services
|
||||||
this.service = new ExecutorService(getServerName().toString());
|
this.service = new ExecutorService(getServerName().toString());
|
||||||
this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
|
this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
|
||||||
|
@ -1486,14 +1519,15 @@ public class HRegionServer implements ClientProtocol,
|
||||||
this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
|
this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
|
||||||
conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
|
conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
|
||||||
|
|
||||||
Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller", handler);
|
Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller", uncaughtExceptionHandler);
|
||||||
Threads.setDaemonThreadRunning(this.cacheFlusher.getThread(), n + ".cacheFlusher",
|
Threads.setDaemonThreadRunning(this.cacheFlusher.getThread(), n + ".cacheFlusher",
|
||||||
handler);
|
uncaughtExceptionHandler);
|
||||||
Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), n +
|
Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), n +
|
||||||
".compactionChecker", handler);
|
".compactionChecker", uncaughtExceptionHandler);
|
||||||
if (this.healthCheckChore != null) {
|
if (this.healthCheckChore != null) {
|
||||||
Threads
|
Threads
|
||||||
.setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker", handler);
|
.setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker",
|
||||||
|
uncaughtExceptionHandler);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Leases is not a Thread. Internally it runs a daemon thread. If it gets
|
// Leases is not a Thread. Internally it runs a daemon thread. If it gets
|
||||||
|
@ -1574,11 +1608,31 @@ public class HRegionServer implements ClientProtocol,
|
||||||
stop("One or more threads are no longer alive -- stop");
|
stop("One or more threads are no longer alive -- stop");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
if (metaHLogRoller != null && !metaHLogRoller.isAlive()) {
|
||||||
|
stop("Meta HLog roller thread is no longer alive -- stop");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public HLog getWAL() {
|
public HLog getWAL() {
|
||||||
|
try {
|
||||||
|
return getWAL(null);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("getWAL threw exception " + e);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public HLog getWAL(HRegionInfo regionInfo) throws IOException {
|
||||||
|
//TODO: at some point this should delegate to the HLogFactory
|
||||||
|
//currently, we don't care about the region as much as we care about the
|
||||||
|
//table.. (hence checking the tablename below)
|
||||||
|
if (regionInfo != null &&
|
||||||
|
Arrays.equals(regionInfo.getTableName(), HConstants.META_TABLE_NAME)) {
|
||||||
|
return getMetaWAL();
|
||||||
|
}
|
||||||
return this.hlog;
|
return this.hlog;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1725,6 +1779,9 @@ public class HRegionServer implements ClientProtocol,
|
||||||
if (this.hlogRoller != null) {
|
if (this.hlogRoller != null) {
|
||||||
Threads.shutdown(this.hlogRoller.getThread());
|
Threads.shutdown(this.hlogRoller.getThread());
|
||||||
}
|
}
|
||||||
|
if (this.metaHLogRoller != null) {
|
||||||
|
Threads.shutdown(this.metaHLogRoller.getThread());
|
||||||
|
}
|
||||||
if (this.compactSplitThread != null) {
|
if (this.compactSplitThread != null) {
|
||||||
this.compactSplitThread.join();
|
this.compactSplitThread.join();
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.*;
|
import org.apache.hadoop.hbase.*;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
|
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
|
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||||
|
@ -47,7 +48,7 @@ class LogRoller extends HasThread implements WALActionsListener {
|
||||||
private final ReentrantLock rollLock = new ReentrantLock();
|
private final ReentrantLock rollLock = new ReentrantLock();
|
||||||
private final AtomicBoolean rollLog = new AtomicBoolean(false);
|
private final AtomicBoolean rollLog = new AtomicBoolean(false);
|
||||||
private final Server server;
|
private final Server server;
|
||||||
private final RegionServerServices services;
|
protected final RegionServerServices services;
|
||||||
private volatile long lastrolltime = System.currentTimeMillis();
|
private volatile long lastrolltime = System.currentTimeMillis();
|
||||||
// Period to roll log.
|
// Period to roll log.
|
||||||
private final long rollperiod;
|
private final long rollperiod;
|
||||||
|
@ -92,7 +93,7 @@ class LogRoller extends HasThread implements WALActionsListener {
|
||||||
try {
|
try {
|
||||||
this.lastrolltime = now;
|
this.lastrolltime = now;
|
||||||
// This is array of actual region names.
|
// This is array of actual region names.
|
||||||
byte [][] regionsToFlush = this.services.getWAL().rollWriter(rollLog.get());
|
byte [][] regionsToFlush = getWAL().rollWriter(rollLog.get());
|
||||||
if (regionsToFlush != null) {
|
if (regionsToFlush != null) {
|
||||||
for (byte [] r: regionsToFlush) scheduleFlush(r);
|
for (byte [] r: regionsToFlush) scheduleFlush(r);
|
||||||
}
|
}
|
||||||
|
@ -159,6 +160,10 @@ class LogRoller extends HasThread implements WALActionsListener {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected HLog getWAL() throws IOException {
|
||||||
|
return this.services.getWAL(null);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void preLogRoll(Path oldPath, Path newPath) throws IOException {
|
public void preLogRoll(Path oldPath, Path newPath) throws IOException {
|
||||||
// Not interested
|
// Not interested
|
||||||
|
|
|
@ -0,0 +1,37 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.Server;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
class MetaLogRoller extends LogRoller {
|
||||||
|
public MetaLogRoller(Server server, RegionServerServices services) {
|
||||||
|
super(server, services);
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
protected HLog getWAL() throws IOException {
|
||||||
|
return services.getWAL(HRegionInfo.FIRST_META_REGIONINFO);
|
||||||
|
}
|
||||||
|
}
|
|
@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
||||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||||
|
@ -38,8 +39,9 @@ public interface RegionServerServices extends OnlineRegions {
|
||||||
*/
|
*/
|
||||||
public boolean isStopping();
|
public boolean isStopping();
|
||||||
|
|
||||||
/** @return the HLog */
|
/** @return the HLog for a particular region. Pass null for getting the
|
||||||
public HLog getWAL();
|
* default (common) WAL */
|
||||||
|
public HLog getWAL(HRegionInfo regionInfo) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return Implementation of {@link CompactionRequestor} or null.
|
* @return Implementation of {@link CompactionRequestor} or null.
|
||||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.executor.EventHandler;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
|
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||||
import org.apache.hadoop.hbase.util.CancelableProgressable;
|
import org.apache.hadoop.hbase.util.CancelableProgressable;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
|
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
@ -44,7 +45,7 @@ import org.apache.zookeeper.KeeperException;
|
||||||
public class OpenRegionHandler extends EventHandler {
|
public class OpenRegionHandler extends EventHandler {
|
||||||
private static final Log LOG = LogFactory.getLog(OpenRegionHandler.class);
|
private static final Log LOG = LogFactory.getLog(OpenRegionHandler.class);
|
||||||
|
|
||||||
private final RegionServerServices rsServices;
|
protected final RegionServerServices rsServices;
|
||||||
|
|
||||||
private final HRegionInfo regionInfo;
|
private final HRegionInfo regionInfo;
|
||||||
private final HTableDescriptor htd;
|
private final HTableDescriptor htd;
|
||||||
|
@ -424,7 +425,8 @@ public class OpenRegionHandler extends EventHandler {
|
||||||
// Instantiate the region. This also periodically tickles our zk OPENING
|
// Instantiate the region. This also periodically tickles our zk OPENING
|
||||||
// state so master doesn't timeout this region in transition.
|
// state so master doesn't timeout this region in transition.
|
||||||
region = HRegion.openHRegion(this.regionInfo, this.htd,
|
region = HRegion.openHRegion(this.regionInfo, this.htd,
|
||||||
this.rsServices.getWAL(), this.server.getConfiguration(),
|
this.rsServices.getWAL(this.regionInfo),
|
||||||
|
this.server.getConfiguration(),
|
||||||
this.rsServices,
|
this.rsServices,
|
||||||
new CancelableProgressable() {
|
new CancelableProgressable() {
|
||||||
public boolean progress() {
|
public boolean progress() {
|
||||||
|
|
|
@ -154,6 +154,8 @@ class FSHLog implements HLog, Syncable {
|
||||||
|
|
||||||
private final AtomicLong logSeqNum = new AtomicLong(0);
|
private final AtomicLong logSeqNum = new AtomicLong(0);
|
||||||
|
|
||||||
|
private boolean forMeta = false;
|
||||||
|
|
||||||
// The timestamp (in ms) when the log file was created.
|
// The timestamp (in ms) when the log file was created.
|
||||||
private volatile long filenum = -1;
|
private volatile long filenum = -1;
|
||||||
|
|
||||||
|
@ -211,15 +213,15 @@ class FSHLog implements HLog, Syncable {
|
||||||
*
|
*
|
||||||
* @param fs filesystem handle
|
* @param fs filesystem handle
|
||||||
* @param root path for stored and archived hlogs
|
* @param root path for stored and archived hlogs
|
||||||
* @param logName dir where hlogs are stored
|
* @param logDir dir where hlogs are stored
|
||||||
* @param conf configuration to use
|
* @param conf configuration to use
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public FSHLog(final FileSystem fs, final Path root, final String logName,
|
public FSHLog(final FileSystem fs, final Path root, final String logDir,
|
||||||
final Configuration conf)
|
final Configuration conf)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
this(fs, root, logName, HConstants.HREGION_OLDLOGDIR_NAME,
|
this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME,
|
||||||
conf, null, true, null);
|
conf, null, true, null, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -227,16 +229,16 @@ class FSHLog implements HLog, Syncable {
|
||||||
*
|
*
|
||||||
* @param fs filesystem handle
|
* @param fs filesystem handle
|
||||||
* @param root path for stored and archived hlogs
|
* @param root path for stored and archived hlogs
|
||||||
* @param logName dir where hlogs are stored
|
* @param logDir dir where hlogs are stored
|
||||||
* @param oldLogName dir where hlogs are archived
|
* @param oldLogDir dir where hlogs are archived
|
||||||
* @param conf configuration to use
|
* @param conf configuration to use
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public FSHLog(final FileSystem fs, final Path root, final String logName,
|
public FSHLog(final FileSystem fs, final Path root, final String logDir,
|
||||||
final String oldLogName, final Configuration conf)
|
final String oldLogDir, final Configuration conf)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
this(fs, root, logName, oldLogName,
|
this(fs, root, logDir, oldLogDir,
|
||||||
conf, null, true, null);
|
conf, null, true, null, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -248,7 +250,7 @@ class FSHLog implements HLog, Syncable {
|
||||||
*
|
*
|
||||||
* @param fs filesystem handle
|
* @param fs filesystem handle
|
||||||
* @param root path for stored and archived hlogs
|
* @param root path for stored and archived hlogs
|
||||||
* @param logName dir where hlogs are stored
|
* @param logDir dir where hlogs are stored
|
||||||
* @param conf configuration to use
|
* @param conf configuration to use
|
||||||
* @param listeners Listeners on WAL events. Listeners passed here will
|
* @param listeners Listeners on WAL events. Listeners passed here will
|
||||||
* be registered before we do anything else; e.g. the
|
* be registered before we do anything else; e.g. the
|
||||||
|
@ -258,11 +260,11 @@ class FSHLog implements HLog, Syncable {
|
||||||
* If prefix is null, "hlog" will be used
|
* If prefix is null, "hlog" will be used
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public FSHLog(final FileSystem fs, final Path root, final String logName,
|
public FSHLog(final FileSystem fs, final Path root, final String logDir,
|
||||||
final Configuration conf, final List<WALActionsListener> listeners,
|
final Configuration conf, final List<WALActionsListener> listeners,
|
||||||
final String prefix) throws IOException {
|
final String prefix) throws IOException {
|
||||||
this(fs, root, logName, HConstants.HREGION_OLDLOGDIR_NAME,
|
this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME,
|
||||||
conf, listeners, true, prefix);
|
conf, listeners, true, prefix, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -274,7 +276,8 @@ class FSHLog implements HLog, Syncable {
|
||||||
*
|
*
|
||||||
* @param fs filesystem handle
|
* @param fs filesystem handle
|
||||||
* @param root path to where logs and oldlogs
|
* @param root path to where logs and oldlogs
|
||||||
* @param oldLogName path to where hlogs are archived
|
* @param logDir dir where hlogs are stored
|
||||||
|
* @param oldLogDir dir where hlogs are archived
|
||||||
* @param conf configuration to use
|
* @param conf configuration to use
|
||||||
* @param listeners Listeners on WAL events. Listeners passed here will
|
* @param listeners Listeners on WAL events. Listeners passed here will
|
||||||
* be registered before we do anything else; e.g. the
|
* be registered before we do anything else; e.g. the
|
||||||
|
@ -283,18 +286,20 @@ class FSHLog implements HLog, Syncable {
|
||||||
* @param prefix should always be hostname and port in distributed env and
|
* @param prefix should always be hostname and port in distributed env and
|
||||||
* it will be URL encoded before being used.
|
* it will be URL encoded before being used.
|
||||||
* If prefix is null, "hlog" will be used
|
* If prefix is null, "hlog" will be used
|
||||||
|
* @param forMeta if this hlog is meant for meta updates
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private FSHLog(final FileSystem fs, final Path root, final String logName,
|
public FSHLog(final FileSystem fs, final Path root, final String logDir,
|
||||||
final String oldLogName, final Configuration conf,
|
final String oldLogDir, final Configuration conf,
|
||||||
final List<WALActionsListener> listeners,
|
final List<WALActionsListener> listeners,
|
||||||
final boolean failIfLogDirExists, final String prefix)
|
final boolean failIfLogDirExists, final String prefix, boolean forMeta)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
super();
|
super();
|
||||||
this.fs = fs;
|
this.fs = fs;
|
||||||
this.rootDir = root;
|
this.rootDir = root;
|
||||||
this.dir = new Path(this.rootDir, logName);
|
this.dir = new Path(this.rootDir, logDir);
|
||||||
this.oldLogDir = new Path(this.rootDir, oldLogName);
|
this.oldLogDir = new Path(this.rootDir, oldLogDir);
|
||||||
|
this.forMeta = forMeta;
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
|
|
||||||
if (listeners != null) {
|
if (listeners != null) {
|
||||||
|
@ -333,15 +338,16 @@ class FSHLog implements HLog, Syncable {
|
||||||
// If prefix is null||empty then just name it hlog
|
// If prefix is null||empty then just name it hlog
|
||||||
this.prefix = prefix == null || prefix.isEmpty() ?
|
this.prefix = prefix == null || prefix.isEmpty() ?
|
||||||
"hlog" : URLEncoder.encode(prefix, "UTF8");
|
"hlog" : URLEncoder.encode(prefix, "UTF8");
|
||||||
|
|
||||||
if (failIfLogDirExists && this.fs.exists(dir)) {
|
boolean dirExists = false;
|
||||||
|
if (failIfLogDirExists && (dirExists = this.fs.exists(dir))) {
|
||||||
throw new IOException("Target HLog directory already exists: " + dir);
|
throw new IOException("Target HLog directory already exists: " + dir);
|
||||||
}
|
}
|
||||||
if (!fs.mkdirs(dir)) {
|
if (!dirExists && !fs.mkdirs(dir)) {
|
||||||
throw new IOException("Unable to mkdir " + dir);
|
throw new IOException("Unable to mkdir " + dir);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!fs.exists(oldLogDir)) {
|
if (!fs.exists(this.oldLogDir)) {
|
||||||
if (!fs.mkdirs(this.oldLogDir)) {
|
if (!fs.mkdirs(this.oldLogDir)) {
|
||||||
throw new IOException("Unable to mkdir " + this.oldLogDir);
|
throw new IOException("Unable to mkdir " + this.oldLogDir);
|
||||||
}
|
}
|
||||||
|
@ -483,6 +489,7 @@ class FSHLog implements HLog, Syncable {
|
||||||
long currentFilenum = this.filenum;
|
long currentFilenum = this.filenum;
|
||||||
Path oldPath = null;
|
Path oldPath = null;
|
||||||
if (currentFilenum > 0) {
|
if (currentFilenum > 0) {
|
||||||
|
//computeFilename will take care of meta hlog filename
|
||||||
oldPath = computeFilename(currentFilenum);
|
oldPath = computeFilename(currentFilenum);
|
||||||
}
|
}
|
||||||
this.filenum = System.currentTimeMillis();
|
this.filenum = System.currentTimeMillis();
|
||||||
|
@ -561,6 +568,9 @@ class FSHLog implements HLog, Syncable {
|
||||||
*/
|
*/
|
||||||
protected Writer createWriterInstance(final FileSystem fs, final Path path,
|
protected Writer createWriterInstance(final FileSystem fs, final Path path,
|
||||||
final Configuration conf) throws IOException {
|
final Configuration conf) throws IOException {
|
||||||
|
if (forMeta) {
|
||||||
|
//TODO: set a higher replication for the hlog files (HBASE-6773)
|
||||||
|
}
|
||||||
return HLogFactory.createWriter(fs, path, conf);
|
return HLogFactory.createWriter(fs, path, conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -729,7 +739,11 @@ class FSHLog implements HLog, Syncable {
|
||||||
if (filenum < 0) {
|
if (filenum < 0) {
|
||||||
throw new RuntimeException("hlog file number can't be < 0");
|
throw new RuntimeException("hlog file number can't be < 0");
|
||||||
}
|
}
|
||||||
return new Path(dir, prefix + "." + filenum);
|
String child = prefix + "." + filenum;
|
||||||
|
if (forMeta) {
|
||||||
|
child += HLog.META_HLOG_FILE_EXTN;
|
||||||
|
}
|
||||||
|
return new Path(dir, child);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -50,6 +50,8 @@ public interface HLog {
|
||||||
/** File Extension used while splitting an HLog into regions (HBASE-2312) */
|
/** File Extension used while splitting an HLog into regions (HBASE-2312) */
|
||||||
public static final String SPLITTING_EXT = "-splitting";
|
public static final String SPLITTING_EXT = "-splitting";
|
||||||
public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;
|
public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;
|
||||||
|
/** The META region's HLog filename extension */
|
||||||
|
public static final String META_HLOG_FILE_EXTN = ".meta";
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Name of directory that holds recovered edits written by the wal log
|
* Name of directory that holds recovered edits written by the wal log
|
||||||
|
|
|
@ -26,9 +26,9 @@ import java.util.List;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
|
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
|
import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
|
||||||
|
|
||||||
|
@ -50,6 +50,13 @@ public class HLogFactory {
|
||||||
final String prefix) throws IOException {
|
final String prefix) throws IOException {
|
||||||
return new FSHLog(fs, root, logName, conf, listeners, prefix);
|
return new FSHLog(fs, root, logName, conf, listeners, prefix);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static HLog createMetaHLog(final FileSystem fs, final Path root, final String logName,
|
||||||
|
final Configuration conf, final List<WALActionsListener> listeners,
|
||||||
|
final String prefix) throws IOException {
|
||||||
|
return new FSHLog(fs, root, logName, HConstants.HREGION_OLDLOGDIR_NAME,
|
||||||
|
conf, listeners, false, prefix, true);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* WAL Reader
|
* WAL Reader
|
||||||
|
|
|
@ -49,6 +49,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.PathFilter;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
||||||
|
@ -302,6 +303,11 @@ public class HLogSplitter {
|
||||||
+ ": " + logPath + ", length=" + logLength);
|
+ ": " + logPath + ", length=" + logLength);
|
||||||
Reader in = null;
|
Reader in = null;
|
||||||
try {
|
try {
|
||||||
|
//actually, for meta-only hlogs, we don't need to go thru the process
|
||||||
|
//of parsing and segregating by regions since all the logs are for
|
||||||
|
//meta only. However, there is a sequence number that can be obtained
|
||||||
|
//only by parsing.. so we parse for all files currently
|
||||||
|
//TODO: optimize this part somehow
|
||||||
in = getReader(fs, log, conf, skipErrors);
|
in = getReader(fs, log, conf, skipErrors);
|
||||||
if (in != null) {
|
if (in != null) {
|
||||||
parseHLog(in, logPath, entryBuffers, fs, conf, skipErrors);
|
parseHLog(in, logPath, entryBuffers, fs, conf, skipErrors);
|
||||||
|
|
|
@ -76,7 +76,8 @@ public class HLogUtil {
|
||||||
/**
|
/**
|
||||||
* Pattern used to validate a HLog file name
|
* Pattern used to validate a HLog file name
|
||||||
*/
|
*/
|
||||||
private static final Pattern pattern = Pattern.compile(".*\\.\\d*");
|
private static final Pattern pattern =
|
||||||
|
Pattern.compile(".*\\.\\d*("+HLog.META_HLOG_FILE_EXTN+")*");
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param filename
|
* @param filename
|
||||||
|
@ -312,4 +313,11 @@ public class HLogUtil {
|
||||||
}
|
}
|
||||||
return filesSorted;
|
return filesSorted;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static boolean isMetaFile(Path p) {
|
||||||
|
if (p.getName().endsWith(HLog.META_HLOG_FILE_EXTN)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -273,12 +273,6 @@ class MockRegionServer implements AdminProtocol, ClientProtocol, RegionServerSer
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public HLog getWAL() {
|
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CompactionRequestor getCompactionRequester() {
|
public CompactionRequestor getCompactionRequester() {
|
||||||
// TODO Auto-generated method stub
|
// TODO Auto-generated method stub
|
||||||
|
@ -499,4 +493,10 @@ class MockRegionServer implements AdminProtocol, ClientProtocol, RegionServerSer
|
||||||
// TODO Auto-generated method stub
|
// TODO Auto-generated method stub
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public HLog getWAL(HRegionInfo regionInfo) throws IOException {
|
||||||
|
// TODO Auto-generated method stub
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
||||||
import org.apache.hadoop.hbase.fs.HFileSystem;
|
import org.apache.hadoop.hbase.fs.HFileSystem;
|
||||||
|
@ -88,11 +89,6 @@ public class MockRegionServerServices implements RegionServerServices {
|
||||||
return this.stopping;
|
return this.stopping;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public HLog getWAL() {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RpcServer getRpcServer() {
|
public RpcServer getRpcServer() {
|
||||||
return null;
|
return null;
|
||||||
|
@ -170,4 +166,10 @@ public class MockRegionServerServices implements RegionServerServices {
|
||||||
public Leases getLeases() {
|
public Leases getLeases() {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public HLog getWAL(HRegionInfo regionInfo) throws IOException {
|
||||||
|
// TODO Auto-generated method stub
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue