HBASE-15763 Isolate Wal related stuff from MasterFileSystem

This commit is contained in:
Matteo Bertozzi 2016-05-04 07:59:44 -07:00
parent 0e37063345
commit a8a2c516a0
12 changed files with 533 additions and 397 deletions

View File

@ -270,6 +270,7 @@ public class HMaster extends HRegionServer implements MasterServices {
final MetricsMaster metricsMaster;
// file system manager for the master FS operations
private MasterFileSystem fileSystemManager;
private MasterWalManager walManager;
// server manager to deal with region server info
volatile ServerManager serverManager;
@ -656,7 +657,8 @@ public class HMaster extends HRegionServer implements MasterServices {
this.masterActiveTime = System.currentTimeMillis();
// TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
this.fileSystemManager = new MasterFileSystem(this, this);
this.fileSystemManager = new MasterFileSystem(this);
this.walManager = new MasterWalManager(this);
// enable table descriptors cache
this.tableDescriptors.setCacheOn();
@ -715,7 +717,7 @@ public class HMaster extends HRegionServer implements MasterServices {
// we recover hbase:meta region servers inside master initialization and
// handle other failed servers in SSH in order to start up master node ASAP
Set<ServerName> previouslyFailedServers =
this.fileSystemManager.getFailedServersFromLogFolders();
this.walManager.getFailedServersFromLogFolders();
// log splitting for hbase:meta server
ServerName oldMetaServerLocation = metaTableLocator.getMetaRegionLocation(this.getZooKeeper());
@ -946,11 +948,11 @@ public class HMaster extends HRegionServer implements MasterServices {
// TODO: should we prevent from using state manager before meta was initialized?
// tableStateManager.start();
if ((RecoveryMode.LOG_REPLAY == this.getMasterFileSystem().getLogRecoveryMode())
if ((RecoveryMode.LOG_REPLAY == this.getMasterWalManager().getLogRecoveryMode())
&& (!previouslyFailedMetaRSs.isEmpty())) {
// replay WAL edits mode need new hbase:meta RS is assigned firstly
status.setStatus("replaying log for Meta Region");
this.fileSystemManager.splitMetaLog(previouslyFailedMetaRSs);
this.walManager.splitMetaLog(previouslyFailedMetaRSs);
}
this.assignmentManager.setEnabledTable(TableName.META_TABLE_NAME);
@ -985,14 +987,14 @@ public class HMaster extends HRegionServer implements MasterServices {
}
private void splitMetaLogBeforeAssignment(ServerName currentMetaServer) throws IOException {
if (RecoveryMode.LOG_REPLAY == this.getMasterFileSystem().getLogRecoveryMode()) {
if (RecoveryMode.LOG_REPLAY == this.getMasterWalManager().getLogRecoveryMode()) {
// In log replay mode, we mark hbase:meta region as recovering in ZK
Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
regions.add(HRegionInfo.FIRST_META_REGIONINFO);
this.fileSystemManager.prepareLogReplay(currentMetaServer, regions);
this.walManager.prepareLogReplay(currentMetaServer, regions);
} else {
// In recovered.edits mode: create recovered edits file for hbase:meta server
this.fileSystemManager.splitMetaLog(currentMetaServer);
this.walManager.splitMetaLog(currentMetaServer);
}
}
@ -1045,6 +1047,11 @@ public class HMaster extends HRegionServer implements MasterServices {
return this.fileSystemManager;
}
@Override
public MasterWalManager getMasterWalManager() {
return this.walManager;
}
@Override
public TableStateManager getTableStateManager() {
return tableStateManager;
@ -1082,8 +1089,8 @@ public class HMaster extends HRegionServer implements MasterServices {
int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000);
this.logCleaner =
new LogCleaner(cleanerInterval,
this, conf, getMasterFileSystem().getFileSystem(),
getMasterFileSystem().getOldLogDir());
this, conf, getMasterWalManager().getFileSystem(),
getMasterWalManager().getOldLogDir());
getChoreService().scheduleChore(logCleaner);
//start the hfile archive cleaner thread
@ -1132,6 +1139,7 @@ public class HMaster extends HRegionServer implements MasterServices {
if (this.activeMasterManager != null) this.activeMasterManager.stop();
if (this.serverManager != null) this.serverManager.stop();
if (this.assignmentManager != null) this.assignmentManager.stop();
if (this.walManager != null) this.walManager.stop();
if (this.fileSystemManager != null) this.fileSystemManager.stop();
if (this.mpmHost != null) this.mpmHost.stop("server shutting down.");
}

View File

@ -19,99 +19,56 @@
package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.ipc.RemoteException;
import com.google.common.annotations.VisibleForTesting;
/**
* This class abstracts a bunch of operations the HMaster needs to interact with
* the underlying file system, including splitting log files, checking file
* the underlying file system like creating the initial layout, checking file
* system status, etc.
*/
@InterfaceAudience.Private
public class MasterFileSystem {
private static final Log LOG = LogFactory.getLog(MasterFileSystem.class.getName());
private static final Log LOG = LogFactory.getLog(MasterFileSystem.class);
// HBase configuration
Configuration conf;
// master status
Server master;
// metrics for master
private final MetricsMasterFileSystem metricsMasterFilesystem = new MetricsMasterFileSystem();
private final Configuration conf;
// Persisted unique cluster ID
private ClusterId clusterId;
// Keep around for convenience.
private final FileSystem fs;
// Is the fileystem ok?
private volatile boolean fsOk = true;
// The Path to the old logs dir
private final Path oldLogDir;
// root hbase directory on the FS
private final Path rootdir;
// hbase temp directory used for table construction and deletion
private final Path tempdir;
// create the split log lock
final Lock splitLogLock = new ReentrantLock();
final boolean distributedLogReplay;
final SplitLogManager splitLogManager;
private final MasterServices services;
final static PathFilter META_FILTER = new PathFilter() {
@Override
public boolean accept(Path p) {
return DefaultWALProvider.isMetaFile(p);
}
};
final static PathFilter NON_META_FILTER = new PathFilter() {
@Override
public boolean accept(Path p) {
return !DefaultWALProvider.isMetaFile(p);
}
};
public MasterFileSystem(Server master, MasterServices services)
throws IOException {
this.conf = master.getConfiguration();
this.master = master;
public MasterFileSystem(MasterServices services) throws IOException {
this.conf = services.getConfiguration();
this.services = services;
// Set filesystem to be that of this.rootdir else we get complaints about
// mismatched filesystems if hbase.rootdir is hdfs and fs.defaultFS is
@ -126,18 +83,8 @@ public class MasterFileSystem {
// make sure the fs has the same conf
fs.setConf(conf);
// setup the filesystem variable
// set up the archived logs path
this.oldLogDir = createInitialFileSystemLayout();
createInitialFileSystemLayout();
HFileSystem.addLocationsOrderInterceptor(conf);
this.splitLogManager =
new SplitLogManager(master, master.getConfiguration(), master, services,
master.getServerName());
this.distributedLogReplay = this.splitLogManager.isLogReplaying();
}
@VisibleForTesting
SplitLogManager getSplitLogManager() {
return this.splitLogManager;
}
/**
@ -146,55 +93,23 @@ public class MasterFileSystem {
* <li>Check if the meta region exists and is readable, if not create it.
* Create hbase.version and the hbase:meta directory if not one.
* </li>
* <li>Create a log archive directory for RS to put archived logs</li>
* </ol>
* Idempotent.
*/
private Path createInitialFileSystemLayout() throws IOException {
private void createInitialFileSystemLayout() throws IOException {
// check if the root directory exists
checkRootDir(this.rootdir, conf, this.fs);
// check if temp directory exists and clean it
checkTempDir(this.tempdir, conf, this.fs);
Path oldLogDir = new Path(this.rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
// Make sure the region servers can archive their old logs
if(!this.fs.exists(oldLogDir)) {
this.fs.mkdirs(oldLogDir);
}
return oldLogDir;
}
public FileSystem getFileSystem() {
return this.fs;
}
/**
* Get the directory where old logs go
* @return the dir
*/
public Path getOldLogDir() {
return this.oldLogDir;
}
/**
* Checks to see if the file system is still accessible.
* If not, sets closed
* @return false if file system is not available
*/
public boolean checkFileSystem() {
if (this.fsOk) {
try {
FSUtils.checkFileSystemAvailable(this.fs);
FSUtils.checkDfsSafeMode(this.conf);
} catch (IOException e) {
master.abort("Shutting down HBase cluster: file system not available", e);
this.fsOk = false;
}
}
return this.fsOk;
public Configuration getConfiguration() {
return this.conf;
}
/**
@ -218,197 +133,6 @@ public class MasterFileSystem {
return clusterId;
}
/**
* Inspect the log directory to find dead servers which need recovery work
* @return A set of ServerNames which aren't running but still have WAL files left in file system
*/
Set<ServerName> getFailedServersFromLogFolders() {
boolean retrySplitting = !conf.getBoolean("hbase.hlog.split.skip.errors",
WALSplitter.SPLIT_SKIP_ERRORS_DEFAULT);
Set<ServerName> serverNames = new HashSet<ServerName>();
Path logsDirPath = new Path(this.rootdir, HConstants.HREGION_LOGDIR_NAME);
do {
if (master.isStopped()) {
LOG.warn("Master stopped while trying to get failed servers.");
break;
}
try {
if (!this.fs.exists(logsDirPath)) return serverNames;
FileStatus[] logFolders = FSUtils.listStatus(this.fs, logsDirPath, null);
// Get online servers after getting log folders to avoid log folder deletion of newly
// checked in region servers . see HBASE-5916
Set<ServerName> onlineServers = ((HMaster) master).getServerManager().getOnlineServers()
.keySet();
if (logFolders == null || logFolders.length == 0) {
LOG.debug("No log files to split, proceeding...");
return serverNames;
}
for (FileStatus status : logFolders) {
FileStatus[] curLogFiles = FSUtils.listStatus(this.fs, status.getPath(), null);
if (curLogFiles == null || curLogFiles.length == 0) {
// Empty log folder. No recovery needed
continue;
}
final ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(
status.getPath());
if (null == serverName) {
LOG.warn("Log folder " + status.getPath() + " doesn't look like its name includes a " +
"region server name; leaving in place. If you see later errors about missing " +
"write ahead logs they may be saved in this location.");
} else if (!onlineServers.contains(serverName)) {
LOG.info("Log folder " + status.getPath() + " doesn't belong "
+ "to a known region server, splitting");
serverNames.add(serverName);
} else {
LOG.info("Log folder " + status.getPath() + " belongs to an existing region server");
}
}
retrySplitting = false;
} catch (IOException ioe) {
LOG.warn("Failed getting failed servers to be recovered.", ioe);
if (!checkFileSystem()) {
LOG.warn("Bad Filesystem, exiting");
Runtime.getRuntime().halt(1);
}
try {
if (retrySplitting) {
Thread.sleep(conf.getInt("hbase.hlog.split.failure.retry.interval", 30 * 1000));
}
} catch (InterruptedException e) {
LOG.warn("Interrupted, aborting since cannot return w/o splitting");
Thread.currentThread().interrupt();
retrySplitting = false;
Runtime.getRuntime().halt(1);
}
}
} while (retrySplitting);
return serverNames;
}
public void splitLog(final ServerName serverName) throws IOException {
Set<ServerName> serverNames = new HashSet<ServerName>();
serverNames.add(serverName);
splitLog(serverNames);
}
/**
* Specialized method to handle the splitting for meta WAL
* @param serverName
* @throws IOException
*/
public void splitMetaLog(final ServerName serverName) throws IOException {
Set<ServerName> serverNames = new HashSet<ServerName>();
serverNames.add(serverName);
splitMetaLog(serverNames);
}
/**
* Specialized method to handle the splitting for meta WAL
* @param serverNames
* @throws IOException
*/
public void splitMetaLog(final Set<ServerName> serverNames) throws IOException {
splitLog(serverNames, META_FILTER);
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK", justification=
"We only release this lock when we set it. Updates to code that uses it should verify use " +
"of the guard boolean.")
private List<Path> getLogDirs(final Set<ServerName> serverNames) throws IOException {
List<Path> logDirs = new ArrayList<Path>();
boolean needReleaseLock = false;
if (!this.services.isInitialized()) {
// during master initialization, we could have multiple places splitting a same wal
this.splitLogLock.lock();
needReleaseLock = true;
}
try {
for (ServerName serverName : serverNames) {
Path logDir = new Path(this.rootdir,
DefaultWALProvider.getWALDirectoryName(serverName.toString()));
Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT);
// Rename the directory so a rogue RS doesn't create more WALs
if (fs.exists(logDir)) {
if (!this.fs.rename(logDir, splitDir)) {
throw new IOException("Failed fs.rename for log split: " + logDir);
}
logDir = splitDir;
LOG.debug("Renamed region directory: " + splitDir);
} else if (!fs.exists(splitDir)) {
LOG.info("Log dir for server " + serverName + " does not exist");
continue;
}
logDirs.add(splitDir);
}
} finally {
if (needReleaseLock) {
this.splitLogLock.unlock();
}
}
return logDirs;
}
/**
* Mark regions in recovering state when distributedLogReplay are set true
* @param serverName Failed region server whose wals to be replayed
* @param regions Set of regions to be recovered
* @throws IOException
*/
public void prepareLogReplay(ServerName serverName, Set<HRegionInfo> regions) throws IOException {
if (!this.distributedLogReplay) {
return;
}
// mark regions in recovering state
if (regions == null || regions.isEmpty()) {
return;
}
this.splitLogManager.markRegionsRecovering(serverName, regions);
}
public void splitLog(final Set<ServerName> serverNames) throws IOException {
splitLog(serverNames, NON_META_FILTER);
}
/**
* Wrapper function on {@link SplitLogManager#removeStaleRecoveringRegions(Set)}
* @param failedServers
* @throws IOException
*/
void removeStaleRecoveringRegionsFromZK(final Set<ServerName> failedServers)
throws IOException, InterruptedIOException {
this.splitLogManager.removeStaleRecoveringRegions(failedServers);
}
/**
* This method is the base split method that splits WAL files matching a filter. Callers should
* pass the appropriate filter for meta and non-meta WALs.
* @param serverNames logs belonging to these servers will be split; this will rename the log
* directory out from under a soft-failed server
* @param filter
* @throws IOException
*/
public void splitLog(final Set<ServerName> serverNames, PathFilter filter) throws IOException {
long splitTime = 0, splitLogSize = 0;
List<Path> logDirs = getLogDirs(serverNames);
splitLogManager.handleDeadWorkers(serverNames);
splitTime = EnvironmentEdgeManager.currentTime();
splitLogSize = splitLogManager.splitLogDistributed(serverNames, logDirs, filter);
splitTime = EnvironmentEdgeManager.currentTime() - splitTime;
if (this.metricsMasterFilesystem != null) {
if (filter == META_FILTER) {
this.metricsMasterFilesystem.addMetaWALSplit(splitTime, splitLogSize);
} else {
this.metricsMasterFilesystem.addSplit(splitTime, splitLogSize);
}
}
}
/**
* Get the rootdir. Make sure its wholesome and exists before returning.
* @param rd
@ -418,9 +142,8 @@ public class MasterFileSystem {
* needed populating the directory with necessary bootup files).
* @throws IOException
*/
private Path checkRootDir(final Path rd, final Configuration c,
final FileSystem fs)
throws IOException {
private Path checkRootDir(final Path rd, final Configuration c, final FileSystem fs)
throws IOException {
// If FS is in safe mode wait till out of it.
FSUtils.waitOnSafeMode(c, c.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000));
@ -595,22 +318,6 @@ public class MasterFileSystem {
}
public void stop() {
if (splitLogManager != null) {
this.splitLogManager.stop();
}
}
/**
* The function is used in SSH to set recovery mode based on configuration after all outstanding
* log split tasks drained.
* @throws IOException
*/
public void setLogRecoveryMode() throws IOException {
this.splitLogManager.setRecoveryMode(false);
}
public RecoveryMode getLogRecoveryMode() {
return this.splitLogManager.getRecoveryMode();
}
public void logFileSystemState(Log log) throws IOException {

View File

@ -72,6 +72,11 @@ public interface MasterServices extends Server {
*/
MasterFileSystem getMasterFileSystem();
/**
* @return Master's WALs {@link MasterWalManager} utility class.
*/
MasterWalManager getMasterWalManager();
/**
* @return Master's {@link ServerManager} instance.
*/

View File

@ -0,0 +1,351 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WALSplitter;
/**
* This class abstracts a bunch of operations the HMaster needs
* when splitting log files e.g. finding log files, dirs etc.
*/
@InterfaceAudience.Private
public class MasterWalManager {
private static final Log LOG = LogFactory.getLog(MasterWalManager.class);
final static PathFilter META_FILTER = new PathFilter() {
@Override
public boolean accept(Path p) {
return DefaultWALProvider.isMetaFile(p);
}
};
final static PathFilter NON_META_FILTER = new PathFilter() {
@Override
public boolean accept(Path p) {
return !DefaultWALProvider.isMetaFile(p);
}
};
// metrics for master
// TODO: Rename it, since those metrics are split-manager related
private final MetricsMasterFileSystem metricsMasterFilesystem = new MetricsMasterFileSystem();
// Keep around for convenience.
private final MasterServices services;
private final Configuration conf;
private final FileSystem fs;
// The Path to the old logs dir
private final Path oldLogDir;
private final Path rootDir;
// create the split log lock
private final Lock splitLogLock = new ReentrantLock();
private final SplitLogManager splitLogManager;
private final boolean distributedLogReplay;
// Is the fileystem ok?
private volatile boolean fsOk = true;
public MasterWalManager(MasterServices services) throws IOException {
this(services.getConfiguration(), services.getMasterFileSystem().getFileSystem(),
services.getMasterFileSystem().getRootDir(), services);
}
public MasterWalManager(Configuration conf, FileSystem fs, Path rootDir, MasterServices services)
throws IOException {
this.fs = fs;
this.conf = conf;
this.rootDir = rootDir;
this.services = services;
this.splitLogManager = new SplitLogManager(services, conf,
services, services, services.getServerName());
this.distributedLogReplay = this.splitLogManager.isLogReplaying();
this.oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
// Make sure the region servers can archive their old logs
if (!this.fs.exists(oldLogDir)) {
this.fs.mkdirs(oldLogDir);
}
}
public void stop() {
if (splitLogManager != null) {
splitLogManager.stop();
}
}
@VisibleForTesting
SplitLogManager getSplitLogManager() {
return this.splitLogManager;
}
/**
* Get the directory where old logs go
* @return the dir
*/
Path getOldLogDir() {
return this.oldLogDir;
}
public FileSystem getFileSystem() {
return this.fs;
}
/**
* Checks to see if the file system is still accessible.
* If not, sets closed
* @return false if file system is not available
*/
private boolean checkFileSystem() {
if (this.fsOk) {
try {
FSUtils.checkFileSystemAvailable(this.fs);
FSUtils.checkDfsSafeMode(this.conf);
} catch (IOException e) {
services.abort("Shutting down HBase cluster: file system not available", e);
this.fsOk = false;
}
}
return this.fsOk;
}
/**
* Inspect the log directory to find dead servers which need recovery work
* @return A set of ServerNames which aren't running but still have WAL files left in file system
*/
Set<ServerName> getFailedServersFromLogFolders() {
boolean retrySplitting = !conf.getBoolean("hbase.hlog.split.skip.errors",
WALSplitter.SPLIT_SKIP_ERRORS_DEFAULT);
Set<ServerName> serverNames = new HashSet<ServerName>();
Path logsDirPath = new Path(this.rootDir, HConstants.HREGION_LOGDIR_NAME);
do {
if (services.isStopped()) {
LOG.warn("Master stopped while trying to get failed servers.");
break;
}
try {
if (!this.fs.exists(logsDirPath)) return serverNames;
FileStatus[] logFolders = FSUtils.listStatus(this.fs, logsDirPath, null);
// Get online servers after getting log folders to avoid log folder deletion of newly
// checked in region servers . see HBASE-5916
Set<ServerName> onlineServers = services.getServerManager().getOnlineServers().keySet();
if (logFolders == null || logFolders.length == 0) {
LOG.debug("No log files to split, proceeding...");
return serverNames;
}
for (FileStatus status : logFolders) {
FileStatus[] curLogFiles = FSUtils.listStatus(this.fs, status.getPath(), null);
if (curLogFiles == null || curLogFiles.length == 0) {
// Empty log folder. No recovery needed
continue;
}
final ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(
status.getPath());
if (null == serverName) {
LOG.warn("Log folder " + status.getPath() + " doesn't look like its name includes a " +
"region server name; leaving in place. If you see later errors about missing " +
"write ahead logs they may be saved in this location.");
} else if (!onlineServers.contains(serverName)) {
LOG.info("Log folder " + status.getPath() + " doesn't belong "
+ "to a known region server, splitting");
serverNames.add(serverName);
} else {
LOG.info("Log folder " + status.getPath() + " belongs to an existing region server");
}
}
retrySplitting = false;
} catch (IOException ioe) {
LOG.warn("Failed getting failed servers to be recovered.", ioe);
if (!checkFileSystem()) {
LOG.warn("Bad Filesystem, exiting");
Runtime.getRuntime().halt(1);
}
try {
if (retrySplitting) {
Thread.sleep(conf.getInt("hbase.hlog.split.failure.retry.interval", 30 * 1000));
}
} catch (InterruptedException e) {
LOG.warn("Interrupted, aborting since cannot return w/o splitting");
Thread.currentThread().interrupt();
retrySplitting = false;
Runtime.getRuntime().halt(1);
}
}
} while (retrySplitting);
return serverNames;
}
public void splitLog(final ServerName serverName) throws IOException {
Set<ServerName> serverNames = new HashSet<ServerName>();
serverNames.add(serverName);
splitLog(serverNames);
}
/**
* Specialized method to handle the splitting for meta WAL
* @param serverName logs belonging to this server will be split
*/
public void splitMetaLog(final ServerName serverName) throws IOException {
Set<ServerName> serverNames = new HashSet<ServerName>();
serverNames.add(serverName);
splitMetaLog(serverNames);
}
/**
* Specialized method to handle the splitting for meta WAL
* @param serverNames logs belonging to these servers will be split
*/
public void splitMetaLog(final Set<ServerName> serverNames) throws IOException {
splitLog(serverNames, META_FILTER);
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK", justification=
"We only release this lock when we set it. Updates to code that uses it should verify use " +
"of the guard boolean.")
private List<Path> getLogDirs(final Set<ServerName> serverNames) throws IOException {
List<Path> logDirs = new ArrayList<Path>();
boolean needReleaseLock = false;
if (!this.services.isInitialized()) {
// during master initialization, we could have multiple places splitting a same wal
this.splitLogLock.lock();
needReleaseLock = true;
}
try {
for (ServerName serverName : serverNames) {
Path logDir = new Path(this.rootDir,
DefaultWALProvider.getWALDirectoryName(serverName.toString()));
Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT);
// Rename the directory so a rogue RS doesn't create more WALs
if (fs.exists(logDir)) {
if (!this.fs.rename(logDir, splitDir)) {
throw new IOException("Failed fs.rename for log split: " + logDir);
}
logDir = splitDir;
LOG.debug("Renamed region directory: " + splitDir);
} else if (!fs.exists(splitDir)) {
LOG.info("Log dir for server " + serverName + " does not exist");
continue;
}
logDirs.add(splitDir);
}
} finally {
if (needReleaseLock) {
this.splitLogLock.unlock();
}
}
return logDirs;
}
/**
* Mark regions in recovering state when distributedLogReplay are set true
* @param serverName Failed region server whose wals to be replayed
* @param regions Set of regions to be recovered
*/
public void prepareLogReplay(ServerName serverName, Set<HRegionInfo> regions) throws IOException {
if (!this.distributedLogReplay) {
return;
}
// mark regions in recovering state
if (regions == null || regions.isEmpty()) {
return;
}
this.splitLogManager.markRegionsRecovering(serverName, regions);
}
public void splitLog(final Set<ServerName> serverNames) throws IOException {
splitLog(serverNames, NON_META_FILTER);
}
/**
* Wrapper function on {@link SplitLogManager#removeStaleRecoveringRegions(Set)}
* @param failedServers A set of known failed servers
*/
void removeStaleRecoveringRegionsFromZK(final Set<ServerName> failedServers)
throws IOException, InterruptedIOException {
this.splitLogManager.removeStaleRecoveringRegions(failedServers);
}
/**
* This method is the base split method that splits WAL files matching a filter. Callers should
* pass the appropriate filter for meta and non-meta WALs.
* @param serverNames logs belonging to these servers will be split; this will rename the log
* directory out from under a soft-failed server
*/
public void splitLog(final Set<ServerName> serverNames, PathFilter filter) throws IOException {
long splitTime = 0, splitLogSize = 0;
List<Path> logDirs = getLogDirs(serverNames);
splitLogManager.handleDeadWorkers(serverNames);
splitTime = EnvironmentEdgeManager.currentTime();
splitLogSize = splitLogManager.splitLogDistributed(serverNames, logDirs, filter);
splitTime = EnvironmentEdgeManager.currentTime() - splitTime;
if (this.metricsMasterFilesystem != null) {
if (filter == META_FILTER) {
this.metricsMasterFilesystem.addMetaWALSplit(splitTime, splitLogSize);
} else {
this.metricsMasterFilesystem.addSplit(splitTime, splitLogSize);
}
}
}
/**
* The function is used in SSH to set recovery mode based on configuration after all outstanding
* log split tasks drained.
*/
public void setLogRecoveryMode() throws IOException {
this.splitLogManager.setRecoveryMode(false);
}
public RecoveryMode getLogRecoveryMode() {
return this.splitLogManager.getRecoveryMode();
}
}

View File

@ -753,7 +753,7 @@ public class ServerManager {
}
OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server,
region, favoredNodes,
(RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode()));
(RecoveryMode.LOG_REPLAY == this.services.getMasterWalManager().getLogRecoveryMode()));
try {
OpenRegionResponse response = admin.openRegion(null, request);
return ResponseConverter.getRegionOpeningState(response);
@ -781,7 +781,7 @@ public class ServerManager {
}
OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server, regionOpenInfos,
(RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode()));
(RecoveryMode.LOG_REPLAY == this.services.getMasterWalManager().getLogRecoveryMode()));
try {
OpenRegionResponse response = admin.openRegion(null, request);
return ResponseConverter.getRegionOpeningStateList(response);

View File

@ -75,8 +75,8 @@ import com.google.common.annotations.VisibleForTesting;
* <p>SplitLogManager monitors the tasks that it creates using the
* timeoutMonitor thread. If a task's progress is slow then
* {@link SplitLogManagerCoordination#checkTasks} will take away the
* task from the owner {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}
* and the task will be up for grabs again. When the task is done then it is
* task from the owner {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}
* and the task will be up for grabs again. When the task is done then it is
* deleted by SplitLogManager.
*
* <p>Clients call {@link #splitLogDistributed(Path)} to split a region server's
@ -273,7 +273,7 @@ public class SplitLogManager {
}
waitForSplittingCompletion(batch, status);
// remove recovering regions
if (filter == MasterFileSystem.META_FILTER /* reference comparison */) {
if (filter == MasterWalManager.META_FILTER /* reference comparison */) {
// we split meta regions and user regions separately therefore logfiles are either all for
// meta or user regions but won't for both( we could have mixed situations in tests)
isMetaRecovery = true;
@ -411,7 +411,7 @@ public class SplitLogManager {
for (ServerName tmpServerName : serverNames) {
recoveredServerNameSet.add(tmpServerName.getServerName());
}
this.recoveringRegionLock.lock();
try {
((BaseCoordinatedStateManager) server.getCoordinatedStateManager())

View File

@ -37,8 +37,8 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.MasterWalManager;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
@ -320,10 +320,10 @@ implements ServerProcedureInterface {
* @throws IOException
*/
private void start(final MasterProcedureEnv env) throws IOException {
MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
// Set recovery mode late. This is what the old ServerShutdownHandler used do.
mfs.setLogRecoveryMode();
this.distributedLogReplay = mfs.getLogRecoveryMode() == RecoveryMode.LOG_REPLAY;
mwm.setLogRecoveryMode();
this.distributedLogReplay = mwm.getLogRecoveryMode() == RecoveryMode.LOG_REPLAY;
}
/**
@ -335,7 +335,7 @@ implements ServerProcedureInterface {
private boolean processMeta(final MasterProcedureEnv env)
throws IOException {
if (LOG.isDebugEnabled()) LOG.debug("Processing hbase:meta that was on " + this.serverName);
MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
AssignmentManager am = env.getMasterServices().getAssignmentManager();
HRegionInfo metaHRI = HRegionInfo.FIRST_META_REGIONINFO;
if (this.shouldSplitWal) {
@ -343,7 +343,7 @@ implements ServerProcedureInterface {
prepareLogReplay(env, META_REGION_SET);
} else {
// TODO: Matteo. We BLOCK here but most important thing to be doing at this moment.
mfs.splitMetaLog(serverName);
mwm.splitMetaLog(serverName);
am.getRegionStates().logSplit(metaHRI);
}
}
@ -360,7 +360,7 @@ implements ServerProcedureInterface {
processed = false;
} else {
// TODO: Matteo. We BLOCK here but most important thing to be doing at this moment.
mfs.splitMetaLog(serverName);
mwm.splitMetaLog(serverName);
}
}
}
@ -394,9 +394,9 @@ implements ServerProcedureInterface {
LOG.debug("Mark " + size(this.regionsOnCrashedServer) + " regions-in-recovery from " +
this.serverName);
}
MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
AssignmentManager am = env.getMasterServices().getAssignmentManager();
mfs.prepareLogReplay(this.serverName, regions);
mwm.prepareLogReplay(this.serverName, regions);
am.getRegionStates().logSplit(this.serverName);
}
@ -405,10 +405,10 @@ implements ServerProcedureInterface {
LOG.debug("Splitting logs from " + serverName + "; region count=" +
size(this.regionsOnCrashedServer));
}
MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
AssignmentManager am = env.getMasterServices().getAssignmentManager();
// TODO: For Matteo. Below BLOCKs!!!! Redo so can relinquish executor while it is running.
mfs.splitLog(this.serverName);
mwm.splitLog(this.serverName);
am.getRegionStates().logSplit(this.serverName);
}

View File

@ -97,6 +97,11 @@ public class MockNoopMasterServices implements MasterServices, Server {
return null;
}
@Override
public MasterWalManager getMasterWalManager() {
return null;
}
@Override
public MasterCoprocessorHost getMasterCoprocessorHost() {
return null;

View File

@ -221,9 +221,11 @@ public class TestCatalogJanitor {
class MockMasterServices extends MockNoopMasterServices {
private final MasterFileSystem mfs;
private final AssignmentManager asm;
private final Server server;
MockMasterServices(final Server server) throws IOException {
this.mfs = new MasterFileSystem(server, this);
this.server = server;
this.mfs = new MasterFileSystem(this);
this.asm = Mockito.mock(AssignmentManager.class);
}
@ -239,7 +241,12 @@ public class TestCatalogJanitor {
@Override
public Configuration getConfiguration() {
return mfs.conf;
return server.getConfiguration();
}
@Override
public ServerName getServerName() {
return server.getServerName();
}
@Override

View File

@ -1,6 +1,5 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -213,7 +212,7 @@ public class TestDistributedLogSplitting {
startCluster(NUM_RS);
final int NUM_LOG_LINES = 1000;
final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
// turn off load balancing to prevent regions from moving around otherwise
// they will consume recovered.edits
master.balanceSwitch(false);
@ -680,7 +679,7 @@ public class TestDistributedLogSplitting {
final ZooKeeperWatcher zkw = master.getZooKeeper();
Table ht = installTable(zkw, "table", "family", 40);
try {
final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>();
HRegionInfo region = null;
@ -929,7 +928,7 @@ public class TestDistributedLogSplitting {
final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
try {
final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>();
HRegionInfo region = null;
@ -1004,7 +1003,7 @@ public class TestDistributedLogSplitting {
LOG.info("testWorkerAbort");
startCluster(3);
final int NUM_LOG_LINES = 10000;
final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
FileSystem fs = master.getMasterFileSystem().getFileSystem();
final List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
@ -1123,7 +1122,7 @@ public class TestDistributedLogSplitting {
public void testDelayedDeleteOnFailure() throws Exception {
LOG.info("testDelayedDeleteOnFailure");
startCluster(1);
final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
final FileSystem fs = master.getMasterFileSystem().getFileSystem();
final Path logDir = new Path(FSUtils.getRootDir(conf), "x");
fs.mkdirs(logDir);
@ -1204,10 +1203,10 @@ public class TestDistributedLogSplitting {
LOG.info("#regions = " + regions.size());
Set<HRegionInfo> tmpRegions = new HashSet<HRegionInfo>();
tmpRegions.add(HRegionInfo.FIRST_META_REGIONINFO);
master.getMasterFileSystem().prepareLogReplay(hrs.getServerName(), tmpRegions);
master.getMasterWalManager().prepareLogReplay(hrs.getServerName(), tmpRegions);
Set<HRegionInfo> userRegionSet = new HashSet<HRegionInfo>();
userRegionSet.addAll(regions);
master.getMasterFileSystem().prepareLogReplay(hrs.getServerName(), userRegionSet);
master.getMasterWalManager().prepareLogReplay(hrs.getServerName(), userRegionSet);
boolean isMetaRegionInRecovery = false;
List<String> recoveringRegions =
zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false);
@ -1219,7 +1218,7 @@ public class TestDistributedLogSplitting {
}
assertTrue(isMetaRegionInRecovery);
master.getMasterFileSystem().splitMetaLog(hrs.getServerName());
master.getMasterWalManager().splitMetaLog(hrs.getServerName());
isMetaRegionInRecovery = false;
recoveringRegions =

View File

@ -21,25 +21,15 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -50,8 +40,8 @@ import org.junit.experimental.categories.Category;
*/
@Category({MasterTests.class, MediumTests.class})
public class TestMasterFileSystem {
private static final Log LOG = LogFactory.getLog(TestMasterFileSystem.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
@BeforeClass
@ -68,52 +58,12 @@ public class TestMasterFileSystem {
public void testFsUriSetProperly() throws Exception {
HMaster master = UTIL.getMiniHBaseCluster().getMaster();
MasterFileSystem fs = master.getMasterFileSystem();
Path masterRoot = FSUtils.getRootDir(fs.conf);
Path masterRoot = FSUtils.getRootDir(fs.getConfiguration());
Path rootDir = FSUtils.getRootDir(fs.getFileSystem().getConf());
// make sure the fs and the found root dir have the same scheme
LOG.debug("from fs uri:" + FileSystem.getDefaultUri(fs.getFileSystem().getConf()));
LOG.debug("from configuration uri:" + FileSystem.getDefaultUri(fs.conf));
LOG.debug("from configuration uri:" + FileSystem.getDefaultUri(fs.getConfiguration()));
// make sure the set uri matches by forcing it.
assertEquals(masterRoot, rootDir);
}
@Test
public void testRemoveStaleRecoveringRegionsDuringMasterInitialization() throws Exception {
// this test is for when distributed log replay is enabled
if (!UTIL.getConfiguration().getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false)) return;
LOG.info("Starting testRemoveStaleRecoveringRegionsDuringMasterInitialization");
HMaster master = UTIL.getMiniHBaseCluster().getMaster();
MasterFileSystem fs = master.getMasterFileSystem();
String failedRegion = "failedRegoin1";
String staleRegion = "staleRegion";
ServerName inRecoveryServerName = ServerName.valueOf("mgr,1,1");
ServerName previouselyFaildServerName = ServerName.valueOf("previous,1,1");
String walPath = "/hbase/data/.logs/" + inRecoveryServerName.getServerName()
+ "-splitting/test";
// Create a ZKW to use in the test
ZooKeeperWatcher zkw = HBaseTestingUtility.getZooKeeperWatcher(UTIL);
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, walPath),
new SplitLogTask.Owned(inRecoveryServerName, fs.getLogRecoveryMode()).toByteArray(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
String staleRegionPath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, staleRegion);
ZKUtil.createWithParents(zkw, staleRegionPath);
String inRecoveringRegionPath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, failedRegion);
inRecoveringRegionPath = ZKUtil.joinZNode(inRecoveringRegionPath,
inRecoveryServerName.getServerName());
ZKUtil.createWithParents(zkw, inRecoveringRegionPath);
Set<ServerName> servers = new HashSet<ServerName>();
servers.add(previouselyFaildServerName);
fs.removeStaleRecoveringRegionsFromZK(servers);
// verification
assertFalse(ZKUtil.checkExists(zkw, staleRegionPath) != -1);
assertTrue(ZKUtil.checkExists(zkw, inRecoveringRegionPath) != -1);
ZKUtil.deleteChildrenRecursively(zkw, zkw.recoveringRegionsZNode);
ZKUtil.deleteChildrenRecursively(zkw, zkw.splitLogZNode);
zkw.close();
}
}

View File

@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Test the master wal manager in a local cluster
*/
@Category({MasterTests.class, MediumTests.class})
public class TestMasterWalManager {
private static final Log LOG = LogFactory.getLog(TestMasterWalManager.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
@BeforeClass
public static void setupTest() throws Exception {
UTIL.startMiniCluster();
}
@AfterClass
public static void teardownTest() throws Exception {
UTIL.shutdownMiniCluster();
}
@Test
public void testRemoveStaleRecoveringRegionsDuringMasterInitialization() throws Exception {
// this test is for when distributed log replay is enabled
if (!UTIL.getConfiguration().getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false)) return;
LOG.info("Starting testRemoveStaleRecoveringRegionsDuringMasterInitialization");
HMaster master = UTIL.getMiniHBaseCluster().getMaster();
MasterWalManager mwm = master.getMasterWalManager();
String failedRegion = "failedRegoin1";
String staleRegion = "staleRegion";
ServerName inRecoveryServerName = ServerName.valueOf("mgr,1,1");
ServerName previouselyFaildServerName = ServerName.valueOf("previous,1,1");
String walPath = "/hbase/data/.logs/" + inRecoveryServerName.getServerName()
+ "-splitting/test";
// Create a ZKW to use in the test
ZooKeeperWatcher zkw = HBaseTestingUtility.getZooKeeperWatcher(UTIL);
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, walPath),
new SplitLogTask.Owned(inRecoveryServerName, mwm.getLogRecoveryMode()).toByteArray(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
String staleRegionPath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, staleRegion);
ZKUtil.createWithParents(zkw, staleRegionPath);
String inRecoveringRegionPath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, failedRegion);
inRecoveringRegionPath = ZKUtil.joinZNode(inRecoveringRegionPath,
inRecoveryServerName.getServerName());
ZKUtil.createWithParents(zkw, inRecoveringRegionPath);
Set<ServerName> servers = new HashSet<ServerName>();
servers.add(previouselyFaildServerName);
mwm.removeStaleRecoveringRegionsFromZK(servers);
// verification
assertFalse(ZKUtil.checkExists(zkw, staleRegionPath) != -1);
assertTrue(ZKUtil.checkExists(zkw, inRecoveringRegionPath) != -1);
ZKUtil.deleteChildrenRecursively(zkw, zkw.recoveringRegionsZNode);
ZKUtil.deleteChildrenRecursively(zkw, zkw.splitLogZNode);
zkw.close();
}
}