HBASE-7213 Have HLog files for .META. and -ROOT- edits only (Devaraj Das)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1433152 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2013-01-14 21:43:31 +00:00
parent 920153a859
commit 9575f0f0a0
17 changed files with 409 additions and 165 deletions

View File

@ -960,6 +960,7 @@ Server {
return;
}
LOG.info("Forcing splitLog and expire of " + sn);
fileSystemManager.splitMetaLog(sn);
fileSystemManager.splitLog(sn);
serverManager.expireServer(sn);
}

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.DeserializationException;
import org.apache.hadoop.hbase.HColumnDescriptor;
@ -84,6 +85,18 @@ public class MasterFileSystem {
final SplitLogManager splitLogManager;
private final MasterServices services;
private final static PathFilter META_FILTER = new PathFilter() {
public boolean accept(Path p) {
return HLogUtil.isMetaFile(p);
}
};
private final static PathFilter NON_META_FILTER = new PathFilter() {
public boolean accept(Path p) {
return !HLogUtil.isMetaFile(p);
}
};
public MasterFileSystem(Server master, MasterServices services,
MetricsMaster metricsMaster, boolean masterRecovery)
throws IOException {
@ -229,7 +242,8 @@ public class MasterFileSystem {
+ " belongs to an existing region server");
}
}
splitLog(serverNames);
splitLog(serverNames, META_FILTER);
splitLog(serverNames, NON_META_FILTER);
retrySplitting = false;
} catch (IOException ioe) {
LOG.warn("Failed splitting of " + serverNames, ioe);
@ -258,8 +272,30 @@ public class MasterFileSystem {
splitLog(serverNames);
}
public void splitLog(final List<ServerName> serverNames) throws IOException {
/**
* Specialized method to handle the splitting for meta HLog
* @param serverName
* @throws IOException
*/
public void splitMetaLog(final ServerName serverName) throws IOException {
long splitTime = 0, splitLogSize = 0;
List<ServerName> serverNames = new ArrayList<ServerName>();
serverNames.add(serverName);
List<Path> logDirs = getLogDirs(serverNames);
if (logDirs.isEmpty()) {
LOG.info("No meta logs to split");
return;
}
splitLogManager.handleDeadWorkers(serverNames);
splitTime = EnvironmentEdgeManager.currentTimeMillis();
splitLogSize = splitLogManager.splitLogDistributed(logDirs, META_FILTER);
splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime;
if (this.metricsMaster != null) {
this.metricsMaster.addSplit(splitTime, splitLogSize);
}
}
private List<Path> getLogDirs(final List<ServerName> serverNames) throws IOException {
List<Path> logDirs = new ArrayList<Path>();
for (ServerName serverName: serverNames) {
Path logDir = new Path(this.rootdir, HLogUtil.getHLogDirectoryName(serverName.toString()));
@ -277,6 +313,23 @@ public class MasterFileSystem {
}
logDirs.add(splitDir);
}
return logDirs;
}
public void splitLog(final List<ServerName> serverNames) throws IOException {
splitLog(serverNames, NON_META_FILTER);
}
/**
* This method is the base split method that splits HLog files matching a filter.
* Callers should pass the appropriate filter for meta and non-meta HLogs.
* @param serverNames
* @param filter
* @throws IOException
*/
public void splitLog(final List<ServerName> serverNames, PathFilter filter) throws IOException {
long splitTime = 0, splitLogSize = 0;
List<Path> logDirs = getLogDirs(serverNames);
if (logDirs.isEmpty()) {
LOG.info("No logs to split");
@ -286,7 +339,7 @@ public class MasterFileSystem {
if (distributedLogSplitting) {
splitLogManager.handleDeadWorkers(serverNames);
splitTime = EnvironmentEdgeManager.currentTimeMillis();
splitLogSize = splitLogManager.splitLogDistributed(logDirs);
splitLogSize = splitLogManager.splitLogDistributed(logDirs,filter);
splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime;
} else {
for(Path logDir: logDirs){

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.DeserializationException;
@ -51,6 +52,7 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
@ -194,7 +196,7 @@ public class SplitLogManager extends ZooKeeperListener {
}
}
private FileStatus[] getFileList(List<Path> logDirs) throws IOException {
private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
List<FileStatus> fileStatus = new ArrayList<FileStatus>();
for (Path hLogDir : logDirs) {
this.fs = hLogDir.getFileSystem(conf);
@ -202,8 +204,7 @@ public class SplitLogManager extends ZooKeeperListener {
LOG.warn(hLogDir + " doesn't exist. Nothing to do!");
continue;
}
// TODO filter filenames?
FileStatus[] logfiles = FSUtils.listStatus(fs, hLogDir, null);
FileStatus[] logfiles = FSUtils.listStatus(fs, hLogDir, filter);
if (logfiles == null || logfiles.length == 0) {
LOG.info(hLogDir + " is empty dir, no logs to split");
} else {
@ -228,6 +229,7 @@ public class SplitLogManager extends ZooKeeperListener {
logDirs.add(logDir);
return splitLogDistributed(logDirs);
}
/**
* The caller will block until all the log files of the given region server
* have been processed - successfully split or an error is encountered - by an
@ -239,9 +241,25 @@ public class SplitLogManager extends ZooKeeperListener {
* @return cumulative size of the logfiles split
*/
public long splitLogDistributed(final List<Path> logDirs) throws IOException {
return splitLogDistributed(logDirs, null);
}
/**
* The caller will block until all the META log files of the given region server
* have been processed - successfully split or an error is encountered - by an
* available worker region server. This method must only be called after the
* region servers have been brought online.
*
* @param logDirs List of log dirs to split
* @param filter the Path filter to select specific files for considering
* @throws IOException If there was an error while splitting any log file
* @return cumulative size of the logfiles split
*/
public long splitLogDistributed(final List<Path> logDirs, PathFilter filter)
throws IOException {
MonitoredTask status = TaskMonitor.get().createStatus(
"Doing distributed log split in " + logDirs);
FileStatus[] logfiles = getFileList(logDirs);
FileStatus[] logfiles = getFileList(logDirs, filter);
status.setStatus("Checking directory contents...");
LOG.debug("Scheduling batch of logs to split");
SplitLogCounters.tot_mgr_log_split_batch_start.incrementAndGet();

View File

@ -18,11 +18,17 @@
*/
package org.apache.hadoop.hbase.master.handler;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.DeadServer;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.zookeeper.KeeperException;
/**
* Shutdown handler for the server hosting <code>-ROOT-</code>,
@ -32,7 +38,7 @@ import org.apache.hadoop.hbase.master.MasterServices;
public class MetaServerShutdownHandler extends ServerShutdownHandler {
private final boolean carryingRoot;
private final boolean carryingMeta;
private static final Log LOG = LogFactory.getLog(MetaServerShutdownHandler.class);
public MetaServerShutdownHandler(final Server server,
final MasterServices services,
final DeadServer deadServers, final ServerName serverName,
@ -44,11 +50,118 @@ public class MetaServerShutdownHandler extends ServerShutdownHandler {
}
@Override
public void process() throws IOException {
try {
LOG.info("Splitting META logs for " + serverName);
if (this.shouldSplitHlog) {
this.services.getMasterFileSystem().splitMetaLog(serverName);
}
} catch (IOException ioe) {
this.services.getExecutorService().submit(this);
this.deadServers.add(serverName);
throw new IOException("failed log splitting for " +
serverName + ", will retry", ioe);
}
// Assign root and meta if we were carrying them.
if (isCarryingRoot()) { // -ROOT-
// Check again: region may be assigned to other where because of RIT
// timeout
if (this.services.getAssignmentManager().isCarryingRoot(serverName)) {
LOG.info("Server " + serverName
+ " was carrying ROOT. Trying to assign.");
this.services.getAssignmentManager().regionOffline(
HRegionInfo.ROOT_REGIONINFO);
verifyAndAssignRootWithRetries();
} else {
LOG.info("ROOT has been assigned to otherwhere, skip assigning.");
}
}
// Carrying meta?
if (isCarryingMeta()) {
// Check again: region may be assigned to other where because of RIT
// timeout
if (this.services.getAssignmentManager().isCarryingMeta(serverName)) {
LOG.info("Server " + serverName
+ " was carrying META. Trying to assign.");
this.services.getAssignmentManager().regionOffline(
HRegionInfo.FIRST_META_REGIONINFO);
this.services.getAssignmentManager().assignMeta();
} else {
LOG.info("META has been assigned to otherwhere, skip assigning.");
}
}
super.process();
}
/**
* Before assign the ROOT region, ensure it haven't
* been assigned by other place
* <p>
* Under some scenarios, the ROOT region can be opened twice, so it seemed online
* in two regionserver at the same time.
* If the ROOT region has been assigned, so the operation can be canceled.
* @throws InterruptedException
* @throws IOException
* @throws KeeperException
*/
private void verifyAndAssignRoot()
throws InterruptedException, IOException, KeeperException {
long timeout = this.server.getConfiguration().
getLong("hbase.catalog.verification.timeout", 1000);
if (!this.server.getCatalogTracker().verifyRootRegionLocation(timeout)) {
this.services.getAssignmentManager().assignRoot();
} else if (serverName.equals(server.getCatalogTracker().getRootLocation())) {
throw new IOException("-ROOT- is onlined on the dead server "
+ serverName);
} else {
LOG.info("Skip assigning -ROOT-, because it is online on the "
+ server.getCatalogTracker().getRootLocation());
}
}
/**
* Failed many times, shutdown processing
* @throws IOException
*/
private void verifyAndAssignRootWithRetries() throws IOException {
int iTimes = this.server.getConfiguration().getInt(
"hbase.catalog.verification.retries", 10);
long waitTime = this.server.getConfiguration().getLong(
"hbase.catalog.verification.timeout", 1000);
int iFlag = 0;
while (true) {
try {
verifyAndAssignRoot();
break;
} catch (KeeperException e) {
this.server.abort("In server shutdown processing, assigning root", e);
throw new IOException("Aborting", e);
} catch (Exception e) {
if (iFlag >= iTimes) {
this.server.abort("verifyAndAssignRoot failed after" + iTimes
+ " times retries, aborting", e);
throw new IOException("Aborting", e);
}
try {
Thread.sleep(waitTime);
} catch (InterruptedException e1) {
LOG.warn("Interrupted when is the thread sleep", e1);
Thread.currentThread().interrupt();
throw new IOException("Interrupted", e1);
}
iFlag++;
}
}
}
boolean isCarryingRoot() {
return this.carryingRoot;
}
@Override
boolean isCarryingMeta() {
return this.carryingMeta;
}

View File

@ -55,10 +55,10 @@ import org.apache.zookeeper.KeeperException;
@InterfaceAudience.Private
public class ServerShutdownHandler extends EventHandler {
private static final Log LOG = LogFactory.getLog(ServerShutdownHandler.class);
private final ServerName serverName;
private final MasterServices services;
private final DeadServer deadServers;
private final boolean shouldSplitHlog; // whether to split HLog or not
protected final ServerName serverName;
protected final MasterServices services;
protected final DeadServer deadServers;
protected final boolean shouldSplitHlog; // whether to split HLog or not
public ServerShutdownHandler(final Server server, final MasterServices services,
final DeadServer deadServers, final ServerName serverName,
@ -90,69 +90,6 @@ public class ServerShutdownHandler extends EventHandler {
}
}
/**
* Before assign the ROOT region, ensure it haven't
* been assigned by other place
* <p>
* Under some scenarios, the ROOT region can be opened twice, so it seemed online
* in two regionserver at the same time.
* If the ROOT region has been assigned, so the operation can be canceled.
* @throws InterruptedException
* @throws IOException
* @throws KeeperException
*/
private void verifyAndAssignRoot()
throws InterruptedException, IOException, KeeperException {
long timeout = this.server.getConfiguration().
getLong("hbase.catalog.verification.timeout", 1000);
if (!this.server.getCatalogTracker().verifyRootRegionLocation(timeout)) {
this.services.getAssignmentManager().assignRoot();
} else if (serverName.equals(server.getCatalogTracker().getRootLocation())) {
throw new IOException("-ROOT- is onlined on the dead server "
+ serverName);
} else {
LOG.info("Skip assigning -ROOT-, because it is online on the "
+ server.getCatalogTracker().getRootLocation());
}
}
/**
* Failed many times, shutdown processing
* @throws IOException
*/
private void verifyAndAssignRootWithRetries() throws IOException {
int iTimes = this.server.getConfiguration().getInt(
"hbase.catalog.verification.retries", 10);
long waitTime = this.server.getConfiguration().getLong(
"hbase.catalog.verification.timeout", 1000);
int iFlag = 0;
while (true) {
try {
verifyAndAssignRoot();
break;
} catch (KeeperException e) {
this.server.abort("In server shutdown processing, assigning root", e);
throw new IOException("Aborting", e);
} catch (Exception e) {
if (iFlag >= iTimes) {
this.server.abort("verifyAndAssignRoot failed after" + iTimes
+ " times retries, aborting", e);
throw new IOException("Aborting", e);
}
try {
Thread.sleep(waitTime);
} catch (InterruptedException e1) {
LOG.warn("Interrupted when is the thread sleep", e1);
Thread.currentThread().interrupt();
throw new IOException("Interrupted", e1);
}
iFlag++;
}
}
}
/**
* @return True if the server we are processing was carrying <code>-ROOT-</code>
*/
@ -188,43 +125,13 @@ public class ServerShutdownHandler extends EventHandler {
LOG.info("Skipping log splitting for " + serverName);
}
} catch (IOException ioe) {
this.services.getExecutorService().submit(this);
//typecast to SSH so that we make sure that it is the SSH instance that
//gets submitted as opposed to MSSH or some other derived instance of SSH
this.services.getExecutorService().submit((ServerShutdownHandler)this);
this.deadServers.add(serverName);
throw new IOException("failed log splitting for " +
serverName + ", will retry", ioe);
}
// Assign root and meta if we were carrying them.
if (isCarryingRoot()) { // -ROOT-
// Check again: region may be assigned to other where because of RIT
// timeout
if (this.services.getAssignmentManager().isCarryingRoot(serverName)) {
LOG.info("Server " + serverName
+ " was carrying ROOT. Trying to assign.");
this.services.getAssignmentManager().regionOffline(
HRegionInfo.ROOT_REGIONINFO);
verifyAndAssignRootWithRetries();
} else {
LOG.info("ROOT has been assigned to otherwhere, skip assigning.");
}
}
// Carrying meta?
if (isCarryingMeta()) {
// Check again: region may be assigned to other where because of RIT
// timeout
if (this.services.getAssignmentManager().isCarryingMeta(serverName)) {
LOG.info("Server " + serverName
+ " was carrying META. Trying to assign.");
this.services.getAssignmentManager().regionOffline(
HRegionInfo.FIRST_META_REGIONINFO);
this.services.getAssignmentManager().assignMeta();
} else {
LOG.info("META has been assigned to otherwhere, skip assigning.");
}
}
// We don't want worker thread in the MetaServerShutdownHandler
// executor pool to block by waiting availability of -ROOT-
// and .META. server. Otherwise, it could run into the following issue:

View File

@ -29,6 +29,7 @@ import java.lang.reflect.Method;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@ -328,6 +329,7 @@ public class HRegionServer implements ClientProtocol,
RpcServer rpcServer;
private final InetSocketAddress isa;
private UncaughtExceptionHandler uncaughtExceptionHandler;
// Info server. Default access so can be used by unit tests. REGIONSERVER
// is name of the webapp and the attribute name used stuffing this instance
@ -357,7 +359,12 @@ public class HRegionServer implements ClientProtocol,
// HLog and HLog roller. log is protected rather than private to avoid
// eclipse warning when accessed by inner classes
protected volatile HLog hlog;
// The meta updates are written to a different hlog. If this
// regionserver holds meta regions, then this field will be non-null.
protected volatile HLog hlogForMeta;
LogRoller hlogRoller;
LogRoller metaHLogRoller;
// flag set after we're done setting up server threads (used for testing)
protected volatile boolean isOnline;
@ -518,6 +525,11 @@ public class HRegionServer implements ClientProtocol,
"hbase.regionserver.kerberos.principal", this.isa.getHostName());
regionServerAccounting = new RegionServerAccounting();
cacheConfig = new CacheConfig(conf);
uncaughtExceptionHandler = new UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
abort("Uncaught exception in service thread " + t.getName(), e);
}
};
this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
}
@ -931,6 +943,7 @@ public class HRegionServer implements ClientProtocol,
if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
if (this.hlogRoller != null) this.hlogRoller.interruptIfNecessary();
if (this.metaHLogRoller != null) this.metaHLogRoller.interruptIfNecessary();
if (this.compactionChecker != null)
this.compactionChecker.interrupt();
if (this.healthCheckChore != null) {
@ -1135,6 +1148,13 @@ public class HRegionServer implements ClientProtocol,
private void closeWAL(final boolean delete) {
try {
if (this.hlogForMeta != null) {
//All hlogs (meta and non-meta) are in the same directory. Don't call
//closeAndDelete here since that would delete all hlogs not just the
//meta ones. We will just 'close' the hlog for meta here, and leave
//the directory cleanup to the follow-on closeAndDelete call.
this.hlogForMeta.close();
}
if (this.hlog != null) {
if (delete) {
hlog.closeAndDelete();
@ -1405,6 +1425,21 @@ public class HRegionServer implements ClientProtocol,
return instantiateHLog(rootDir, logName);
}
private HLog getMetaWAL() throws IOException {
if (this.hlogForMeta == null) {
final String logName
= HLogUtil.getHLogDirectoryName(this.serverNameFromMasterPOV.toString());
Path logdir = new Path(rootDir, logName);
if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
this.hlogForMeta = HLogFactory.createMetaHLog(this.fs.getBackingFs(),
rootDir, logName, this.conf, getMetaWALActionListeners(),
this.serverNameFromMasterPOV.toString());
}
return this.hlogForMeta;
}
/**
* Called by {@link #setupWALAndReplication()} creating WAL instance.
* @param rootdir
@ -1436,6 +1471,17 @@ public class HRegionServer implements ClientProtocol,
return listeners;
}
protected List<WALActionsListener> getMetaWALActionListeners() {
List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
// Log roller.
this.metaHLogRoller = new MetaLogRoller(this, this);
String n = Thread.currentThread().getName();
Threads.setDaemonThreadRunning(this.metaHLogRoller.getThread(),
n + "MetaLogRoller", uncaughtExceptionHandler);
listeners.add(this.metaHLogRoller);
return listeners;
}
protected LogRoller getLogRoller() {
return hlogRoller;
}
@ -1465,12 +1511,6 @@ public class HRegionServer implements ClientProtocol,
*/
private void startServiceThreads() throws IOException {
String n = Thread.currentThread().getName();
UncaughtExceptionHandler handler = new UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
abort("Uncaught exception in service thread " + t.getName(), e);
}
};
// Start executor services
this.service = new ExecutorService(getServerName().toString());
this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
@ -1486,14 +1526,16 @@ public class HRegionServer implements ClientProtocol,
this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller", handler);
Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller",
uncaughtExceptionHandler);
Threads.setDaemonThreadRunning(this.cacheFlusher.getThread(), n + ".cacheFlusher",
handler);
uncaughtExceptionHandler);
Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), n +
".compactionChecker", handler);
".compactionChecker", uncaughtExceptionHandler);
if (this.healthCheckChore != null) {
Threads
.setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker", handler);
.setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker",
uncaughtExceptionHandler);
}
// Leases is not a Thread. Internally it runs a daemon thread. If it gets
@ -1574,11 +1616,32 @@ public class HRegionServer implements ClientProtocol,
stop("One or more threads are no longer alive -- stop");
return false;
}
if (metaHLogRoller != null && !metaHLogRoller.isAlive()) {
stop("Meta HLog roller thread is no longer alive -- stop");
return false;
}
return true;
}
@Override
public HLog getWAL() {
try {
return getWAL(null);
} catch (IOException e) {
LOG.warn("getWAL threw exception " + e);
return null;
}
}
@Override
public HLog getWAL(HRegionInfo regionInfo) throws IOException {
//TODO: at some point this should delegate to the HLogFactory
//currently, we don't care about the region as much as we care about the
//table.. (hence checking the tablename below)
//_ROOT_ and .META. regions have separate WAL.
if (regionInfo != null &&
regionInfo.isMetaTable()) {
return getMetaWAL();
}
return this.hlog;
}
@ -1725,6 +1788,9 @@ public class HRegionServer implements ClientProtocol,
if (this.hlogRoller != null) {
Threads.shutdown(this.hlogRoller.getThread());
}
if (this.metaHLogRoller != null) {
Threads.shutdown(this.metaHLogRoller.getThread());
}
if (this.compactSplitThread != null) {
this.compactSplitThread.join();
}

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
@ -47,7 +48,7 @@ class LogRoller extends HasThread implements WALActionsListener {
private final ReentrantLock rollLock = new ReentrantLock();
private final AtomicBoolean rollLog = new AtomicBoolean(false);
private final Server server;
private final RegionServerServices services;
protected final RegionServerServices services;
private volatile long lastrolltime = System.currentTimeMillis();
// Period to roll log.
private final long rollperiod;
@ -92,7 +93,7 @@ class LogRoller extends HasThread implements WALActionsListener {
try {
this.lastrolltime = now;
// This is array of actual region names.
byte [][] regionsToFlush = this.services.getWAL().rollWriter(rollLog.get());
byte [][] regionsToFlush = getWAL().rollWriter(rollLog.get());
if (regionsToFlush != null) {
for (byte [] r: regionsToFlush) scheduleFlush(r);
}
@ -159,6 +160,10 @@ class LogRoller extends HasThread implements WALActionsListener {
}
}
protected HLog getWAL() throws IOException {
return this.services.getWAL(null);
}
@Override
public void preLogRoll(Path oldPath, Path newPath) throws IOException {
// Not interested

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
@InterfaceAudience.Private
class MetaLogRoller extends LogRoller {
public MetaLogRoller(Server server, RegionServerServices services) {
super(server, services);
}
@Override
protected HLog getWAL() throws IOException {
//The argument to getWAL below could either be HRegionInfo.FIRST_META_REGIONINFO or
//HRegionInfo.ROOT_REGIONINFO. Both these share the same WAL.
return services.getWAL(HRegionInfo.FIRST_META_REGIONINFO);
}
}

View File

@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
@ -38,8 +39,9 @@ public interface RegionServerServices extends OnlineRegions {
*/
public boolean isStopping();
/** @return the HLog */
public HLog getWAL();
/** @return the HLog for a particular region. Pass null for getting the
* default (common) WAL */
public HLog getWAL(HRegionInfo regionInfo) throws IOException;
/**
* @return Implementation of {@link CompactionRequestor} or null.

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.zookeeper.KeeperException;
@ -44,7 +45,7 @@ import org.apache.zookeeper.KeeperException;
public class OpenRegionHandler extends EventHandler {
private static final Log LOG = LogFactory.getLog(OpenRegionHandler.class);
private final RegionServerServices rsServices;
protected final RegionServerServices rsServices;
private final HRegionInfo regionInfo;
private final HTableDescriptor htd;
@ -424,7 +425,8 @@ public class OpenRegionHandler extends EventHandler {
// Instantiate the region. This also periodically tickles our zk OPENING
// state so master doesn't timeout this region in transition.
region = HRegion.openHRegion(this.regionInfo, this.htd,
this.rsServices.getWAL(), this.server.getConfiguration(),
this.rsServices.getWAL(this.regionInfo),
this.server.getConfiguration(),
this.rsServices,
new CancelableProgressable() {
public boolean progress() {

View File

@ -154,6 +154,8 @@ class FSHLog implements HLog, Syncable {
private final AtomicLong logSeqNum = new AtomicLong(0);
private boolean forMeta = false;
// The timestamp (in ms) when the log file was created.
private volatile long filenum = -1;
@ -211,15 +213,15 @@ class FSHLog implements HLog, Syncable {
*
* @param fs filesystem handle
* @param root path for stored and archived hlogs
* @param logName dir where hlogs are stored
* @param logDir dir where hlogs are stored
* @param conf configuration to use
* @throws IOException
*/
public FSHLog(final FileSystem fs, final Path root, final String logName,
public FSHLog(final FileSystem fs, final Path root, final String logDir,
final Configuration conf)
throws IOException {
this(fs, root, logName, HConstants.HREGION_OLDLOGDIR_NAME,
conf, null, true, null);
this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME,
conf, null, true, null, false);
}
/**
@ -227,16 +229,16 @@ class FSHLog implements HLog, Syncable {
*
* @param fs filesystem handle
* @param root path for stored and archived hlogs
* @param logName dir where hlogs are stored
* @param oldLogName dir where hlogs are archived
* @param logDir dir where hlogs are stored
* @param oldLogDir dir where hlogs are archived
* @param conf configuration to use
* @throws IOException
*/
public FSHLog(final FileSystem fs, final Path root, final String logName,
final String oldLogName, final Configuration conf)
public FSHLog(final FileSystem fs, final Path root, final String logDir,
final String oldLogDir, final Configuration conf)
throws IOException {
this(fs, root, logName, oldLogName,
conf, null, true, null);
this(fs, root, logDir, oldLogDir,
conf, null, true, null, false);
}
/**
@ -248,7 +250,7 @@ class FSHLog implements HLog, Syncable {
*
* @param fs filesystem handle
* @param root path for stored and archived hlogs
* @param logName dir where hlogs are stored
* @param logDir dir where hlogs are stored
* @param conf configuration to use
* @param listeners Listeners on WAL events. Listeners passed here will
* be registered before we do anything else; e.g. the
@ -258,11 +260,11 @@ class FSHLog implements HLog, Syncable {
* If prefix is null, "hlog" will be used
* @throws IOException
*/
public FSHLog(final FileSystem fs, final Path root, final String logName,
public FSHLog(final FileSystem fs, final Path root, final String logDir,
final Configuration conf, final List<WALActionsListener> listeners,
final String prefix) throws IOException {
this(fs, root, logName, HConstants.HREGION_OLDLOGDIR_NAME,
conf, listeners, true, prefix);
this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME,
conf, listeners, true, prefix, false);
}
/**
@ -274,7 +276,8 @@ class FSHLog implements HLog, Syncable {
*
* @param fs filesystem handle
* @param root path to where logs and oldlogs
* @param oldLogName path to where hlogs are archived
* @param logDir dir where hlogs are stored
* @param oldLogDir dir where hlogs are archived
* @param conf configuration to use
* @param listeners Listeners on WAL events. Listeners passed here will
* be registered before we do anything else; e.g. the
@ -283,18 +286,20 @@ class FSHLog implements HLog, Syncable {
* @param prefix should always be hostname and port in distributed env and
* it will be URL encoded before being used.
* If prefix is null, "hlog" will be used
* @param forMeta if this hlog is meant for meta updates
* @throws IOException
*/
private FSHLog(final FileSystem fs, final Path root, final String logName,
final String oldLogName, final Configuration conf,
public FSHLog(final FileSystem fs, final Path root, final String logDir,
final String oldLogDir, final Configuration conf,
final List<WALActionsListener> listeners,
final boolean failIfLogDirExists, final String prefix)
final boolean failIfLogDirExists, final String prefix, boolean forMeta)
throws IOException {
super();
this.fs = fs;
this.rootDir = root;
this.dir = new Path(this.rootDir, logName);
this.oldLogDir = new Path(this.rootDir, oldLogName);
this.dir = new Path(this.rootDir, logDir);
this.oldLogDir = new Path(this.rootDir, oldLogDir);
this.forMeta = forMeta;
this.conf = conf;
if (listeners != null) {
@ -333,15 +338,16 @@ class FSHLog implements HLog, Syncable {
// If prefix is null||empty then just name it hlog
this.prefix = prefix == null || prefix.isEmpty() ?
"hlog" : URLEncoder.encode(prefix, "UTF8");
if (failIfLogDirExists && this.fs.exists(dir)) {
boolean dirExists = false;
if (failIfLogDirExists && (dirExists = this.fs.exists(dir))) {
throw new IOException("Target HLog directory already exists: " + dir);
}
if (!fs.mkdirs(dir)) {
if (!dirExists && !fs.mkdirs(dir)) {
throw new IOException("Unable to mkdir " + dir);
}
if (!fs.exists(oldLogDir)) {
if (!fs.exists(this.oldLogDir)) {
if (!fs.mkdirs(this.oldLogDir)) {
throw new IOException("Unable to mkdir " + this.oldLogDir);
}
@ -483,6 +489,7 @@ class FSHLog implements HLog, Syncable {
long currentFilenum = this.filenum;
Path oldPath = null;
if (currentFilenum > 0) {
//computeFilename will take care of meta hlog filename
oldPath = computeFilename(currentFilenum);
}
this.filenum = System.currentTimeMillis();
@ -561,6 +568,9 @@ class FSHLog implements HLog, Syncable {
*/
protected Writer createWriterInstance(final FileSystem fs, final Path path,
final Configuration conf) throws IOException {
if (forMeta) {
//TODO: set a higher replication for the hlog files (HBASE-6773)
}
return HLogFactory.createWriter(fs, path, conf);
}
@ -729,7 +739,11 @@ class FSHLog implements HLog, Syncable {
if (filenum < 0) {
throw new RuntimeException("hlog file number can't be < 0");
}
return new Path(dir, prefix + "." + filenum);
String child = prefix + "." + filenum;
if (forMeta) {
child += HLog.META_HLOG_FILE_EXTN;
}
return new Path(dir, child);
}
@Override

View File

@ -50,6 +50,8 @@ public interface HLog {
/** File Extension used while splitting an HLog into regions (HBASE-2312) */
public static final String SPLITTING_EXT = "-splitting";
public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;
/** The META region's HLog filename extension */
public static final String META_HLOG_FILE_EXTN = ".meta";
/*
* Name of directory that holds recovered edits written by the wal log

View File

@ -26,9 +26,9 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
@ -50,6 +50,13 @@ public class HLogFactory {
final String prefix) throws IOException {
return new FSHLog(fs, root, logName, conf, listeners, prefix);
}
public static HLog createMetaHLog(final FileSystem fs, final Path root, final String logName,
final Configuration conf, final List<WALActionsListener> listeners,
final String prefix) throws IOException {
return new FSHLog(fs, root, logName, HConstants.HREGION_OLDLOGDIR_NAME,
conf, listeners, false, prefix, true);
}
/*
* WAL Reader

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
@ -302,6 +303,11 @@ public class HLogSplitter {
+ ": " + logPath + ", length=" + logLength);
Reader in = null;
try {
//actually, for meta-only hlogs, we don't need to go thru the process
//of parsing and segregating by regions since all the logs are for
//meta only. However, there is a sequence number that can be obtained
//only by parsing.. so we parse for all files currently
//TODO: optimize this part somehow
in = getReader(fs, log, conf, skipErrors);
if (in != null) {
parseHLog(in, logPath, entryBuffers, fs, conf, skipErrors);

View File

@ -76,7 +76,8 @@ public class HLogUtil {
/**
* Pattern used to validate a HLog file name
*/
private static final Pattern pattern = Pattern.compile(".*\\.\\d*");
private static final Pattern pattern =
Pattern.compile(".*\\.\\d*("+HLog.META_HLOG_FILE_EXTN+")*");
/**
* @param filename
@ -312,4 +313,11 @@ public class HLogUtil {
}
return filesSorted;
}
public static boolean isMetaFile(Path p) {
if (p.getName().endsWith(HLog.META_HLOG_FILE_EXTN)) {
return true;
}
return false;
}
}

View File

@ -273,12 +273,6 @@ class MockRegionServer implements AdminProtocol, ClientProtocol, RegionServerSer
return false;
}
@Override
public HLog getWAL() {
// TODO Auto-generated method stub
return null;
}
@Override
public CompactionRequestor getCompactionRequester() {
// TODO Auto-generated method stub
@ -499,4 +493,10 @@ class MockRegionServer implements AdminProtocol, ClientProtocol, RegionServerSer
// TODO Auto-generated method stub
return null;
}
@Override
public HLog getWAL(HRegionInfo regionInfo) throws IOException {
// TODO Auto-generated method stub
return null;
}
}

View File

@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.fs.HFileSystem;
@ -88,11 +89,6 @@ public class MockRegionServerServices implements RegionServerServices {
return this.stopping;
}
@Override
public HLog getWAL() {
return null;
}
@Override
public RpcServer getRpcServer() {
return null;
@ -170,4 +166,10 @@ public class MockRegionServerServices implements RegionServerServices {
public Leases getLeases() {
return null;
}
}
@Override
public HLog getWAL(HRegionInfo regionInfo) throws IOException {
// TODO Auto-generated method stub
return null;
}
}