HBASE-17437 Support specifying a WAL directory outside of the root directory (Yishan Yang and Zach York)
HBASE-17588 Remove unused imports brought in by HBASE-17437 (Zach York) Signed-off-by: Enis Soztutar <enis@apache.org>
This commit is contained in:
parent
753169a3af
commit
8f6388503b
|
@ -1144,6 +1144,13 @@ possible configurations would overwhelm and obscure the important.
|
|||
When master starts, it creates the rootdir with this permissions or sets the permissions
|
||||
if it does not match.</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.wal.dir.perms</name>
|
||||
<value>700</value>
|
||||
<description>FS Permissions for the root WAL directory in a secure(kerberos) setup.
|
||||
When master starts, it creates the WAL dir with this permissions or sets the permissions
|
||||
if it does not match.</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.data.umask.enable</name>
|
||||
<value>false</value>
|
||||
|
|
|
@ -114,7 +114,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
|||
private final LeaseRecovery leaseRecovery;
|
||||
private final Configuration conf;
|
||||
private final FileSystem fs;
|
||||
private final Path logDir;
|
||||
private final Path walDir;
|
||||
|
||||
private final AtomicReference<Throwable> syncException = new AtomicReference<Throwable>();
|
||||
private final AtomicBoolean loading = new AtomicBoolean(true);
|
||||
|
@ -170,11 +170,11 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
|||
}
|
||||
}
|
||||
|
||||
public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path logDir,
|
||||
public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir,
|
||||
final LeaseRecovery leaseRecovery) {
|
||||
this.fs = fs;
|
||||
this.conf = conf;
|
||||
this.logDir = logDir;
|
||||
this.walDir = walDir;
|
||||
this.leaseRecovery = leaseRecovery;
|
||||
}
|
||||
|
||||
|
@ -939,8 +939,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
|||
// ==========================================================================
|
||||
// FileSystem Log Files helpers
|
||||
// ==========================================================================
|
||||
public Path getLogDir() {
|
||||
return this.logDir;
|
||||
public Path getWALDir() {
|
||||
return this.walDir;
|
||||
}
|
||||
|
||||
public FileSystem getFileSystem() {
|
||||
|
@ -948,7 +948,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
|||
}
|
||||
|
||||
protected Path getLogFilePath(final long logId) throws IOException {
|
||||
return new Path(logDir, String.format("state-%020d.log", logId));
|
||||
return new Path(walDir, String.format("state-%020d.log", logId));
|
||||
}
|
||||
|
||||
private static long getLogIdFromName(final String name) {
|
||||
|
@ -982,7 +982,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
|||
|
||||
private FileStatus[] getLogFiles() throws IOException {
|
||||
try {
|
||||
FileStatus[] files = fs.listStatus(logDir, WALS_PATH_FILTER);
|
||||
FileStatus[] files = fs.listStatus(walDir, WALS_PATH_FILTER);
|
||||
Arrays.sort(files, FILE_STATUS_ID_COMPARATOR);
|
||||
return files;
|
||||
} catch (FileNotFoundException e) {
|
||||
|
|
|
@ -59,8 +59,8 @@ public class ProcedureTestingUtility {
|
|||
}
|
||||
|
||||
public static WALProcedureStore createWalStore(final Configuration conf, final FileSystem fs,
|
||||
final Path logDir) throws IOException {
|
||||
return new WALProcedureStore(conf, fs, logDir, new WALProcedureStore.LeaseRecovery() {
|
||||
final Path walDir) throws IOException {
|
||||
return new WALProcedureStore(conf, fs, walDir, new WALProcedureStore.LeaseRecovery() {
|
||||
@Override
|
||||
public void recoverFileLease(FileSystem fs, Path path) throws IOException {
|
||||
// no-op
|
||||
|
|
|
@ -217,10 +217,10 @@ public class ProcedureWALLoaderPerformanceEvaluation extends AbstractHBaseTool {
|
|||
public void tearDownProcedureStore() {
|
||||
store.stop(false);
|
||||
try {
|
||||
store.getFileSystem().delete(store.getLogDir(), true);
|
||||
store.getFileSystem().delete(store.getWALDir(), true);
|
||||
} catch (IOException e) {
|
||||
System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up "
|
||||
+ "disk space. Location: " + store.getLogDir().toString());
|
||||
+ "disk space. Location: " + store.getWALDir().toString());
|
||||
System.err.println(e.toString());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -107,10 +107,10 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
|
|||
private void tearDownProcedureStore() {
|
||||
store.stop(false);
|
||||
try {
|
||||
store.getFileSystem().delete(store.getLogDir(), true);
|
||||
store.getFileSystem().delete(store.getWALDir(), true);
|
||||
} catch (IOException e) {
|
||||
System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up "
|
||||
+ "disk space. Location: " + store.getLogDir().toString());
|
||||
+ "disk space. Location: " + store.getWALDir().toString());
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ import java.lang.reflect.Proxy;
|
|||
import java.lang.reflect.UndeclaredThrowableException;
|
||||
import java.net.URI;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -51,16 +52,19 @@ import org.apache.hadoop.ipc.RPC;
|
|||
import org.apache.hadoop.util.Progressable;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
|
||||
import static org.apache.hadoop.hbase.HConstants.HBASE_DIR;
|
||||
|
||||
/**
|
||||
* An encapsulation for the FileSystem object that hbase uses to access
|
||||
* data. This class allows the flexibility of using
|
||||
* data. This class allows the flexibility of using
|
||||
* separate filesystem objects for reading and writing hfiles and wals.
|
||||
* In future, if we want to make wals be in a different filesystem,
|
||||
* this is the place to make it happen.
|
||||
*/
|
||||
public class HFileSystem extends FilterFileSystem {
|
||||
public static final Log LOG = LogFactory.getLog(HFileSystem.class);
|
||||
|
||||
/** Parameter name for HBase WAL directory */
|
||||
public static final String HBASE_WAL_DIR = "hbase.wal.dir";
|
||||
|
||||
private final FileSystem noChecksumFs; // read hfile data from storage
|
||||
private final boolean useHBaseChecksum;
|
||||
|
||||
|
@ -79,7 +83,7 @@ public class HFileSystem extends FilterFileSystem {
|
|||
// the underlying filesystem that has checksums switched on.
|
||||
this.fs = FileSystem.get(conf);
|
||||
this.useHBaseChecksum = useHBaseChecksum;
|
||||
|
||||
|
||||
fs.initialize(getDefaultUri(conf), conf);
|
||||
|
||||
// disable checksum verification for local fileSystem, see HBASE-11218
|
||||
|
|
|
@ -45,17 +45,17 @@ public class WALLink extends FileLink {
|
|||
*/
|
||||
public WALLink(final Configuration conf,
|
||||
final String serverName, final String logName) throws IOException {
|
||||
this(FSUtils.getRootDir(conf), serverName, logName);
|
||||
this(FSUtils.getWALRootDir(conf), serverName, logName);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param rootDir Path to the root directory where hbase files are stored
|
||||
* @param walRootDir Path to the root directory where hbase files are stored
|
||||
* @param serverName Region Server owner of the log
|
||||
* @param logName WAL file name
|
||||
*/
|
||||
public WALLink(final Path rootDir, final String serverName, final String logName) {
|
||||
final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
|
||||
final Path logDir = new Path(new Path(rootDir, HConstants.HREGION_LOGDIR_NAME), serverName);
|
||||
public WALLink(final Path walRootDir, final String serverName, final String logName) {
|
||||
final Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
|
||||
final Path logDir = new Path(new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME), serverName);
|
||||
setLocations(new Path(logDir, logName), new Path(oldLogDir, logName));
|
||||
}
|
||||
|
||||
|
|
|
@ -610,15 +610,15 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
|
||||
if (!queuedDeadServers.isEmpty()) {
|
||||
Configuration conf = server.getConfiguration();
|
||||
Path rootdir = FSUtils.getRootDir(conf);
|
||||
FileSystem fs = rootdir.getFileSystem(conf);
|
||||
Path walRootDir = FSUtils.getWALRootDir(conf);
|
||||
FileSystem walFs = FSUtils.getWALFileSystem(conf);
|
||||
for (ServerName serverName: queuedDeadServers) {
|
||||
// In the case of a clean exit, the shutdown handler would have presplit any WALs and
|
||||
// removed empty directories.
|
||||
Path logDir = new Path(rootdir,
|
||||
Path walDir = new Path(walRootDir,
|
||||
DefaultWALProvider.getWALDirectoryName(serverName.toString()));
|
||||
Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT);
|
||||
if (fs.exists(logDir) || fs.exists(splitDir)) {
|
||||
Path splitDir = walDir.suffix(DefaultWALProvider.SPLITTING_EXT);
|
||||
if (walFs.exists(walDir) || walFs.exists(splitDir)) {
|
||||
LOG.debug("Found queued dead server " + serverName);
|
||||
failover = true;
|
||||
break;
|
||||
|
|
|
@ -1162,7 +1162,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000);
|
||||
this.logCleaner =
|
||||
new LogCleaner(cleanerInterval,
|
||||
this, conf, getMasterFileSystem().getFileSystem(),
|
||||
this, conf, getMasterFileSystem().getOldLogDir().getFileSystem(conf),
|
||||
getMasterFileSystem().getOldLogDir());
|
||||
getChoreService().scheduleChore(logCleaner);
|
||||
|
||||
|
@ -1243,10 +1243,10 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
|
||||
private void startProcedureExecutor() throws IOException {
|
||||
final MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
|
||||
final Path logDir = new Path(fileSystemManager.getRootDir(),
|
||||
final Path walDir = new Path(FSUtils.getWALRootDir(this.conf),
|
||||
MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);
|
||||
|
||||
procedureStore = new WALProcedureStore(conf, fileSystemManager.getFileSystem(), logDir,
|
||||
procedureStore = new WALProcedureStore(conf, walDir.getFileSystem(conf), walDir,
|
||||
new MasterProcedureEnv.WALStoreLeaseRecovery(this));
|
||||
procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
|
||||
procedureExecutor = new ProcedureExecutor(conf, procEnv, procedureStore,
|
||||
|
|
|
@ -66,7 +66,14 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class MasterFileSystem {
|
||||
private static final Log LOG = LogFactory.getLog(MasterFileSystem.class.getName());
|
||||
private static final Log LOG = LogFactory.getLog(MasterFileSystem.class);
|
||||
|
||||
/** Parameter name for HBase instance root directory permission*/
|
||||
public static final String HBASE_DIR_PERMS = "hbase.rootdir.perms";
|
||||
|
||||
/** Parameter name for HBase WAL directory permission*/
|
||||
public static final String HBASE_WAL_DIR_PERMS = "hbase.wal.dir.perms";
|
||||
|
||||
// HBase configuration
|
||||
Configuration conf;
|
||||
// master status
|
||||
|
@ -77,8 +84,11 @@ public class MasterFileSystem {
|
|||
private ClusterId clusterId;
|
||||
// Keep around for convenience.
|
||||
private final FileSystem fs;
|
||||
private final FileSystem walFs;
|
||||
// root WAL directory
|
||||
private final Path walRootDir;
|
||||
// Is the fileystem ok?
|
||||
private volatile boolean fsOk = true;
|
||||
private volatile boolean walFsOk = true;
|
||||
// The Path to the old logs dir
|
||||
private final Path oldLogDir;
|
||||
// root hbase directory on the FS
|
||||
|
@ -119,6 +129,10 @@ public class MasterFileSystem {
|
|||
// Cover both bases, the old way of setting default fs and the new.
|
||||
// We're supposed to run on 0.20 and 0.21 anyways.
|
||||
this.fs = this.rootdir.getFileSystem(conf);
|
||||
this.walRootDir = FSUtils.getWALRootDir(conf);
|
||||
this.walFs = FSUtils.getWALFileSystem(conf);
|
||||
FSUtils.setFsDefault(conf, new Path(this.walFs.getUri()));
|
||||
walFs.setConf(conf);
|
||||
FSUtils.setFsDefault(conf, new Path(this.fs.getUri()));
|
||||
// make sure the fs has the same conf
|
||||
fs.setConf(conf);
|
||||
|
@ -148,17 +162,21 @@ public class MasterFileSystem {
|
|||
* Idempotent.
|
||||
*/
|
||||
private Path createInitialFileSystemLayout() throws IOException {
|
||||
// check if the root directory exists
|
||||
checkRootDir(this.rootdir, conf, this.fs);
|
||||
|
||||
checkRootDir(this.rootdir, conf, this.fs, HConstants.HBASE_DIR, HBASE_DIR_PERMS);
|
||||
// if the log directory is different from root, check if it exists
|
||||
if (!this.walRootDir.equals(this.rootdir)) {
|
||||
checkRootDir(this.walRootDir, conf, this.walFs, HFileSystem.HBASE_WAL_DIR, HBASE_WAL_DIR_PERMS);
|
||||
}
|
||||
|
||||
// check if temp directory exists and clean it
|
||||
checkTempDir(this.tempdir, conf, this.fs);
|
||||
|
||||
Path oldLogDir = new Path(this.rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
|
||||
Path oldLogDir = new Path(this.walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
|
||||
|
||||
// Make sure the region servers can archive their old logs
|
||||
if(!this.fs.exists(oldLogDir)) {
|
||||
this.fs.mkdirs(oldLogDir);
|
||||
if(!this.walFs.exists(oldLogDir)) {
|
||||
this.walFs.mkdirs(oldLogDir);
|
||||
}
|
||||
|
||||
return oldLogDir;
|
||||
|
@ -182,16 +200,24 @@ public class MasterFileSystem {
|
|||
* @return false if file system is not available
|
||||
*/
|
||||
public boolean checkFileSystem() {
|
||||
if (this.fsOk) {
|
||||
if (this.walFsOk) {
|
||||
try {
|
||||
FSUtils.checkFileSystemAvailable(this.fs);
|
||||
FSUtils.checkFileSystemAvailable(this.walFs);
|
||||
FSUtils.checkDfsSafeMode(this.conf);
|
||||
} catch (IOException e) {
|
||||
master.abort("Shutting down HBase cluster: file system not available", e);
|
||||
this.fsOk = false;
|
||||
this.walFsOk = false;
|
||||
}
|
||||
}
|
||||
return this.fsOk;
|
||||
return this.walFsOk;
|
||||
}
|
||||
|
||||
protected FileSystem getWALFileSystem() {
|
||||
return this.walFs;
|
||||
}
|
||||
|
||||
public Configuration getConfiguration() {
|
||||
return this.conf;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -201,6 +227,11 @@ public class MasterFileSystem {
|
|||
return this.rootdir;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return HBase root log dir.
|
||||
*/
|
||||
public Path getWALRootDir() { return this.walRootDir; }
|
||||
|
||||
/**
|
||||
* @return HBase temp dir.
|
||||
*/
|
||||
|
@ -224,7 +255,7 @@ public class MasterFileSystem {
|
|||
WALSplitter.SPLIT_SKIP_ERRORS_DEFAULT);
|
||||
|
||||
Set<ServerName> serverNames = new HashSet<ServerName>();
|
||||
Path logsDirPath = new Path(this.rootdir, HConstants.HREGION_LOGDIR_NAME);
|
||||
Path logsDirPath = new Path(this.walRootDir, HConstants.HREGION_LOGDIR_NAME);
|
||||
|
||||
do {
|
||||
if (master.isStopped()) {
|
||||
|
@ -232,8 +263,8 @@ public class MasterFileSystem {
|
|||
break;
|
||||
}
|
||||
try {
|
||||
if (!this.fs.exists(logsDirPath)) return serverNames;
|
||||
FileStatus[] logFolders = FSUtils.listStatus(this.fs, logsDirPath, null);
|
||||
if (!this.walFs.exists(logsDirPath)) return serverNames;
|
||||
FileStatus[] logFolders = FSUtils.listStatus(this.walFs, logsDirPath, null);
|
||||
// Get online servers after getting log folders to avoid log folder deletion of newly
|
||||
// checked in region servers . see HBASE-5916
|
||||
Set<ServerName> onlineServers = ((HMaster) master).getServerManager().getOnlineServers()
|
||||
|
@ -244,7 +275,7 @@ public class MasterFileSystem {
|
|||
return serverNames;
|
||||
}
|
||||
for (FileStatus status : logFolders) {
|
||||
FileStatus[] curLogFiles = FSUtils.listStatus(this.fs, status.getPath(), null);
|
||||
FileStatus[] curLogFiles = FSUtils.listStatus(this.walFs, status.getPath(), null);
|
||||
if (curLogFiles == null || curLogFiles.length == 0) {
|
||||
// Empty log folder. No recovery needed
|
||||
continue;
|
||||
|
@ -325,17 +356,17 @@ public class MasterFileSystem {
|
|||
}
|
||||
try {
|
||||
for (ServerName serverName : serverNames) {
|
||||
Path logDir = new Path(this.rootdir,
|
||||
Path logDir = new Path(this.walRootDir,
|
||||
DefaultWALProvider.getWALDirectoryName(serverName.toString()));
|
||||
Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT);
|
||||
// Rename the directory so a rogue RS doesn't create more WALs
|
||||
if (fs.exists(logDir)) {
|
||||
if (!this.fs.rename(logDir, splitDir)) {
|
||||
if (walFs.exists(logDir)) {
|
||||
if (!this.walFs.rename(logDir, splitDir)) {
|
||||
throw new IOException("Failed fs.rename for log split: " + logDir);
|
||||
}
|
||||
logDir = splitDir;
|
||||
LOG.debug("Renamed region directory: " + splitDir);
|
||||
} else if (!fs.exists(splitDir)) {
|
||||
} else if (!walFs.exists(splitDir)) {
|
||||
LOG.info("Log dir for server " + serverName + " does not exist");
|
||||
continue;
|
||||
}
|
||||
|
@ -417,19 +448,19 @@ public class MasterFileSystem {
|
|||
*/
|
||||
@SuppressWarnings("deprecation")
|
||||
private Path checkRootDir(final Path rd, final Configuration c,
|
||||
final FileSystem fs)
|
||||
final FileSystem fs, final String dirConfKey, final String dirPermsConfName)
|
||||
throws IOException {
|
||||
// If FS is in safe mode wait till out of it.
|
||||
FSUtils.waitOnSafeMode(c, c.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000));
|
||||
|
||||
boolean isSecurityEnabled = "kerberos".equalsIgnoreCase(c.get("hbase.security.authentication"));
|
||||
FsPermission rootDirPerms = new FsPermission(c.get("hbase.rootdir.perms", "700"));
|
||||
FsPermission dirPerms = new FsPermission(c.get(dirPermsConfName, "700"));
|
||||
|
||||
// Filesystem is good. Go ahead and check for hbase.rootdir.
|
||||
// Filesystem is good. Go ahead and check for rootdir.
|
||||
try {
|
||||
if (!fs.exists(rd)) {
|
||||
if (isSecurityEnabled) {
|
||||
fs.mkdirs(rd, rootDirPerms);
|
||||
fs.mkdirs(rd, dirPerms);
|
||||
} else {
|
||||
fs.mkdirs(rd);
|
||||
}
|
||||
|
@ -447,15 +478,15 @@ public class MasterFileSystem {
|
|||
if (!fs.isDirectory(rd)) {
|
||||
throw new IllegalArgumentException(rd.toString() + " is not a directory");
|
||||
}
|
||||
if (isSecurityEnabled && !rootDirPerms.equals(fs.getFileStatus(rd).getPermission())) {
|
||||
if (isSecurityEnabled && !dirPerms.equals(fs.getFileStatus(rd).getPermission())) {
|
||||
// check whether the permission match
|
||||
LOG.warn("Found rootdir permissions NOT matching expected \"hbase.rootdir.perms\" for "
|
||||
LOG.warn("Found rootdir permissions NOT matching expected \"" + dirPermsConfName + "\" for "
|
||||
+ "rootdir=" + rd.toString() + " permissions=" + fs.getFileStatus(rd).getPermission()
|
||||
+ " and \"hbase.rootdir.perms\" configured as "
|
||||
+ c.get("hbase.rootdir.perms", "700") + ". Automatically setting the permissions. You"
|
||||
+ " can change the permissions by setting \"hbase.rootdir.perms\" in hbase-site.xml "
|
||||
+ " and \"" + dirPermsConfName + "\" configured as "
|
||||
+ c.get(dirPermsConfName, "700") + ". Automatically setting the permissions. You"
|
||||
+ " can change the permissions by setting \"" + dirPermsConfName + "\" in hbase-site.xml "
|
||||
+ "and restarting the master");
|
||||
fs.setPermission(rd, rootDirPerms);
|
||||
fs.setPermission(rd, dirPerms);
|
||||
}
|
||||
// as above
|
||||
FSUtils.checkVersion(fs, rd, true, c.getInt(HConstants.THREAD_WAKE_FREQUENCY,
|
||||
|
@ -463,38 +494,41 @@ public class MasterFileSystem {
|
|||
HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS));
|
||||
}
|
||||
} catch (DeserializationException de) {
|
||||
LOG.fatal("Please fix invalid configuration for " + HConstants.HBASE_DIR, de);
|
||||
LOG.fatal("Please fix invalid configuration for " + dirConfKey, de);
|
||||
IOException ioe = new IOException();
|
||||
ioe.initCause(de);
|
||||
throw ioe;
|
||||
} catch (IllegalArgumentException iae) {
|
||||
LOG.fatal("Please fix invalid configuration for "
|
||||
+ HConstants.HBASE_DIR + " " + rd.toString(), iae);
|
||||
+ dirConfKey + " " + rd.toString(), iae);
|
||||
throw iae;
|
||||
}
|
||||
// Make sure cluster ID exists
|
||||
if (!FSUtils.checkClusterIdExists(fs, rd, c.getInt(
|
||||
HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000))) {
|
||||
FSUtils.setClusterId(fs, rd, new ClusterId(), c.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000));
|
||||
|
||||
if (dirConfKey.equals(HConstants.HBASE_DIR)) {
|
||||
// Make sure cluster ID exists
|
||||
if (!FSUtils.checkClusterIdExists(fs, rd, c.getInt(
|
||||
HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000))) {
|
||||
FSUtils.setClusterId(fs, rd, new ClusterId(), c.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000));
|
||||
}
|
||||
clusterId = FSUtils.getClusterId(fs, rd);
|
||||
|
||||
// Make sure the meta region directory exists!
|
||||
if (!FSUtils.metaRegionExists(fs, rd)) {
|
||||
bootstrap(rd, c);
|
||||
} else {
|
||||
// Migrate table descriptor files if necessary
|
||||
org.apache.hadoop.hbase.util.FSTableDescriptorMigrationToSubdir
|
||||
.migrateFSTableDescriptorsIfNecessary(fs, rd);
|
||||
}
|
||||
|
||||
// Create tableinfo-s for hbase:meta if not already there.
|
||||
|
||||
// meta table is a system table, so descriptors are predefined,
|
||||
// we should get them from registry.
|
||||
FSTableDescriptors fsd = new FSTableDescriptors(c, fs, rd);
|
||||
fsd.createTableDescriptor(
|
||||
new HTableDescriptor(fsd.get(TableName.META_TABLE_NAME)));
|
||||
}
|
||||
clusterId = FSUtils.getClusterId(fs, rd);
|
||||
|
||||
// Make sure the meta region directory exists!
|
||||
if (!FSUtils.metaRegionExists(fs, rd)) {
|
||||
bootstrap(rd, c);
|
||||
} else {
|
||||
// Migrate table descriptor files if necessary
|
||||
org.apache.hadoop.hbase.util.FSTableDescriptorMigrationToSubdir
|
||||
.migrateFSTableDescriptorsIfNecessary(fs, rd);
|
||||
}
|
||||
|
||||
// Create tableinfo-s for hbase:meta if not already there.
|
||||
|
||||
// meta table is a system table, so descriptors are predefined,
|
||||
// we should get them from registry.
|
||||
FSTableDescriptors fsd = new FSTableDescriptors(c, fs, rd);
|
||||
fsd.createTableDescriptor(
|
||||
new HTableDescriptor(fsd.get(TableName.META_TABLE_NAME)));
|
||||
|
||||
return rd;
|
||||
}
|
||||
|
|
|
@ -266,7 +266,7 @@ public class SplitLogManager {
|
|||
// recover-lease is done. totalSize will be under in most cases and the
|
||||
// metrics that it drives will also be under-reported.
|
||||
totalSize += lf.getLen();
|
||||
String pathToLog = FSUtils.removeRootPath(lf.getPath(), conf);
|
||||
String pathToLog = FSUtils.removeWALRootPath(lf.getPath(), conf);
|
||||
if (!enqueueSplitTask(pathToLog, batch)) {
|
||||
throw new IOException("duplicate log split scheduled for " + lf.getPath());
|
||||
}
|
||||
|
|
|
@ -6674,7 +6674,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
// The WAL subsystem will use the default rootDir rather than the passed in rootDir
|
||||
// unless I pass along via the conf.
|
||||
Configuration confForWAL = new Configuration(conf);
|
||||
confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
|
||||
FSUtils.setRootDir(confForWAL, rootDir);
|
||||
effectiveWAL = (new WALFactory(confForWAL,
|
||||
Collections.<WALActionsListener>singletonList(new MetricsWAL()),
|
||||
"hregion-" + RandomStringUtils.randomNumeric(8))).
|
||||
|
|
|
@ -298,6 +298,7 @@ public class HRegionServer extends HasThread implements
|
|||
// If false, the file system has become unavailable
|
||||
protected volatile boolean fsOk;
|
||||
protected HFileSystem fs;
|
||||
protected HFileSystem walFs;
|
||||
|
||||
// Set when a report to the master comes back with a message asking us to
|
||||
// shutdown. Also set by call to stop when debugging or running unit tests
|
||||
|
@ -319,6 +320,7 @@ public class HRegionServer extends HasThread implements
|
|||
protected final Configuration conf;
|
||||
|
||||
private Path rootDir;
|
||||
private Path walRootDir;
|
||||
|
||||
protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
|
||||
|
@ -634,13 +636,16 @@ public class HRegionServer extends HasThread implements
|
|||
}
|
||||
|
||||
private void initializeFileSystem() throws IOException {
|
||||
// Get fs instance used by this RS. Do we use checksum verification in the hbase? If hbase
|
||||
// checksum verification enabled, then automatically switch off hdfs checksum verification.
|
||||
boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
|
||||
FSUtils.setFsDefault(this.conf, FSUtils.getWALRootDir(this.conf));
|
||||
this.walFs = new HFileSystem(this.conf, useHBaseChecksum);
|
||||
this.walRootDir = FSUtils.getWALRootDir(this.conf);
|
||||
// Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else
|
||||
// underlying hadoop hdfs accessors will be going against wrong filesystem
|
||||
// (unless all is set to defaults).
|
||||
FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf));
|
||||
// Get fs instance used by this RS. Do we use checksum verification in the hbase? If hbase
|
||||
// checksum verification enabled, then automatically switch off hdfs checksum verification.
|
||||
boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
|
||||
this.fs = new HFileSystem(this.conf, useHBaseChecksum);
|
||||
this.rootDir = FSUtils.getRootDir(this.conf);
|
||||
this.tableDescriptors = new FSTableDescriptors(
|
||||
|
@ -1674,19 +1679,19 @@ public class HRegionServer extends HasThread implements
|
|||
*/
|
||||
private WALFactory setupWALAndReplication() throws IOException {
|
||||
// TODO Replication make assumptions here based on the default filesystem impl
|
||||
final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
|
||||
final Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
|
||||
final String logName = DefaultWALProvider.getWALDirectoryName(this.serverName.toString());
|
||||
|
||||
Path logdir = new Path(rootDir, logName);
|
||||
if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
|
||||
if (this.fs.exists(logdir)) {
|
||||
Path logDir = new Path(walRootDir, logName);
|
||||
if (LOG.isDebugEnabled()) LOG.debug("logDir=" + logDir);
|
||||
if (this.walFs.exists(logDir)) {
|
||||
throw new RegionServerRunningException("Region server has already " +
|
||||
"created directory at " + this.serverName.toString());
|
||||
}
|
||||
|
||||
// Instantiate replication manager if replication enabled. Pass it the
|
||||
// log directories.
|
||||
createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir);
|
||||
createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir);
|
||||
|
||||
// listeners the wal factory will add to wals it creates.
|
||||
final List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
|
||||
|
@ -2600,6 +2605,20 @@ public class HRegionServer extends HasThread implements
|
|||
return fs;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Return the walRootDir.
|
||||
*/
|
||||
protected Path getWALRootDir() {
|
||||
return walRootDir;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Return the walFs.
|
||||
*/
|
||||
protected FileSystem getWALFileSystem() {
|
||||
return walFs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return getServerName().toString();
|
||||
|
@ -2666,7 +2685,7 @@ public class HRegionServer extends HasThread implements
|
|||
* Load the replication service objects, if any
|
||||
*/
|
||||
static private void createNewReplicationInstance(Configuration conf,
|
||||
HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException{
|
||||
HRegionServer server, FileSystem walFs, Path walDir, Path oldWALDir) throws IOException{
|
||||
|
||||
// If replication is not enabled, then return immediately.
|
||||
if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
|
||||
|
@ -2692,21 +2711,21 @@ public class HRegionServer extends HasThread implements
|
|||
if (sourceClassname.equals(sinkClassname)) {
|
||||
server.replicationSourceHandler = (ReplicationSourceService)
|
||||
newReplicationInstance(sourceClassname,
|
||||
conf, server, fs, logDir, oldLogDir);
|
||||
conf, server, walFs, walDir, oldWALDir);
|
||||
server.replicationSinkHandler = (ReplicationSinkService)
|
||||
server.replicationSourceHandler;
|
||||
} else {
|
||||
server.replicationSourceHandler = (ReplicationSourceService)
|
||||
newReplicationInstance(sourceClassname,
|
||||
conf, server, fs, logDir, oldLogDir);
|
||||
conf, server, walFs, walDir, oldWALDir);
|
||||
server.replicationSinkHandler = (ReplicationSinkService)
|
||||
newReplicationInstance(sinkClassname,
|
||||
conf, server, fs, logDir, oldLogDir);
|
||||
conf, server, walFs, walDir, oldWALDir);
|
||||
}
|
||||
}
|
||||
|
||||
static private ReplicationService newReplicationInstance(String classname,
|
||||
Configuration conf, HRegionServer server, FileSystem fs, Path logDir,
|
||||
Configuration conf, HRegionServer server, FileSystem walFs, Path walDir,
|
||||
Path oldLogDir) throws IOException{
|
||||
|
||||
Class<?> clazz = null;
|
||||
|
@ -2720,7 +2739,7 @@ public class HRegionServer extends HasThread implements
|
|||
// create an instance of the replication object.
|
||||
ReplicationService service = (ReplicationService)
|
||||
ReflectionUtils.newInstance(clazz, conf);
|
||||
service.initialize(server, fs, logDir, oldLogDir);
|
||||
service.initialize(server, walFs, walDir, oldLogDir);
|
||||
return service;
|
||||
}
|
||||
|
||||
|
|
|
@ -88,11 +88,11 @@ public class SplitLogWorker implements Runnable {
|
|||
this(server, conf, server, new TaskExecutor() {
|
||||
@Override
|
||||
public Status exec(String filename, RecoveryMode mode, CancelableProgressable p) {
|
||||
Path rootdir;
|
||||
Path walDir;
|
||||
FileSystem fs;
|
||||
try {
|
||||
rootdir = FSUtils.getRootDir(conf);
|
||||
fs = rootdir.getFileSystem(conf);
|
||||
walDir = FSUtils.getWALRootDir(conf);
|
||||
fs = walDir.getFileSystem(conf);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("could not find root dir or fs", e);
|
||||
return Status.RESIGNED;
|
||||
|
@ -101,7 +101,7 @@ public class SplitLogWorker implements Runnable {
|
|||
// interrupted or has encountered a transient error and when it has
|
||||
// encountered a bad non-retry-able persistent error.
|
||||
try {
|
||||
if (!WALSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)),
|
||||
if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, filename)),
|
||||
fs, conf, p, sequenceIdChecker, server.getCoordinatedStateManager(), mode, factory)) {
|
||||
return Status.PREEMPTED;
|
||||
}
|
||||
|
|
|
@ -1586,9 +1586,9 @@ public class FSHLog implements WAL {
|
|||
ClassSize.OBJECT + (5 * ClassSize.REFERENCE) +
|
||||
ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
|
||||
|
||||
private static void split(final Configuration conf, final Path p)
|
||||
throws IOException {
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
|
||||
private static void split(final Configuration conf, final Path p) throws IOException {
|
||||
FileSystem fs = FSUtils.getWALFileSystem(conf);
|
||||
if (!fs.exists(p)) {
|
||||
throw new FileNotFoundException(p.toString());
|
||||
}
|
||||
|
@ -1596,7 +1596,7 @@ public class FSHLog implements WAL {
|
|||
throw new IOException(p + " is not a directory");
|
||||
}
|
||||
|
||||
final Path baseDir = FSUtils.getRootDir(conf);
|
||||
final Path baseDir = FSUtils.getWALRootDir(conf);
|
||||
final Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
|
||||
WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
|
||||
}
|
||||
|
|
|
@ -962,9 +962,9 @@ public class ReplicationSource extends Thread
|
|||
// to look at)
|
||||
List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
|
||||
LOG.info("NB dead servers : " + deadRegionServers.size());
|
||||
final Path rootDir = FSUtils.getRootDir(conf);
|
||||
final Path walDir = FSUtils.getWALRootDir(conf);
|
||||
for (String curDeadServerName : deadRegionServers) {
|
||||
final Path deadRsDirectory = new Path(rootDir,
|
||||
final Path deadRsDirectory = new Path(walDir,
|
||||
DefaultWALProvider.getWALDirectoryName(curDeadServerName));
|
||||
Path[] locs = new Path[] {
|
||||
new Path(deadRsDirectory, currentPath.getName()),
|
||||
|
@ -986,7 +986,7 @@ public class ReplicationSource extends Thread
|
|||
// In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data
|
||||
// from .logs to .oldlogs. Loop into .logs folders and check whether a match exists
|
||||
if (stopper instanceof ReplicationSyncUp.DummyServer) {
|
||||
// N.B. the ReplicationSyncUp tool sets the manager.getLogDir to the root of the wal
|
||||
// N.B. the ReplicationSyncUp tool sets the manager.getWALDir to the root of the wal
|
||||
// area rather than to the wal area for a particular region server.
|
||||
FileStatus[] rss = fs.listStatus(manager.getLogDir());
|
||||
for (FileStatus rs : rss) {
|
||||
|
|
|
@ -76,7 +76,7 @@ public class ReplicationSyncUp extends Configured implements Tool {
|
|||
Replication replication;
|
||||
ReplicationSourceManager manager;
|
||||
FileSystem fs;
|
||||
Path oldLogDir, logDir, rootDir;
|
||||
Path oldLogDir, logDir, walRootDir;
|
||||
ZooKeeperWatcher zkw;
|
||||
|
||||
Abortable abortable = new Abortable() {
|
||||
|
@ -94,10 +94,10 @@ public class ReplicationSyncUp extends Configured implements Tool {
|
|||
new ZooKeeperWatcher(conf, "syncupReplication" + System.currentTimeMillis(), abortable,
|
||||
true);
|
||||
|
||||
rootDir = FSUtils.getRootDir(conf);
|
||||
fs = FileSystem.get(conf);
|
||||
oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
|
||||
logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
|
||||
walRootDir = FSUtils.getWALRootDir(conf);
|
||||
fs = FSUtils.getWALFileSystem(conf);
|
||||
oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
|
||||
logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
|
||||
|
||||
System.out.println("Start Replication Server start");
|
||||
replication = new Replication(new DummyServer(zkw), fs, logDir, oldLogDir);
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Iterators;
|
||||
import com.google.common.collect.Lists;
|
||||
|
@ -99,6 +100,9 @@ import org.apache.hadoop.util.Progressable;
|
|||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
|
||||
import static org.apache.hadoop.hbase.HConstants.HBASE_DIR;
|
||||
|
||||
/**
|
||||
* Utility methods for interacting with the underlying file system.
|
||||
*/
|
||||
|
@ -948,22 +952,22 @@ public abstract class FSUtils {
|
|||
return root;
|
||||
} catch (URISyntaxException e) {
|
||||
IOException io = new IOException("Root directory path is not a valid " +
|
||||
"URI -- check your " + HConstants.HBASE_DIR + " configuration");
|
||||
"URI -- check your " + HBASE_DIR + " configuration");
|
||||
io.initCause(e);
|
||||
throw io;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks for the presence of the root path (using the provided conf object) in the given path. If
|
||||
* Checks for the presence of the WAL log root path (using the provided conf object) in the given path. If
|
||||
* it exists, this method removes it and returns the String representation of remaining relative path.
|
||||
* @param path
|
||||
* @param conf
|
||||
* @return String representation of the remaining relative path
|
||||
* @throws IOException
|
||||
*/
|
||||
public static String removeRootPath(Path path, final Configuration conf) throws IOException {
|
||||
Path root = FSUtils.getRootDir(conf);
|
||||
public static String removeWALRootPath(Path path, final Configuration conf) throws IOException {
|
||||
Path root = getWALRootDir(conf);
|
||||
String pathStr = path.toString();
|
||||
// check that the path is absolute... it has the root path in it.
|
||||
if (!pathStr.startsWith(root.toString())) return pathStr;
|
||||
|
@ -1010,24 +1014,65 @@ public abstract class FSUtils {
|
|||
|
||||
/**
|
||||
* @param c configuration
|
||||
* @return Path to hbase root directory: i.e. <code>hbase.rootdir</code> from
|
||||
* @return {@link Path} to hbase root directory: i.e. {@value org.apache.hadoop.hbase.HConstants#HBASE_DIR} from
|
||||
* configuration as a qualified Path.
|
||||
* @throws IOException e
|
||||
*/
|
||||
public static Path getRootDir(final Configuration c) throws IOException {
|
||||
Path p = new Path(c.get(HConstants.HBASE_DIR));
|
||||
Path p = new Path(c.get(HBASE_DIR));
|
||||
FileSystem fs = p.getFileSystem(c);
|
||||
return p.makeQualified(fs);
|
||||
}
|
||||
|
||||
public static void setRootDir(final Configuration c, final Path root) throws IOException {
|
||||
c.set(HConstants.HBASE_DIR, root.toString());
|
||||
c.set(HBASE_DIR, root.toString());
|
||||
}
|
||||
|
||||
public static void setFsDefault(final Configuration c, final Path root) throws IOException {
|
||||
c.set("fs.defaultFS", root.toString()); // for hadoop 0.21+
|
||||
}
|
||||
|
||||
public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException {
|
||||
Path p = getRootDir(c);
|
||||
return p.getFileSystem(c);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param c configuration
|
||||
* @return {@link Path} to hbase log root directory: i.e. {@value org.apache.hadoop.hbase.fs.HFileSystem#HBASE_WAL_DIR} from
|
||||
* configuration as a qualified Path. Defaults to {@value org.apache.hadoop.hbase.HConstants#HBASE_DIR}
|
||||
* @throws IOException e
|
||||
*/
|
||||
public static Path getWALRootDir(final Configuration c) throws IOException {
|
||||
Path p = new Path(c.get(HFileSystem.HBASE_WAL_DIR, c.get(HBASE_DIR)));
|
||||
if (!isValidWALRootDir(p, c)) {
|
||||
return FSUtils.getRootDir(c);
|
||||
}
|
||||
FileSystem fs = p.getFileSystem(c);
|
||||
return p.makeQualified(fs);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public static void setWALRootDir(final Configuration c, final Path root) throws IOException {
|
||||
c.set(HFileSystem.HBASE_WAL_DIR, root.toString());
|
||||
}
|
||||
|
||||
public static FileSystem getWALFileSystem(final Configuration c) throws IOException {
|
||||
Path p = getWALRootDir(c);
|
||||
return p.getFileSystem(c);
|
||||
}
|
||||
|
||||
private static boolean isValidWALRootDir(Path walDir, final Configuration c) throws IOException {
|
||||
Path rootDir = FSUtils.getRootDir(c);
|
||||
if (walDir != rootDir) {
|
||||
if (walDir.toString().startsWith(rootDir.toString() + "/")) {
|
||||
throw new IllegalStateException("Illegal WAL directory specified. " +
|
||||
"WAL directories are not permitted to be under the root directory if set.");
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if meta region exists
|
||||
*
|
||||
|
|
|
@ -47,10 +47,10 @@ import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
|
|||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||
|
||||
/**
|
||||
* A WAL Provider that returns a single thread safe WAL that writes to HDFS.
|
||||
* By default, this implementation picks a directory in HDFS based on a combination of
|
||||
* A WAL Provider that returns a single thread safe WAL that writes to Hadoop FS.
|
||||
* By default, this implementation picks a directory in Hadoop FS based on a combination of
|
||||
* <ul>
|
||||
* <li>the HBase root directory
|
||||
* <li>the HBase root WAL directory
|
||||
* <li>HConstants.HREGION_LOGDIR_NAME
|
||||
* <li>the given factory's factoryId (usually identifying the regionserver by host:port)
|
||||
* </ul>
|
||||
|
@ -138,7 +138,7 @@ public class DefaultWALProvider implements WALProvider {
|
|||
// creating hlog on fs is time consuming
|
||||
synchronized (walCreateLock) {
|
||||
if (log == null) {
|
||||
log = new FSHLog(FileSystem.get(conf), FSUtils.getRootDir(conf),
|
||||
log = new FSHLog(FSUtils.getWALFileSystem(conf), FSUtils.getWALRootDir(conf),
|
||||
getWALDirectoryName(factory.factoryId), HConstants.HREGION_OLDLOGDIR_NAME, conf,
|
||||
listeners, true, logPrefix,
|
||||
META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
|
||||
|
@ -294,14 +294,10 @@ public class DefaultWALProvider implements WALProvider {
|
|||
throw new IllegalArgumentException("parameter conf must be set");
|
||||
}
|
||||
|
||||
final String rootDir = conf.get(HConstants.HBASE_DIR);
|
||||
if (rootDir == null || rootDir.isEmpty()) {
|
||||
throw new IllegalArgumentException(HConstants.HBASE_DIR
|
||||
+ " key not found in conf.");
|
||||
}
|
||||
final String walDir = FSUtils.getWALRootDir(conf).toString();
|
||||
|
||||
final StringBuilder startPathSB = new StringBuilder(rootDir);
|
||||
if (!rootDir.endsWith("/"))
|
||||
final StringBuilder startPathSB = new StringBuilder(walDir);
|
||||
if (!walDir.endsWith("/"))
|
||||
startPathSB.append('/');
|
||||
startPathSB.append(HConstants.HREGION_LOGDIR_NAME);
|
||||
if (!HConstants.HREGION_LOGDIR_NAME.endsWith("/"))
|
||||
|
|
|
@ -63,7 +63,7 @@ class DisabledWALProvider implements WALProvider {
|
|||
if (null == providerId) {
|
||||
providerId = "defaultDisabled";
|
||||
}
|
||||
disabled = new DisabledWAL(new Path(FSUtils.getRootDir(conf), providerId), conf, null);
|
||||
disabled = new DisabledWAL(new Path(FSUtils.getWALRootDir(conf), providerId), conf, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -244,24 +244,24 @@ public class WALSplitter {
|
|||
// log splitting. Used by tools and unit tests. It should be package private.
|
||||
// It is public only because UpgradeTo96 and TestWALObserver are in different packages,
|
||||
// which uses this method to do log splitting.
|
||||
public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir,
|
||||
FileSystem fs, Configuration conf, final WALFactory factory) throws IOException {
|
||||
public static List<Path> split(Path walRootDir, Path logDir, Path oldLogDir,
|
||||
FileSystem walFs, Configuration conf, final WALFactory factory) throws IOException {
|
||||
final FileStatus[] logfiles = SplitLogManager.getFileList(conf,
|
||||
Collections.singletonList(logDir), null);
|
||||
List<Path> splits = new ArrayList<Path>();
|
||||
if (logfiles != null && logfiles.length > 0) {
|
||||
for (FileStatus logfile: logfiles) {
|
||||
WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null,
|
||||
WALSplitter s = new WALSplitter(factory, conf, walRootDir, walFs, null, null,
|
||||
RecoveryMode.LOG_SPLITTING);
|
||||
if (s.splitLogFile(logfile, null)) {
|
||||
finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
|
||||
finishSplitLogFile(walRootDir, oldLogDir, logfile.getPath(), conf);
|
||||
if (s.outputSink.splits != null) {
|
||||
splits.addAll(s.outputSink.splits);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!fs.delete(logDir, true)) {
|
||||
if (!walFs.delete(logDir, true)) {
|
||||
throw new IOException("Unable to delete src dir: " + logDir);
|
||||
}
|
||||
return splits;
|
||||
|
@ -444,7 +444,7 @@ public class WALSplitter {
|
|||
*/
|
||||
public static void finishSplitLogFile(String logfile,
|
||||
Configuration conf) throws IOException {
|
||||
Path rootdir = FSUtils.getRootDir(conf);
|
||||
Path rootdir = FSUtils.getWALRootDir(conf);
|
||||
Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
|
||||
Path logPath;
|
||||
if (FSUtils.isStartingWithPath(rootdir, logfile)) {
|
||||
|
@ -487,7 +487,7 @@ public class WALSplitter {
|
|||
final List<Path> corruptedLogs,
|
||||
final List<Path> processedLogs, final Path oldLogDir,
|
||||
final FileSystem fs, final Configuration conf) throws IOException {
|
||||
final Path corruptDir = new Path(FSUtils.getRootDir(conf), conf.get(
|
||||
final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), conf.get(
|
||||
"hbase.regionserver.hlog.splitlog.corrupt.dir", HConstants.CORRUPT_DIR_NAME));
|
||||
|
||||
if (!fs.mkdirs(corruptDir)) {
|
||||
|
|
|
@ -841,6 +841,16 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
|
|||
return startMiniCluster(1, 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Start up a minicluster of hbase, dfs, and zookeeper where WAL's walDir is created separately.
|
||||
* @throws Exception
|
||||
* @return Mini hbase cluster instance created.
|
||||
* @see {@link #shutdownMiniDFSCluster()}
|
||||
*/
|
||||
public MiniHBaseCluster startMiniCluster(boolean withWALDir) throws Exception {
|
||||
return startMiniCluster(1, 1, 1, null, null, null, false, withWALDir);
|
||||
}
|
||||
|
||||
/**
|
||||
* Start up a minicluster of hbase, dfs, and zookeeper.
|
||||
* Set the <code>create</code> flag to create root or data directory path or not
|
||||
|
@ -872,6 +882,11 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
|
|||
return startMiniCluster(1, numSlaves, false);
|
||||
}
|
||||
|
||||
public MiniHBaseCluster startMiniCluster(final int numSlaves, boolean create, boolean withWALDir)
|
||||
throws Exception {
|
||||
return startMiniCluster(1, numSlaves, numSlaves, null, null, null, create, withWALDir);
|
||||
}
|
||||
|
||||
/**
|
||||
* Start minicluster. Whether to create a new root or data dir path even if such a path
|
||||
* has been created earlier is decided based on flag <code>create</code>
|
||||
|
@ -901,7 +916,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
|
|||
final int numSlaves, final String[] dataNodeHosts, boolean create)
|
||||
throws Exception {
|
||||
return startMiniCluster(numMasters, numSlaves, numSlaves, dataNodeHosts,
|
||||
null, null, create);
|
||||
null, null, create, false);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -984,7 +999,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
|
|||
Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
|
||||
throws Exception {
|
||||
return startMiniCluster(numMasters, numSlaves, numDataNodes, dataNodeHosts,
|
||||
masterClass, regionserverClass, false);
|
||||
masterClass, regionserverClass, false, false);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -998,7 +1013,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
|
|||
final int numSlaves, int numDataNodes, final String[] dataNodeHosts,
|
||||
Class<? extends HMaster> masterClass,
|
||||
Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass,
|
||||
boolean create)
|
||||
boolean create, boolean withWALDir)
|
||||
throws Exception {
|
||||
if (dataNodeHosts != null && dataNodeHosts.length != 0) {
|
||||
numDataNodes = dataNodeHosts.length;
|
||||
|
@ -1029,12 +1044,12 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
|
|||
|
||||
// Start the MiniHBaseCluster
|
||||
return startMiniHBaseCluster(numMasters, numSlaves, masterClass,
|
||||
regionserverClass, create);
|
||||
regionserverClass, create, withWALDir);
|
||||
}
|
||||
|
||||
public MiniHBaseCluster startMiniHBaseCluster(final int numMasters, final int numSlaves)
|
||||
throws IOException, InterruptedException{
|
||||
return startMiniHBaseCluster(numMasters, numSlaves, null, null, false);
|
||||
return startMiniHBaseCluster(numMasters, numSlaves, null, null, false, false);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1053,11 +1068,15 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
|
|||
public MiniHBaseCluster startMiniHBaseCluster(final int numMasters,
|
||||
final int numSlaves, Class<? extends HMaster> masterClass,
|
||||
Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass,
|
||||
boolean create)
|
||||
boolean create, boolean withWALDir)
|
||||
throws IOException, InterruptedException {
|
||||
// Now do the mini hbase cluster. Set the hbase.rootdir in config.
|
||||
createRootDir(create);
|
||||
|
||||
if (withWALDir) {
|
||||
createWALRootDir();
|
||||
}
|
||||
|
||||
// These settings will make the server waits until this exact number of
|
||||
// regions servers are connected.
|
||||
if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
|
||||
|
@ -1241,6 +1260,22 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
|
|||
return createRootDir(false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a hbase walDir in the user's home directory.
|
||||
* Normally you won't make use of this method. Root hbaseWALDir
|
||||
* is created for you as part of mini cluster startup. You'd only use this
|
||||
* method if you were doing manual operation.
|
||||
*
|
||||
* @return Fully qualified path to hbase WAL root dir
|
||||
* @throws IOException
|
||||
*/
|
||||
public Path createWALRootDir() throws IOException {
|
||||
FileSystem fs = FileSystem.get(this.conf);
|
||||
Path walDir = getNewDataTestDirOnTestFS();
|
||||
FSUtils.setWALRootDir(this.conf, walDir);
|
||||
fs.mkdirs(walDir);
|
||||
return walDir;
|
||||
}
|
||||
|
||||
private void setHBaseFsTmpDir() throws IOException {
|
||||
String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");
|
||||
|
@ -1817,12 +1852,13 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
|
|||
/**
|
||||
* Create an unmanaged WAL. Be sure to close it when you're through.
|
||||
*/
|
||||
public static WAL createWal(final Configuration conf, final Path rootDir, final HRegionInfo hri)
|
||||
public static WAL createWal(final Configuration conf, final Path rootDir, final Path walRootDir, final HRegionInfo hri)
|
||||
throws IOException {
|
||||
// The WAL subsystem will use the default rootDir rather than the passed in rootDir
|
||||
// unless I pass along via the conf.
|
||||
Configuration confForWAL = new Configuration(conf);
|
||||
confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
|
||||
FSUtils.setWALRootDir(confForWAL, walRootDir);
|
||||
return (new WALFactory(confForWAL,
|
||||
Collections.<WALActionsListener>singletonList(new MetricsWAL()),
|
||||
"hregion-" + RandomStringUtils.randomNumeric(8))).
|
||||
|
@ -1834,8 +1870,8 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
|
|||
* {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
|
||||
*/
|
||||
public static HRegion createRegionAndWAL(final HRegionInfo info, final Path rootDir,
|
||||
final Configuration conf, final HTableDescriptor htd) throws IOException {
|
||||
return createRegionAndWAL(info, rootDir, conf, htd, true);
|
||||
final Path walRootDir, final Configuration conf, final HTableDescriptor htd) throws IOException {
|
||||
return createRegionAndWAL(info, rootDir, walRootDir, conf, htd, true);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1843,9 +1879,9 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
|
|||
* {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
|
||||
*/
|
||||
public static HRegion createRegionAndWAL(final HRegionInfo info, final Path rootDir,
|
||||
final Configuration conf, final HTableDescriptor htd, boolean initialize)
|
||||
final Path walRootDir, final Configuration conf, final HTableDescriptor htd, boolean initialize)
|
||||
throws IOException {
|
||||
WAL wal = createWal(conf, rootDir, info);
|
||||
WAL wal = createWal(conf, rootDir, walRootDir, info);
|
||||
return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
|
||||
}
|
||||
|
||||
|
|
|
@ -97,6 +97,7 @@ public class TestWALObserver {
|
|||
private FileSystem fs;
|
||||
private Path dir;
|
||||
private Path hbaseRootDir;
|
||||
private Path hbaseWALRootDir;
|
||||
private String logName;
|
||||
private Path oldLogDir;
|
||||
private Path logDir;
|
||||
|
@ -115,8 +116,11 @@ public class TestWALObserver {
|
|||
TEST_UTIL.startMiniCluster(1);
|
||||
Path hbaseRootDir = TEST_UTIL.getDFSCluster().getFileSystem()
|
||||
.makeQualified(new Path("/hbase"));
|
||||
Path hbaseWALRootDir = TEST_UTIL.getDFSCluster().getFileSystem()
|
||||
.makeQualified(new Path("/hbaseLogRoot"));
|
||||
LOG.info("hbase.rootdir=" + hbaseRootDir);
|
||||
FSUtils.setRootDir(conf, hbaseRootDir);
|
||||
FSUtils.setWALRootDir(conf, hbaseWALRootDir);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
@ -130,16 +134,20 @@ public class TestWALObserver {
|
|||
// this.cluster = TEST_UTIL.getDFSCluster();
|
||||
this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
|
||||
this.hbaseRootDir = FSUtils.getRootDir(conf);
|
||||
this.hbaseWALRootDir = FSUtils.getWALRootDir(conf);
|
||||
this.dir = new Path(this.hbaseRootDir, TestWALObserver.class.getName());
|
||||
this.oldLogDir = new Path(this.hbaseRootDir,
|
||||
this.oldLogDir = new Path(this.hbaseWALRootDir,
|
||||
HConstants.HREGION_OLDLOGDIR_NAME);
|
||||
this.logDir = new Path(this.hbaseRootDir,
|
||||
this.logDir = new Path(this.hbaseWALRootDir,
|
||||
DefaultWALProvider.getWALDirectoryName(currentTest.getMethodName()));
|
||||
this.logName = HConstants.HREGION_LOGDIR_NAME;
|
||||
|
||||
if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
|
||||
TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
|
||||
}
|
||||
if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseWALRootDir)) {
|
||||
TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseWALRootDir, true);
|
||||
}
|
||||
this.wals = new WALFactory(conf, null, currentTest.getMethodName());
|
||||
}
|
||||
|
||||
|
@ -153,6 +161,7 @@ public class TestWALObserver {
|
|||
LOG.debug("details of failure to close wal factory.", exception);
|
||||
}
|
||||
TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
|
||||
TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseWALRootDir, true);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -84,7 +84,7 @@ public class TestFilterFromRegionSide {
|
|||
}
|
||||
HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
|
||||
REGION = HBaseTestingUtility
|
||||
.createRegionAndWAL(info, TEST_UTIL.getDataTestDir(), TEST_UTIL.getConfiguration(), htd);
|
||||
.createRegionAndWAL(info, TEST_UTIL.getDataTestDir(), TEST_UTIL.getDataTestDir(), TEST_UTIL.getConfiguration(), htd);
|
||||
for(Put put:createPuts(ROWS, FAMILIES, QUALIFIERS, VALUE)){
|
||||
REGION.put(put);
|
||||
}
|
||||
|
|
|
@ -88,6 +88,8 @@ public class TestBlockReorder {
|
|||
private static final String host1 = "host1";
|
||||
private static final String host2 = "host2";
|
||||
private static final String host3 = "host3";
|
||||
private static Path rootDir;
|
||||
private static Path walRootDir;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
|
@ -101,10 +103,14 @@ public class TestBlockReorder {
|
|||
conf = htu.getConfiguration();
|
||||
cluster = htu.getDFSCluster();
|
||||
dfs = (DistributedFileSystem) FileSystem.get(conf);
|
||||
rootDir = htu.createRootDir();
|
||||
walRootDir = htu.createWALRootDir();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDownAfterClass() throws Exception {
|
||||
dfs.delete(rootDir, true);
|
||||
dfs.delete(walRootDir, true);
|
||||
htu.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
|
@ -277,7 +283,7 @@ public class TestBlockReorder {
|
|||
|
||||
// Now we need to find the log file, its locations, and look at it
|
||||
|
||||
String rootDir = new Path(FSUtils.getRootDir(conf) + "/" + HConstants.HREGION_LOGDIR_NAME +
|
||||
String walDir = new Path(FSUtils.getWALRootDir(conf) + "/" + HConstants.HREGION_LOGDIR_NAME +
|
||||
"/" + targetRs.getServerName().toString()).toUri().getPath();
|
||||
|
||||
DistributedFileSystem mdfs = (DistributedFileSystem)
|
||||
|
@ -321,7 +327,7 @@ public class TestBlockReorder {
|
|||
p.add(sb, sb, sb);
|
||||
h.put(p);
|
||||
|
||||
DirectoryListing dl = dfs.getClient().listPaths(rootDir, HdfsFileStatus.EMPTY_NAME);
|
||||
DirectoryListing dl = dfs.getClient().listPaths(walDir, HdfsFileStatus.EMPTY_NAME);
|
||||
HdfsFileStatus[] hfs = dl.getPartialListing();
|
||||
|
||||
// As we wrote a put, we should have at least one log file.
|
||||
|
@ -329,8 +335,8 @@ public class TestBlockReorder {
|
|||
for (HdfsFileStatus hf : hfs) {
|
||||
// Because this is a live cluster, log files might get archived while we're processing
|
||||
try {
|
||||
LOG.info("Log file found: " + hf.getLocalName() + " in " + rootDir);
|
||||
String logFile = rootDir + "/" + hf.getLocalName();
|
||||
LOG.info("Log file found: " + hf.getLocalName() + " in " + walDir);
|
||||
String logFile = walDir + "/" + hf.getLocalName();
|
||||
FileStatus fsLog = rfs.getFileStatus(new Path(logFile));
|
||||
|
||||
LOG.info("Checking log file: " + logFile);
|
||||
|
@ -457,7 +463,7 @@ public class TestBlockReorder {
|
|||
// Should be reordered, as we pretend to be a file name with a compliant stuff
|
||||
Assert.assertNotNull(conf.get(HConstants.HBASE_DIR));
|
||||
Assert.assertFalse(conf.get(HConstants.HBASE_DIR).isEmpty());
|
||||
String pseudoLogFile = conf.get(HConstants.HBASE_DIR) + "/" +
|
||||
String pseudoLogFile = conf.get(HFileSystem.HBASE_WAL_DIR) + "/" +
|
||||
HConstants.HREGION_LOGDIR_NAME + "/" + host1 + ",6977,6576" + "/mylogfile";
|
||||
|
||||
// Check that it will be possible to extract a ServerName from our construction
|
||||
|
|
|
@ -59,7 +59,7 @@ public class TestSeekBeforeWithReverseScan {
|
|||
htd.addFamily(new HColumnDescriptor(cfName).setDataBlockEncoding(DataBlockEncoding.FAST_DIFF));
|
||||
HRegionInfo info = new HRegionInfo(tableName, null, null, false);
|
||||
Path path = testUtil.getDataTestDir(getClass().getSimpleName());
|
||||
region = HBaseTestingUtility.createRegionAndWAL(info, path, testUtil.getConfiguration(), htd);
|
||||
region = HBaseTestingUtility.createRegionAndWAL(info, path, path, testUtil.getConfiguration(), htd);
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
|
@ -30,6 +30,7 @@ import java.io.PrintStream;
|
|||
import java.util.ArrayList;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
|
@ -48,6 +49,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
|||
import org.apache.hadoop.hbase.mapreduce.WALPlayer.WALKeyValueMapper;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.apache.hadoop.hbase.wal.WALKey;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.LauncherSecurityManager;
|
||||
|
@ -67,16 +69,28 @@ import org.mockito.stubbing.Answer;
|
|||
public class TestWALPlayer {
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private static MiniHBaseCluster cluster;
|
||||
private static Path rootDir;
|
||||
private static Path walRootDir;
|
||||
private static FileSystem fs;
|
||||
private static FileSystem walFs;
|
||||
private static Configuration conf;
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() throws Exception {
|
||||
TEST_UTIL.setJobWithoutMRCluster();
|
||||
conf= TEST_UTIL.getConfiguration();
|
||||
rootDir = TEST_UTIL.createRootDir();
|
||||
walRootDir = TEST_UTIL.createWALRootDir();
|
||||
fs = FSUtils.getRootDirFileSystem(conf);
|
||||
walFs = FSUtils.getWALFileSystem(conf);
|
||||
cluster = TEST_UTIL.startMiniCluster();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterClass() throws Exception {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
fs.delete(rootDir, true);
|
||||
walFs.delete(walRootDir, true);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -108,7 +122,7 @@ public class TestWALPlayer {
|
|||
WAL log = cluster.getRegionServer(0).getWAL(null);
|
||||
log.rollWriter();
|
||||
String walInputDir = new Path(cluster.getMaster().getMasterFileSystem()
|
||||
.getRootDir(), HConstants.HREGION_LOGDIR_NAME).toString();
|
||||
.getWALRootDir(), HConstants.HREGION_LOGDIR_NAME).toString();
|
||||
|
||||
Configuration configuration= TEST_UTIL.getConfiguration();
|
||||
WALPlayer player = new WALPlayer(configuration);
|
||||
|
|
|
@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
|
@ -43,6 +42,7 @@ import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALKeyRecordReader;
|
|||
import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader;
|
||||
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||
import org.apache.hadoop.hbase.wal.WALKey;
|
||||
|
@ -65,6 +65,8 @@ public class TestWALRecordReader {
|
|||
private static Configuration conf;
|
||||
private static FileSystem fs;
|
||||
private static Path hbaseDir;
|
||||
private static FileSystem walFs;
|
||||
private static Path walRootDir;
|
||||
// visible for TestHLogRecordReader
|
||||
static final TableName tableName = TableName.valueOf(getName());
|
||||
private static final byte [] rowName = tableName.getName();
|
||||
|
@ -83,12 +85,9 @@ public class TestWALRecordReader {
|
|||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
fs.delete(hbaseDir, true);
|
||||
walFs.delete(walRootDir, true);
|
||||
mvcc = new MultiVersionConcurrencyControl();
|
||||
FileStatus[] entries = fs.listStatus(hbaseDir);
|
||||
for (FileStatus dir : entries) {
|
||||
fs.delete(dir.getPath(), true);
|
||||
}
|
||||
|
||||
}
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
|
@ -102,8 +101,10 @@ public class TestWALRecordReader {
|
|||
fs = TEST_UTIL.getDFSCluster().getFileSystem();
|
||||
|
||||
hbaseDir = TEST_UTIL.createRootDir();
|
||||
|
||||
logDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
|
||||
|
||||
walRootDir = TEST_UTIL.createWALRootDir();
|
||||
walFs = FSUtils.getWALFileSystem(conf);
|
||||
logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
|
||||
|
||||
htd = new HTableDescriptor(tableName);
|
||||
htd.addFamily(new HColumnDescriptor(family));
|
||||
|
@ -111,6 +112,8 @@ public class TestWALRecordReader {
|
|||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
fs.delete(hbaseDir, true);
|
||||
walFs.delete(walRootDir, true);
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,59 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
/**
|
||||
* Test the master filesystem in a local cluster
|
||||
*/
|
||||
@Category({MasterTests.class, MediumTests.class})
|
||||
public class TestMasterFileSystemWithWALDir {
|
||||
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
@BeforeClass
|
||||
public static void setupTest() throws Exception {
|
||||
UTIL.startMiniCluster(true);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardownTest() throws Exception {
|
||||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFsUriSetProperly() throws Exception {
|
||||
HMaster master = UTIL.getMiniHBaseCluster().getMaster();
|
||||
MasterFileSystem fs = master.getMasterFileSystem();
|
||||
Path masterRoot = FSUtils.getRootDir(fs.getConfiguration());
|
||||
Path rootDir = FSUtils.getRootDir(fs.getFileSystem().getConf());
|
||||
assertEquals(masterRoot, rootDir);
|
||||
assertEquals(FSUtils.getWALRootDir(UTIL.getConfiguration()), fs.getWALRootDir());
|
||||
}
|
||||
}
|
|
@ -105,7 +105,7 @@ public class TestWALProcedureStoreOnHDFS {
|
|||
|
||||
public void tearDown() throws Exception {
|
||||
store.stop(false);
|
||||
UTIL.getDFSCluster().getFileSystem().delete(store.getLogDir(), true);
|
||||
UTIL.getDFSCluster().getFileSystem().delete(store.getWALDir(), true);
|
||||
|
||||
try {
|
||||
UTIL.shutdownMiniCluster();
|
||||
|
|
|
@ -344,7 +344,7 @@ public class TestHRegionServerBulkLoad {
|
|||
int millisToRun = 30000;
|
||||
int numScanners = 50;
|
||||
|
||||
UTIL.startMiniCluster(1);
|
||||
UTIL.startMiniCluster(1, false, true);
|
||||
try {
|
||||
WAL log = UTIL.getHBaseCluster().getRegionServer(0).getWAL(null);
|
||||
FindBulkHBaseListener listener = new FindBulkHBaseListener();
|
||||
|
|
|
@ -75,7 +75,7 @@ public class TestCompactedHFilesDischarger {
|
|||
htd.addFamily(new HColumnDescriptor(fam));
|
||||
HRegionInfo info = new HRegionInfo(tableName, null, null, false);
|
||||
Path path = testUtil.getDataTestDir(getClass().getSimpleName());
|
||||
region = HBaseTestingUtility.createRegionAndWAL(info, path, testUtil.getConfiguration(), htd);
|
||||
region = HBaseTestingUtility.createRegionAndWAL(info, path, path, testUtil.getConfiguration(), htd);
|
||||
rss = mock(RegionServerServices.class);
|
||||
List<Region> regions = new ArrayList<Region>();
|
||||
regions.add(region);
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.hbase.regionserver.wal;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
@ -88,6 +89,8 @@ public class TestFSHLog {
|
|||
protected static Configuration conf;
|
||||
protected static FileSystem fs;
|
||||
protected static Path dir;
|
||||
protected static Path rootDir;
|
||||
protected static Path walRootDir;
|
||||
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
||||
@Rule
|
||||
|
@ -99,8 +102,10 @@ public class TestFSHLog {
|
|||
for (FileStatus dir : entries) {
|
||||
fs.delete(dir.getPath(), true);
|
||||
}
|
||||
final Path hbaseDir = TEST_UTIL.createRootDir();
|
||||
dir = new Path(hbaseDir, currentTest.getMethodName());
|
||||
rootDir = TEST_UTIL.createRootDir();
|
||||
walRootDir = TEST_UTIL.createWALRootDir();
|
||||
dir = new Path(walRootDir, currentTest.getMethodName());
|
||||
assertNotEquals(rootDir, walRootDir);
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -133,6 +138,8 @@ public class TestFSHLog {
|
|||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
fs.delete(rootDir, true);
|
||||
fs.delete(walRootDir, true);
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
|
@ -144,7 +151,7 @@ public class TestFSHLog {
|
|||
// test to see whether the coprocessor is loaded or not.
|
||||
FSHLog log = null;
|
||||
try {
|
||||
log = new FSHLog(fs, FSUtils.getRootDir(conf), dir.toString(),
|
||||
log = new FSHLog(fs, walRootDir, dir.toString(),
|
||||
HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
|
||||
WALCoprocessorHost host = log.getCoprocessorHost();
|
||||
Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName());
|
||||
|
@ -195,7 +202,7 @@ public class TestFSHLog {
|
|||
FSHLog wal1 = null;
|
||||
FSHLog walMeta = null;
|
||||
try {
|
||||
wal1 = new FSHLog(fs, FSUtils.getRootDir(conf), dir.toString(),
|
||||
wal1 = new FSHLog(fs, walRootDir, dir.toString(),
|
||||
HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
|
||||
LOG.debug("Log obtained is: " + wal1);
|
||||
Comparator<Path> comp = wal1.LOG_NAME_COMPARATOR;
|
||||
|
@ -205,7 +212,7 @@ public class TestFSHLog {
|
|||
assertTrue(comp.compare(p1, p1) == 0);
|
||||
// comparing with different filenum.
|
||||
assertTrue(comp.compare(p1, p2) < 0);
|
||||
walMeta = new FSHLog(fs, FSUtils.getRootDir(conf), dir.toString(),
|
||||
walMeta = new FSHLog(fs, walRootDir, dir.toString(),
|
||||
HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null,
|
||||
DefaultWALProvider.META_WAL_PROVIDER_ID);
|
||||
Comparator<Path> compMeta = walMeta.LOG_NAME_COMPARATOR;
|
||||
|
@ -253,7 +260,7 @@ public class TestFSHLog {
|
|||
LOG.debug("testFindMemStoresEligibleForFlush");
|
||||
Configuration conf1 = HBaseConfiguration.create(conf);
|
||||
conf1.setInt("hbase.regionserver.maxlogs", 1);
|
||||
FSHLog wal = new FSHLog(fs, FSUtils.getRootDir(conf1), dir.toString(),
|
||||
FSHLog wal = new FSHLog(fs, walRootDir, dir.toString(),
|
||||
HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null);
|
||||
HTableDescriptor t1 =
|
||||
new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row"));
|
||||
|
@ -330,7 +337,7 @@ public class TestFSHLog {
|
|||
@Test(expected=IOException.class)
|
||||
public void testFailedToCreateWALIfParentRenamed() throws IOException {
|
||||
final String name = "testFailedToCreateWALIfParentRenamed";
|
||||
FSHLog log = new FSHLog(fs, FSUtils.getRootDir(conf), name, HConstants.HREGION_OLDLOGDIR_NAME,
|
||||
FSHLog log = new FSHLog(fs, walRootDir, name, HConstants.HREGION_OLDLOGDIR_NAME,
|
||||
conf, null, true, null, null);
|
||||
long filenum = System.currentTimeMillis();
|
||||
Path path = log.computeFilename(filenum);
|
||||
|
@ -359,13 +366,13 @@ public class TestFSHLog {
|
|||
final byte[] rowName = tableName.getName();
|
||||
final HTableDescriptor htd = new HTableDescriptor(tableName);
|
||||
htd.addFamily(new HColumnDescriptor("f"));
|
||||
HRegion r = HRegion.createHRegion(hri, TEST_UTIL.getDefaultRootDirPath(),
|
||||
HRegion r = HRegion.createHRegion(hri, rootDir,
|
||||
TEST_UTIL.getConfiguration(), htd);
|
||||
HRegion.closeHRegion(r);
|
||||
final int countPerFamily = 10;
|
||||
final MutableBoolean goslow = new MutableBoolean(false);
|
||||
// subclass and doctor a method.
|
||||
FSHLog wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDefaultRootDirPath(),
|
||||
FSHLog wal = new FSHLog(FileSystem.get(conf), walRootDir,
|
||||
testName, conf) {
|
||||
@Override
|
||||
void atHeadOfRingBufferEventHandlerAppend() {
|
||||
|
@ -377,7 +384,7 @@ public class TestFSHLog {
|
|||
}
|
||||
};
|
||||
HRegion region = HRegion.openHRegion(TEST_UTIL.getConfiguration(),
|
||||
TEST_UTIL.getTestFileSystem(), TEST_UTIL.getDefaultRootDirPath(), hri, htd, wal);
|
||||
TEST_UTIL.getTestFileSystem(), rootDir, hri, htd, wal);
|
||||
EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
|
||||
try {
|
||||
List<Put> puts = null;
|
||||
|
@ -430,7 +437,7 @@ public class TestFSHLog {
|
|||
SecurityException, IllegalArgumentException, IllegalAccessException {
|
||||
final String name = "testSyncRunnerIndexOverflow";
|
||||
FSHLog log =
|
||||
new FSHLog(fs, FSUtils.getRootDir(conf), name, HConstants.HREGION_OLDLOGDIR_NAME, conf,
|
||||
new FSHLog(fs, walRootDir, name, HConstants.HREGION_OLDLOGDIR_NAME, conf,
|
||||
null, true, null, null);
|
||||
try {
|
||||
Field ringBufferEventHandlerField = FSHLog.class.getDeclaredField("ringBufferEventHandler");
|
||||
|
@ -468,7 +475,7 @@ public class TestFSHLog {
|
|||
final CountDownLatch putFinished = new CountDownLatch(1);
|
||||
|
||||
try (FSHLog log =
|
||||
new FSHLog(fs, FSUtils.getRootDir(conf), name, HConstants.HREGION_OLDLOGDIR_NAME, conf,
|
||||
new FSHLog(fs, walRootDir, name, HConstants.HREGION_OLDLOGDIR_NAME, conf,
|
||||
null, true, null, null)) {
|
||||
|
||||
log.registerWALActionsListener(new WALActionsListener.Base() {
|
||||
|
|
|
@ -71,7 +71,8 @@ public class TestLogRollAbort {
|
|||
|
||||
/* For the split-then-roll test */
|
||||
private static final Path HBASEDIR = new Path("/hbase");
|
||||
private static final Path OLDLOGDIR = new Path(HBASEDIR, HConstants.HREGION_OLDLOGDIR_NAME);
|
||||
private static final Path HBASELOGDIR = new Path("/hbaselog");
|
||||
private static final Path OLDLOGDIR = new Path(HBASELOGDIR, HConstants.HREGION_OLDLOGDIR_NAME);
|
||||
|
||||
// Need to override this setup so we can edit the config before it gets sent
|
||||
// to the HDFS & HBase cluster startup.
|
||||
|
@ -112,6 +113,7 @@ public class TestLogRollAbort {
|
|||
// disable region rebalancing (interferes with log watching)
|
||||
cluster.getMaster().balanceSwitch(false);
|
||||
FSUtils.setRootDir(conf, HBASEDIR);
|
||||
FSUtils.setWALRootDir(conf, HBASELOGDIR);
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -183,7 +185,7 @@ public class TestLogRollAbort {
|
|||
public void testLogRollAfterSplitStart() throws IOException {
|
||||
LOG.info("Verify wal roll after split starts will fail.");
|
||||
String logName = "testLogRollAfterSplitStart";
|
||||
Path thisTestsDir = new Path(HBASEDIR, DefaultWALProvider.getWALDirectoryName(logName));
|
||||
Path thisTestsDir = new Path(HBASELOGDIR, DefaultWALProvider.getWALDirectoryName(logName));
|
||||
final WALFactory wals = new WALFactory(conf, null, logName);
|
||||
|
||||
try {
|
||||
|
@ -220,7 +222,7 @@ public class TestLogRollAbort {
|
|||
LOG.debug("Renamed region directory: " + rsSplitDir);
|
||||
|
||||
LOG.debug("Processing the old log files.");
|
||||
WALSplitter.split(HBASEDIR, rsSplitDir, OLDLOGDIR, fs, conf, wals);
|
||||
WALSplitter.split(HBASELOGDIR, rsSplitDir, OLDLOGDIR, fs, conf, wals);
|
||||
|
||||
LOG.debug("Trying to roll the WAL.");
|
||||
try {
|
||||
|
|
|
@ -20,14 +20,19 @@ package org.apache.hadoop.hbase.regionserver.wal;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.*;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
|
@ -53,21 +58,27 @@ public class TestWALActionsListener {
|
|||
new HBaseTestingUtility();
|
||||
|
||||
private final static byte[] SOME_BYTES = Bytes.toBytes("t");
|
||||
private static FileSystem fs;
|
||||
private static Configuration conf;
|
||||
private static Path rootDir;
|
||||
private static Path walRootDir;
|
||||
private static FileSystem fs;
|
||||
private static FileSystem walFs;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
conf = TEST_UTIL.getConfiguration();
|
||||
conf.setInt("hbase.regionserver.maxlogs", 5);
|
||||
fs = FileSystem.get(conf);
|
||||
FSUtils.setRootDir(conf, TEST_UTIL.getDataTestDir());
|
||||
rootDir = TEST_UTIL.createRootDir();
|
||||
walRootDir = TEST_UTIL.createWALRootDir();
|
||||
fs = FSUtils.getRootDirFileSystem(conf);
|
||||
walFs = FSUtils.getWALFileSystem(conf);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
fs.delete(new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_LOGDIR_NAME), true);
|
||||
fs.delete(new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME), true);
|
||||
fs.delete(rootDir, true);
|
||||
walFs.delete(new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME), true);
|
||||
walFs.delete(new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME), true);
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
|
@ -70,6 +70,7 @@ import org.apache.hadoop.hbase.client.Result;
|
|||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.fs.HFileSystem;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
|
||||
|
@ -121,6 +122,7 @@ public class TestWALReplay {
|
|||
static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
|
||||
private Path hbaseRootDir = null;
|
||||
private Path hbaseWALRootDir = null;
|
||||
private String logName;
|
||||
private Path oldLogDir;
|
||||
private Path logDir;
|
||||
|
@ -142,8 +144,12 @@ public class TestWALReplay {
|
|||
TEST_UTIL.startMiniCluster(3);
|
||||
Path hbaseRootDir =
|
||||
TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
|
||||
LOG.info("hbase.rootdir=" + hbaseRootDir);
|
||||
Path hbaseWALRootDir =
|
||||
TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbaselog"));
|
||||
LOG.info(HConstants.HBASE_DIR + "=" + hbaseRootDir);
|
||||
LOG.info(HFileSystem.HBASE_WAL_DIR + "=" + hbaseWALRootDir);
|
||||
FSUtils.setRootDir(conf, hbaseRootDir);
|
||||
FSUtils.setWALRootDir(conf, hbaseWALRootDir);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
@ -156,12 +162,16 @@ public class TestWALReplay {
|
|||
this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
|
||||
this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
|
||||
this.hbaseRootDir = FSUtils.getRootDir(this.conf);
|
||||
this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
|
||||
this.hbaseWALRootDir = FSUtils.getWALRootDir(this.conf);
|
||||
this.oldLogDir = new Path(this.hbaseWALRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
|
||||
this.logName = DefaultWALProvider.getWALDirectoryName(currentTest.getMethodName() + "-manual");
|
||||
this.logDir = new Path(this.hbaseRootDir, logName);
|
||||
this.logDir = new Path(this.hbaseWALRootDir, logName);
|
||||
if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
|
||||
TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
|
||||
}
|
||||
if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseWALRootDir)) {
|
||||
TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseWALRootDir, true);
|
||||
}
|
||||
this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
|
||||
RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
|
||||
this.wals = new WALFactory(conf, null, currentTest.getMethodName());
|
||||
|
@ -171,6 +181,7 @@ public class TestWALReplay {
|
|||
public void tearDown() throws Exception {
|
||||
this.wals.close();
|
||||
TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
|
||||
TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseWALRootDir, true);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -296,11 +307,11 @@ public class TestWALReplay {
|
|||
|
||||
MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
|
||||
HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
|
||||
Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName);
|
||||
Path basedir = FSUtils.getTableDir(hbaseWALRootDir, tableName);
|
||||
deleteDir(basedir);
|
||||
|
||||
HTableDescriptor htd = createBasic3FamilyHTD(tableName);
|
||||
HRegion region2 = HRegion.createHRegion(hri, hbaseRootDir, this.conf, htd);
|
||||
HRegion region2 = HRegion.createHRegion(hri, hbaseWALRootDir, this.conf, htd);
|
||||
HRegion.closeHRegion(region2);
|
||||
final byte [] rowName = tableName.getName();
|
||||
|
||||
|
@ -326,7 +337,7 @@ public class TestWALReplay {
|
|||
|
||||
WAL wal3 = createWAL(this.conf);
|
||||
try {
|
||||
HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal3);
|
||||
HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseWALRootDir, hri, htd, wal3);
|
||||
long seqid = region.getOpenSeqNum();
|
||||
// The regions opens with sequenceId as 1. With 6k edits, its sequence number reaches 6k + 1.
|
||||
// When opened, this region would apply 6k edits, and increment the sequenceId by 1
|
||||
|
@ -358,13 +369,13 @@ public class TestWALReplay {
|
|||
final TableName tableName =
|
||||
TableName.valueOf("testRegionMadeOfBulkLoadedFilesOnly");
|
||||
final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
|
||||
final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
|
||||
final Path basedir = new Path(this.hbaseWALRootDir, tableName.getNameAsString());
|
||||
deleteDir(basedir);
|
||||
final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
|
||||
HRegion region2 = HRegion.createHRegion(hri, hbaseRootDir, this.conf, htd);
|
||||
HRegion region2 = HRegion.createHRegion(hri, hbaseWALRootDir, this.conf, htd);
|
||||
HRegion.closeHRegion(region2);
|
||||
WAL wal = createWAL(this.conf);
|
||||
HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);
|
||||
HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseWALRootDir, hri, htd, wal);
|
||||
|
||||
byte [] family = htd.getFamilies().iterator().next().getName();
|
||||
Path f = new Path(basedir, "hfile");
|
||||
|
@ -393,7 +404,7 @@ public class TestWALReplay {
|
|||
WAL wal2 = createWAL(newConf);
|
||||
|
||||
HRegion region2 = HRegion.openHRegion(newConf, FileSystem.get(newConf),
|
||||
hbaseRootDir, hri, htd, wal2);
|
||||
hbaseWALRootDir, hri, htd, wal2);
|
||||
long seqid2 = region2.getOpenSeqNum();
|
||||
assertTrue(seqid2 > -1);
|
||||
assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
|
||||
|
@ -424,14 +435,14 @@ public class TestWALReplay {
|
|||
final TableName tableName =
|
||||
TableName.valueOf("testCompactedBulkLoadedFiles");
|
||||
final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
|
||||
final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
|
||||
final Path basedir = new Path(this.hbaseWALRootDir, tableName.getNameAsString());
|
||||
deleteDir(basedir);
|
||||
final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
|
||||
HRegion region2 = HRegion.createHRegion(hri,
|
||||
hbaseRootDir, this.conf, htd);
|
||||
hbaseWALRootDir, this.conf, htd);
|
||||
HRegion.closeHRegion(region2);
|
||||
WAL wal = createWAL(this.conf);
|
||||
HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);
|
||||
HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseWALRootDir, hri, htd, wal);
|
||||
|
||||
// Add an edit so something in the WAL
|
||||
byte [] row = tableName.getName();
|
||||
|
@ -465,7 +476,7 @@ public class TestWALReplay {
|
|||
WAL wal2 = createWAL(newConf);
|
||||
|
||||
HRegion region2 = HRegion.openHRegion(newConf, FileSystem.get(newConf),
|
||||
hbaseRootDir, hri, htd, wal2);
|
||||
hbaseWALRootDir, hri, htd, wal2);
|
||||
long seqid2 = region2.getOpenSeqNum();
|
||||
assertTrue(seqid2 > -1);
|
||||
assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
|
||||
|
@ -495,19 +506,19 @@ public class TestWALReplay {
|
|||
final TableName tableName =
|
||||
TableName.valueOf("testReplayEditsWrittenViaHRegion");
|
||||
final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
|
||||
final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
|
||||
final Path basedir = FSUtils.getTableDir(this.hbaseWALRootDir, tableName);
|
||||
deleteDir(basedir);
|
||||
final byte[] rowName = tableName.getName();
|
||||
final int countPerFamily = 10;
|
||||
final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
|
||||
HRegion region3 = HRegion.createHRegion(hri,
|
||||
hbaseRootDir, this.conf, htd);
|
||||
hbaseWALRootDir, this.conf, htd);
|
||||
HRegion.closeHRegion(region3);
|
||||
// Write countPerFamily edits into the three families. Do a flush on one
|
||||
// of the families during the load of edits so its seqid is not same as
|
||||
// others to test we do right thing when different seqids.
|
||||
WAL wal = createWAL(this.conf);
|
||||
HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
|
||||
HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseWALRootDir, hri, htd, wal);
|
||||
long seqid = region.getOpenSeqNum();
|
||||
boolean first = true;
|
||||
for (HColumnDescriptor hcd: htd.getFamilies()) {
|
||||
|
@ -530,7 +541,7 @@ public class TestWALReplay {
|
|||
wal.shutdown();
|
||||
runWALSplit(this.conf);
|
||||
WAL wal2 = createWAL(this.conf);
|
||||
HRegion region2 = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal2);
|
||||
HRegion region2 = HRegion.openHRegion(conf, this.fs, hbaseWALRootDir, hri, htd, wal2);
|
||||
long seqid2 = region2.getOpenSeqNum();
|
||||
assertTrue(seqid + result.size() < seqid2);
|
||||
final Result result1b = region2.get(g);
|
||||
|
@ -605,19 +616,19 @@ public class TestWALReplay {
|
|||
final TableName tableName =
|
||||
TableName.valueOf("testReplayEditsWrittenViaHRegion");
|
||||
final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
|
||||
final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
|
||||
final Path basedir = FSUtils.getTableDir(this.hbaseWALRootDir, tableName);
|
||||
deleteDir(basedir);
|
||||
final byte[] rowName = tableName.getName();
|
||||
final int countPerFamily = 10;
|
||||
final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
|
||||
HRegion region3 = HRegion.createHRegion(hri,
|
||||
hbaseRootDir, this.conf, htd);
|
||||
hbaseWALRootDir, this.conf, htd);
|
||||
HRegion.closeHRegion(region3);
|
||||
// Write countPerFamily edits into the three families. Do a flush on one
|
||||
// of the families during the load of edits so its seqid is not same as
|
||||
// others to test we do right thing when different seqids.
|
||||
WAL wal = createWAL(this.conf);
|
||||
HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
|
||||
HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseWALRootDir, hri, htd, wal);
|
||||
long seqid = region.getOpenSeqNum();
|
||||
for (HColumnDescriptor hcd: htd.getFamilies()) {
|
||||
addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
|
||||
|
@ -650,7 +661,7 @@ public class TestWALReplay {
|
|||
// Let us try to split and recover
|
||||
runWALSplit(this.conf);
|
||||
WAL wal2 = createWAL(this.conf);
|
||||
HRegion region2 = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal2);
|
||||
HRegion region2 = HRegion.openHRegion(this.conf, this.fs, hbaseWALRootDir, hri, htd, wal2);
|
||||
long seqid2 = region2.getOpenSeqNum();
|
||||
assertTrue(seqid + result.size() < seqid2);
|
||||
|
||||
|
@ -690,10 +701,10 @@ public class TestWALReplay {
|
|||
final TableName tableName =
|
||||
TableName.valueOf("testReplayEditsAfterAbortingFlush");
|
||||
final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
|
||||
final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
|
||||
final Path basedir = FSUtils.getTableDir(this.hbaseWALRootDir, tableName);
|
||||
deleteDir(basedir);
|
||||
final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
|
||||
HRegion region3 = HRegion.createHRegion(hri, hbaseRootDir, this.conf, htd);
|
||||
HRegion region3 = HRegion.createHRegion(hri, hbaseWALRootDir, this.conf, htd);
|
||||
region3.close();
|
||||
region3.getWAL().close();
|
||||
// Write countPerFamily edits into the three families. Do a flush on one
|
||||
|
@ -707,7 +718,7 @@ public class TestWALReplay {
|
|||
customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
|
||||
CustomStoreFlusher.class.getName());
|
||||
HRegion region =
|
||||
HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal, customConf, rsServices, null);
|
||||
HRegion.openHRegion(this.hbaseWALRootDir, hri, htd, wal, customConf, rsServices, null);
|
||||
int writtenRowCount = 10;
|
||||
List<HColumnDescriptor> families = new ArrayList<HColumnDescriptor>(
|
||||
htd.getFamilies());
|
||||
|
@ -761,7 +772,7 @@ public class TestWALReplay {
|
|||
WAL wal2 = createWAL(this.conf);
|
||||
Mockito.doReturn(false).when(rsServices).isAborted();
|
||||
HRegion region2 =
|
||||
HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal2, this.conf, rsServices, null);
|
||||
HRegion.openHRegion(this.hbaseWALRootDir, hri, htd, wal2, this.conf, rsServices, null);
|
||||
scanner = region2.getScanner(new Scan());
|
||||
assertEquals(writtenRowCount, getScannedCount(scanner));
|
||||
}
|
||||
|
@ -791,12 +802,12 @@ public class TestWALReplay {
|
|||
TableName.valueOf("testReplayEditsWrittenIntoWAL");
|
||||
final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
|
||||
final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
|
||||
final Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName);
|
||||
final Path basedir = FSUtils.getTableDir(hbaseWALRootDir, tableName);
|
||||
deleteDir(basedir);
|
||||
|
||||
final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
|
||||
HRegion region2 = HRegion.createHRegion(hri,
|
||||
hbaseRootDir, this.conf, htd);
|
||||
hbaseWALRootDir, this.conf, htd);
|
||||
HRegion.closeHRegion(region2);
|
||||
final WAL wal = createWAL(this.conf);
|
||||
final byte[] rowName = tableName.getName();
|
||||
|
@ -890,7 +901,7 @@ public class TestWALReplay {
|
|||
final TableName tableName = TableName.valueOf(currentTest.getMethodName());
|
||||
final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
|
||||
final Path basedir =
|
||||
FSUtils.getTableDir(this.hbaseRootDir, tableName);
|
||||
FSUtils.getTableDir(this.hbaseWALRootDir, tableName);
|
||||
deleteDir(basedir);
|
||||
final byte[] rowName = tableName.getName();
|
||||
final int countPerFamily = 10;
|
||||
|
@ -899,7 +910,7 @@ public class TestWALReplay {
|
|||
// Mock the WAL
|
||||
MockWAL wal = createMockWAL();
|
||||
|
||||
HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
|
||||
HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseWALRootDir, hri, htd, wal);
|
||||
for (HColumnDescriptor hcd : htd.getFamilies()) {
|
||||
addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
|
||||
}
|
||||
|
@ -920,10 +931,10 @@ public class TestWALReplay {
|
|||
FileStatus[] listStatus = wal.getFiles();
|
||||
assertNotNull(listStatus);
|
||||
assertTrue(listStatus.length > 0);
|
||||
WALSplitter.splitLogFile(hbaseRootDir, listStatus[0],
|
||||
WALSplitter.splitLogFile(hbaseWALRootDir, listStatus[0],
|
||||
this.fs, this.conf, null, null, null, mode, wals);
|
||||
FileStatus[] listStatus1 = this.fs.listStatus(
|
||||
new Path(FSUtils.getTableDir(hbaseRootDir, tableName), new Path(hri.getEncodedName(),
|
||||
new Path(FSUtils.getTableDir(hbaseWALRootDir, tableName), new Path(hri.getEncodedName(),
|
||||
"recovered.edits")), new PathFilter() {
|
||||
@Override
|
||||
public boolean accept(Path p) {
|
||||
|
@ -951,17 +962,17 @@ public class TestWALReplay {
|
|||
IllegalAccessException {
|
||||
final TableName tableName = TableName.valueOf("testDatalossWhenInputError");
|
||||
final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
|
||||
final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
|
||||
final Path basedir = FSUtils.getTableDir(this.hbaseWALRootDir, tableName);
|
||||
deleteDir(basedir);
|
||||
final byte[] rowName = tableName.getName();
|
||||
final int countPerFamily = 10;
|
||||
final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
|
||||
HRegion region1 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
|
||||
HRegion region1 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseWALRootDir, this.hbaseWALRootDir, this.conf, htd);
|
||||
Path regionDir = region1.getRegionFileSystem().getRegionDir();
|
||||
HBaseTestingUtility.closeRegionAndWAL(region1);
|
||||
|
||||
WAL wal = createWAL(this.conf);
|
||||
HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
|
||||
HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseWALRootDir, hri, htd, wal);
|
||||
for (HColumnDescriptor hcd : htd.getFamilies()) {
|
||||
addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
|
||||
}
|
||||
|
@ -1031,12 +1042,12 @@ public class TestWALReplay {
|
|||
HRegion region2;
|
||||
try {
|
||||
// log replay should fail due to the IOException, otherwise we may lose data.
|
||||
region2 = HRegion.openHRegion(conf, spyFs, hbaseRootDir, hri, htd, wal2);
|
||||
region2 = HRegion.openHRegion(conf, spyFs, hbaseWALRootDir, hri, htd, wal2);
|
||||
assertEquals(result.size(), region2.get(g).size());
|
||||
} catch (IOException e) {
|
||||
assertEquals("read over limit", e.getMessage());
|
||||
}
|
||||
region2 = HRegion.openHRegion(conf, fs, hbaseRootDir, hri, htd, wal2);
|
||||
region2 = HRegion.openHRegion(conf, fs, hbaseWALRootDir, hri, htd, wal2);
|
||||
assertEquals(result.size(), region2.get(g).size());
|
||||
}
|
||||
|
||||
|
@ -1047,11 +1058,11 @@ public class TestWALReplay {
|
|||
final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL");
|
||||
final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
|
||||
final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
|
||||
final Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName);
|
||||
final Path basedir = FSUtils.getTableDir(hbaseWALRootDir, tableName);
|
||||
deleteDir(basedir);
|
||||
|
||||
final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
|
||||
HRegion region = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
|
||||
HRegion region = HBaseTestingUtility.createRegionAndWAL(hri, hbaseWALRootDir, this.hbaseWALRootDir, this.conf, htd);
|
||||
HBaseTestingUtility.closeRegionAndWAL(region);
|
||||
final byte[] family = htd.getColumnFamilies()[0].getName();
|
||||
final byte[] rowName = tableName.getName();
|
||||
|
@ -1070,12 +1081,12 @@ public class TestWALReplay {
|
|||
first = fs.getFileStatus(smallFile);
|
||||
second = fs.getFileStatus(largeFile);
|
||||
}
|
||||
WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null,
|
||||
WALSplitter.splitLogFile(hbaseWALRootDir, first, fs, conf, null, null, null,
|
||||
RecoveryMode.LOG_SPLITTING, wals);
|
||||
WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null,
|
||||
WALSplitter.splitLogFile(hbaseWALRootDir, second, fs, conf, null, null, null,
|
||||
RecoveryMode.LOG_SPLITTING, wals);
|
||||
WAL wal = createWAL(this.conf);
|
||||
region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal);
|
||||
region = HRegion.openHRegion(conf, this.fs, hbaseWALRootDir, hri, htd, wal);
|
||||
assertTrue(region.getOpenSeqNum() > mvcc.getWritePoint());
|
||||
assertEquals(2, region.get(new Get(rowName)).size());
|
||||
}
|
||||
|
@ -1093,9 +1104,9 @@ public class TestWALReplay {
|
|||
static class MockWAL extends FSHLog {
|
||||
boolean doCompleteCacheFlush = false;
|
||||
|
||||
public MockWAL(FileSystem fs, Path rootDir, String logName, Configuration conf)
|
||||
public MockWAL(FileSystem fs, Path walRootDir, String logName, Configuration conf)
|
||||
throws IOException {
|
||||
super(fs, rootDir, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
|
||||
super(fs, walRootDir, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1115,7 +1126,7 @@ public class TestWALReplay {
|
|||
}
|
||||
|
||||
private MockWAL createMockWAL() throws IOException {
|
||||
MockWAL wal = new MockWAL(fs, hbaseRootDir, logName, conf);
|
||||
MockWAL wal = new MockWAL(fs, hbaseWALRootDir, logName, conf);
|
||||
// Set down maximum recovery so we dfsclient doesn't linger retrying something
|
||||
// long gone.
|
||||
HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
|
||||
|
@ -1222,7 +1233,7 @@ public class TestWALReplay {
|
|||
*/
|
||||
private Path runWALSplit(final Configuration c) throws IOException {
|
||||
List<Path> splits = WALSplitter.split(
|
||||
hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals);
|
||||
hbaseWALRootDir, logDir, oldLogDir, FSUtils.getWALFileSystem(c), c, wals);
|
||||
// Split should generate only 1 file since there's only 1 region
|
||||
assertEquals("splits=" + splits, 1, splits.size());
|
||||
// Make sure the file exists
|
||||
|
@ -1237,7 +1248,7 @@ public class TestWALReplay {
|
|||
* @throws IOException
|
||||
*/
|
||||
private WAL createWAL(final Configuration c) throws IOException {
|
||||
FSHLog wal = new FSHLog(FileSystem.get(c), hbaseRootDir, logName, c);
|
||||
FSHLog wal = new FSHLog(FSUtils.getWALFileSystem(c), hbaseWALRootDir, logName, c);
|
||||
// Set down maximum recovery so we dfsclient doesn't linger retrying something
|
||||
// long gone.
|
||||
HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
|
||||
|
|
|
@ -39,8 +39,9 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
|
|||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.fs.HFileSystem;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
@ -381,6 +382,54 @@ public class TestFSUtils {
|
|||
verifyFileInDirWithStoragePolicy("1772");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetWALRootDir() throws Exception {
|
||||
HBaseTestingUtility htu = new HBaseTestingUtility();
|
||||
Configuration conf = htu.getConfiguration();
|
||||
Path p = new Path("file:///hbase/root");
|
||||
FSUtils.setWALRootDir(conf, p);
|
||||
assertEquals(p.toString(), conf.get(HFileSystem.HBASE_WAL_DIR));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetWALRootDir() throws IOException {
|
||||
HBaseTestingUtility htu = new HBaseTestingUtility();
|
||||
Configuration conf = htu.getConfiguration();
|
||||
Path root = new Path("file:///hbase/root");
|
||||
Path walRoot = new Path("file:///hbase/logroot");
|
||||
FSUtils.setRootDir(conf, root);
|
||||
assertEquals(FSUtils.getRootDir(conf), root);
|
||||
assertEquals(FSUtils.getWALRootDir(conf), root);
|
||||
FSUtils.setWALRootDir(conf, walRoot);
|
||||
assertEquals(FSUtils.getWALRootDir(conf), walRoot);
|
||||
}
|
||||
|
||||
@Test(expected=IllegalStateException.class)
|
||||
public void testGetWALRootDirIllegalWALDir() throws IOException {
|
||||
HBaseTestingUtility htu = new HBaseTestingUtility();
|
||||
Configuration conf = htu.getConfiguration();
|
||||
Path root = new Path("file:///hbase/root");
|
||||
Path invalidWALDir = new Path("file:///hbase/root/logroot");
|
||||
FSUtils.setRootDir(conf, root);
|
||||
FSUtils.setWALRootDir(conf, invalidWALDir);
|
||||
FSUtils.getWALRootDir(conf);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveWALRootPath() throws Exception {
|
||||
HBaseTestingUtility htu = new HBaseTestingUtility();
|
||||
Configuration conf = htu.getConfiguration();
|
||||
FSUtils.setRootDir(conf, new Path("file:///user/hbase"));
|
||||
Path testFile = new Path(FSUtils.getRootDir(conf), "test/testfile");
|
||||
Path tmpFile = new Path("file:///test/testfile");
|
||||
assertEquals(FSUtils.removeWALRootPath(testFile, conf), "test/testfile");
|
||||
assertEquals(FSUtils.removeWALRootPath(tmpFile, conf), tmpFile.toString());
|
||||
FSUtils.setWALRootDir(conf, new Path("file:///user/hbaseLogDir"));
|
||||
assertEquals(FSUtils.removeWALRootPath(testFile, conf), testFile.toString());
|
||||
Path logFile = new Path(FSUtils.getWALRootDir(conf), "test/testlog");
|
||||
assertEquals(FSUtils.removeWALRootPath(logFile, conf), "test/testlog");
|
||||
}
|
||||
|
||||
private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
|
||||
assertTrue(fileSys.exists(name));
|
||||
assertTrue(fileSys.delete(name, true));
|
||||
|
|
|
@ -102,7 +102,7 @@ public class IOTestProvider implements WALProvider {
|
|||
providerId = DEFAULT_PROVIDER_ID;
|
||||
}
|
||||
final String logPrefix = factory.factoryId + WAL_FILE_NAME_DELIMITER + providerId;
|
||||
log = new IOTestWAL(FileSystem.get(conf), FSUtils.getRootDir(conf),
|
||||
log = new IOTestWAL(FSUtils.getWALFileSystem(conf), FSUtils.getWALRootDir(conf),
|
||||
DefaultWALProvider.getWALDirectoryName(factory.factoryId),
|
||||
HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners,
|
||||
true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
|
||||
|
|
|
@ -66,6 +66,8 @@ public class TestDefaultWALProvider {
|
|||
|
||||
protected static Configuration conf;
|
||||
protected static FileSystem fs;
|
||||
protected static FileSystem walFs;
|
||||
protected static Path walRootDir;
|
||||
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
protected MultiVersionConcurrencyControl mvcc;
|
||||
|
||||
|
@ -79,6 +81,7 @@ public class TestDefaultWALProvider {
|
|||
for (FileStatus dir : entries) {
|
||||
fs.delete(dir.getPath(), true);
|
||||
}
|
||||
walFs.delete(walRootDir, true);
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -104,13 +107,15 @@ public class TestDefaultWALProvider {
|
|||
TEST_UTIL.startMiniDFSCluster(3);
|
||||
|
||||
// Set up a working space for our tests.
|
||||
TEST_UTIL.createRootDir();
|
||||
walRootDir = TEST_UTIL.createWALRootDir();
|
||||
conf = TEST_UTIL.getConfiguration();
|
||||
fs = TEST_UTIL.getDFSCluster().getFileSystem();
|
||||
fs = FSUtils.getRootDirFileSystem(conf);
|
||||
walFs = FSUtils.getWALFileSystem(conf);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
walFs.delete(walRootDir, true);
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
|
@ -121,13 +126,12 @@ public class TestDefaultWALProvider {
|
|||
@Test
|
||||
public void testGetServerNameFromWALDirectoryName() throws IOException {
|
||||
ServerName sn = ServerName.valueOf("hn", 450, 1398);
|
||||
String hl = FSUtils.getRootDir(conf) + "/" +
|
||||
String hl = walRootDir + "/" +
|
||||
DefaultWALProvider.getWALDirectoryName(sn.toString());
|
||||
|
||||
// Must not throw exception
|
||||
assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, null));
|
||||
assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf,
|
||||
FSUtils.getRootDir(conf).toUri().toString()));
|
||||
assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, walRootDir.toUri().toString()));
|
||||
assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, ""));
|
||||
assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, " "));
|
||||
assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, hl));
|
||||
|
@ -136,7 +140,7 @@ public class TestDefaultWALProvider {
|
|||
|
||||
final String wals = "/WALs/";
|
||||
ServerName parsed = DefaultWALProvider.getServerNameFromWALDirectoryName(conf,
|
||||
FSUtils.getRootDir(conf).toUri().toString() + wals + sn +
|
||||
walRootDir.toUri().toString() + wals + sn +
|
||||
"/localhost%2C32984%2C1343316388997.1343316390417");
|
||||
assertEquals("standard", sn, parsed);
|
||||
|
||||
|
@ -144,7 +148,7 @@ public class TestDefaultWALProvider {
|
|||
assertEquals("subdir", sn, parsed);
|
||||
|
||||
parsed = DefaultWALProvider.getServerNameFromWALDirectoryName(conf,
|
||||
FSUtils.getRootDir(conf).toUri().toString() + wals + sn +
|
||||
walRootDir.toUri().toString() + wals + sn +
|
||||
"-splitting/localhost%3A57020.1340474893931");
|
||||
assertEquals("split", sn, parsed);
|
||||
}
|
||||
|
|
|
@ -84,6 +84,7 @@ public class TestWALFactory {
|
|||
private static MiniDFSCluster cluster;
|
||||
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
protected static Path hbaseDir;
|
||||
protected static Path hbaseWALDir;
|
||||
|
||||
protected FileSystem fs;
|
||||
protected Path dir;
|
||||
|
@ -142,6 +143,7 @@ public class TestWALFactory {
|
|||
cluster = TEST_UTIL.getDFSCluster();
|
||||
|
||||
hbaseDir = TEST_UTIL.createRootDir();
|
||||
hbaseWALDir = TEST_UTIL.createWALRootDir();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
@ -164,12 +166,12 @@ public class TestWALFactory {
|
|||
final TableName tableName = TableName.valueOf(currentTest.getMethodName());
|
||||
final byte [] rowName = tableName.getName();
|
||||
final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
|
||||
final Path logdir = new Path(hbaseDir,
|
||||
final Path logdir = new Path(hbaseWALDir,
|
||||
DefaultWALProvider.getWALDirectoryName(currentTest.getMethodName()));
|
||||
Path oldLogDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
|
||||
Path oldLogDir = new Path(hbaseWALDir, HConstants.HREGION_OLDLOGDIR_NAME);
|
||||
final int howmany = 3;
|
||||
HRegionInfo[] infos = new HRegionInfo[3];
|
||||
Path tabledir = FSUtils.getTableDir(hbaseDir, tableName);
|
||||
Path tabledir = FSUtils.getTableDir(hbaseWALDir, tableName);
|
||||
fs.mkdirs(tabledir);
|
||||
for(int i = 0; i < howmany; i++) {
|
||||
infos[i] = new HRegionInfo(tableName,
|
||||
|
@ -203,7 +205,7 @@ public class TestWALFactory {
|
|||
}
|
||||
}
|
||||
wals.shutdown();
|
||||
List<Path> splits = WALSplitter.split(hbaseDir, logdir, oldLogDir, fs, conf, wals);
|
||||
List<Path> splits = WALSplitter.split(hbaseWALDir, logdir, oldLogDir, fs, conf, wals);
|
||||
verifySplits(splits, howmany);
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,148 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.wal;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.fs.HFileSystem;
|
||||
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
@Category(MediumTests.class)
|
||||
public class TestWALRootDir {
|
||||
private static final Log LOG = LogFactory.getLog(TestWALRootDir.class);
|
||||
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private static Configuration conf;
|
||||
private static FileSystem fs;
|
||||
private static FileSystem walFs;
|
||||
static final TableName tableName = TableName.valueOf("TestWALWALDir");
|
||||
private static final byte [] rowName = Bytes.toBytes("row");
|
||||
private static final byte [] family = Bytes.toBytes("column");
|
||||
private static HTableDescriptor htd;
|
||||
private static Path walRootDir;
|
||||
private static Path rootDir;
|
||||
private static WALFactory wals;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
cleanup();
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
conf = TEST_UTIL.getConfiguration();
|
||||
TEST_UTIL.startMiniDFSCluster(1);
|
||||
rootDir = TEST_UTIL.createRootDir();
|
||||
walRootDir = TEST_UTIL.createWALRootDir();
|
||||
fs = FSUtils.getRootDirFileSystem(conf);
|
||||
walFs = FSUtils.getWALFileSystem(conf);
|
||||
htd = new HTableDescriptor(tableName);
|
||||
htd.addFamily(new HColumnDescriptor(family));
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
cleanup();
|
||||
TEST_UTIL.shutdownMiniDFSCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWALRootDir() throws Exception {
|
||||
HRegionInfo regionInfo = new HRegionInfo(tableName);
|
||||
wals = new WALFactory(conf, null, "testWALRootDir");
|
||||
WAL log = wals.getWAL(regionInfo.getEncodedNameAsBytes(), regionInfo.getTable().getNamespace());
|
||||
|
||||
assertEquals(1, getWALFiles(walFs, walRootDir).size());
|
||||
byte [] value = Bytes.toBytes("value");
|
||||
WALEdit edit = new WALEdit();
|
||||
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
|
||||
System.currentTimeMillis(), value));
|
||||
long txid = log.append(htd,regionInfo, getWalKey(System.currentTimeMillis(), regionInfo, 0), edit, true);
|
||||
log.sync(txid);
|
||||
assertEquals("Expect 1 log have been created", 1, getWALFiles(walFs, walRootDir).size());
|
||||
log.rollWriter();
|
||||
//Create 1 more WAL
|
||||
assertEquals(2, getWALFiles(walFs, new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME)).size());
|
||||
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
|
||||
System.currentTimeMillis(), value));
|
||||
txid = log.append(htd, regionInfo, getWalKey(System.currentTimeMillis(), regionInfo, 1), edit, true);
|
||||
log.sync(txid);
|
||||
log.rollWriter();
|
||||
log.shutdown();
|
||||
|
||||
assertEquals("Expect 3 logs in WALs dir", 3, getWALFiles(walFs, new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME)).size());
|
||||
}
|
||||
|
||||
protected WALKey getWalKey(final long time, HRegionInfo hri, final long startPoint) {
|
||||
return new WALKey(hri.getEncodedNameAsBytes(), tableName, time, new MultiVersionConcurrencyControl(startPoint));
|
||||
}
|
||||
|
||||
private List<FileStatus> getWALFiles(FileSystem fs, Path dir)
|
||||
throws IOException {
|
||||
List<FileStatus> result = new ArrayList<FileStatus>();
|
||||
LOG.debug("Scanning " + dir.toString() + " for WAL files");
|
||||
|
||||
FileStatus[] files = fs.listStatus(dir);
|
||||
if (files == null) return Collections.emptyList();
|
||||
for (FileStatus file : files) {
|
||||
if (file.isDirectory()) {
|
||||
// recurse into sub directories
|
||||
result.addAll(getWALFiles(fs, file.getPath()));
|
||||
} else {
|
||||
String name = file.getPath().toString();
|
||||
if (!name.startsWith(".")) {
|
||||
result.add(file);
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private static void cleanup() throws Exception{
|
||||
walFs.delete(walRootDir, true);
|
||||
fs.delete(rootDir, true);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -118,6 +118,7 @@ public class TestWALSplit {
|
|||
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
||||
private Path HBASEDIR;
|
||||
private Path HBASELOGDIR;
|
||||
private Path WALDIR;
|
||||
private Path OLDLOGDIR;
|
||||
private Path CORRUPTDIR;
|
||||
|
@ -180,8 +181,9 @@ public class TestWALSplit {
|
|||
LOG.info("Cleaning up cluster for new test.");
|
||||
fs = TEST_UTIL.getDFSCluster().getFileSystem();
|
||||
HBASEDIR = TEST_UTIL.createRootDir();
|
||||
OLDLOGDIR = new Path(HBASEDIR, HConstants.HREGION_OLDLOGDIR_NAME);
|
||||
CORRUPTDIR = new Path(HBASEDIR, HConstants.CORRUPT_DIR_NAME);
|
||||
HBASELOGDIR = TEST_UTIL.createWALRootDir();
|
||||
OLDLOGDIR = new Path(HBASELOGDIR, HConstants.HREGION_OLDLOGDIR_NAME);
|
||||
CORRUPTDIR = new Path(HBASELOGDIR, HConstants.CORRUPT_DIR_NAME);
|
||||
TABLEDIR = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
|
||||
REGIONS.clear();
|
||||
Collections.addAll(REGIONS, "bbb", "ccc");
|
||||
|
@ -189,7 +191,7 @@ public class TestWALSplit {
|
|||
this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
|
||||
RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
|
||||
wals = new WALFactory(conf, null, name.getMethodName());
|
||||
WALDIR = new Path(HBASEDIR, DefaultWALProvider.getWALDirectoryName(name.getMethodName()));
|
||||
WALDIR = new Path(HBASELOGDIR, DefaultWALProvider.getWALDirectoryName(name.getMethodName()));
|
||||
//fs.mkdirs(WALDIR);
|
||||
}
|
||||
|
||||
|
@ -205,6 +207,7 @@ public class TestWALSplit {
|
|||
} finally {
|
||||
wals = null;
|
||||
fs.delete(HBASEDIR, true);
|
||||
fs.delete(HBASELOGDIR, true);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1111,7 +1114,7 @@ public class TestWALSplit {
|
|||
useDifferentDFSClient();
|
||||
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
|
||||
|
||||
final Path corruptDir = new Path(FSUtils.getRootDir(conf), HConstants.CORRUPT_DIR_NAME);
|
||||
final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
|
||||
assertEquals(1, fs.listStatus(corruptDir).length);
|
||||
}
|
||||
|
||||
|
|
|
@ -346,7 +346,7 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
|
|||
}
|
||||
if (verify) {
|
||||
LOG.info("verifying written log entries.");
|
||||
Path dir = new Path(FSUtils.getRootDir(getConf()),
|
||||
Path dir = new Path(FSUtils.getWALRootDir(getConf()),
|
||||
DefaultWALProvider.getWALDirectoryName("wals"));
|
||||
long editCount = 0;
|
||||
FileStatus [] fsss = fs.listStatus(dir);
|
||||
|
|
Loading…
Reference in New Issue