HBASE-17437 Support specifying a WAL directory outside of the root directory (Yishan Yang and Zach York)

Signed-off-by: Enis Soztutar <enis@apache.org>
Zach York 2017-01-11 12:49:20 -08:00 committed by Enis Soztutar
parent bd7c9581f2
commit ae21797305
40 changed files with 605 additions and 146 deletions


@ -1203,6 +1203,13 @@ possible configurations would overwhelm and obscure the important.
When master starts, it creates the rootdir with these permissions, or sets the permissions
if they do not match.</description>
</property>
<property>
<name>hbase.wal.dir.perms</name>
<value>700</value>
<description>FS Permissions for the root WAL directory in a secure (kerberos) setup.
When master starts, it creates the WAL dir with these permissions, or sets the permissions
if they do not match.</description>
</property>
<property>
<name>hbase.data.umask.enable</name>
<value>false</value>

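For context, the new WAL directory itself is configured via hbase.wal.dir (introduced by this change as HFileSystem.HBASE_WAL_DIR / FSUtils.HBASE_WAL_DIR). A minimal sketch of wiring it up programmatically; the URIs are hypothetical and would normally live in hbase-site.xml:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class WALDirConfigSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // HFiles and table data stay under the usual root directory.
    conf.set("hbase.rootdir", "hdfs://data-nn:8020/hbase");      // hypothetical URI
    // WALs may live on a separate filesystem/path, as long as it is
    // not nested under hbase.rootdir.
    conf.set("hbase.wal.dir", "hdfs://wal-nn:8020/hbasewal");    // hypothetical URI
    // Permissions applied to the WAL directory in a secure (kerberos) setup.
    conf.set("hbase.wal.dir.perms", "700");
    System.out.println("WAL dir = " + conf.get("hbase.wal.dir"));
  }
}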

@ -123,7 +123,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
private final LeaseRecovery leaseRecovery;
private final Configuration conf;
private final FileSystem fs;
private final Path logDir;
private final Path walDir;
private final AtomicReference<Throwable> syncException = new AtomicReference<Throwable>();
private final AtomicBoolean loading = new AtomicBoolean(true);
@ -183,11 +183,11 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
}
public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path logDir,
public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir,
final LeaseRecovery leaseRecovery) {
this.fs = fs;
this.conf = conf;
this.logDir = logDir;
this.walDir = walDir;
this.leaseRecovery = leaseRecovery;
}
@ -1119,8 +1119,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
// ==========================================================================
// FileSystem Log Files helpers
// ==========================================================================
public Path getLogDir() {
return this.logDir;
public Path getWALDir() {
return this.walDir;
}
public FileSystem getFileSystem() {
@ -1128,7 +1128,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
protected Path getLogFilePath(final long logId) throws IOException {
return new Path(logDir, String.format("state-%020d.log", logId));
return new Path(walDir, String.format("state-%020d.log", logId));
}
private static long getLogIdFromName(final String name) {
@ -1157,7 +1157,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
private FileStatus[] getLogFiles() throws IOException {
try {
FileStatus[] files = fs.listStatus(logDir, WALS_PATH_FILTER);
FileStatus[] files = fs.listStatus(walDir, WALS_PATH_FILTER);
Arrays.sort(files, FILE_STATUS_ID_COMPARATOR);
return files;
} catch (FileNotFoundException e) {

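For reference, the renamed getLogFilePath builds procedure-log names under walDir with a fixed format string; a standalone sketch of the same path arithmetic (the walDir value is hypothetical):

import org.apache.hadoop.fs.Path;

public class ProcWALPathSketch {
  public static void main(String[] args) {
    Path walDir = new Path("hdfs://wal-nn:8020/hbase/MasterProcWALs"); // hypothetical
    long logId = 7;
    // Same format string as WALProcedureStore#getLogFilePath: a 20-digit, zero-padded id.
    Path logFile = new Path(walDir, String.format("state-%020d.log", logId));
    System.out.println(logFile); // .../MasterProcWALs/state-00000000000000000007.log
  }
}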

@ -59,8 +59,8 @@ public class ProcedureTestingUtility {
}
public static WALProcedureStore createWalStore(final Configuration conf, final FileSystem fs,
final Path logDir) throws IOException {
return new WALProcedureStore(conf, fs, logDir, new WALProcedureStore.LeaseRecovery() {
final Path walDir) throws IOException {
return new WALProcedureStore(conf, fs, walDir, new WALProcedureStore.LeaseRecovery() {
@Override
public void recoverFileLease(FileSystem fs, Path path) throws IOException {
// no-op


@ -223,10 +223,10 @@ public class ProcedureWALLoaderPerformanceEvaluation extends AbstractHBaseTool {
public void tearDownProcedureStore() {
store.stop(false);
try {
store.getFileSystem().delete(store.getLogDir(), true);
store.getFileSystem().delete(store.getWALDir(), true);
} catch (IOException e) {
System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up "
+ "disk space. Location: " + store.getLogDir().toString());
+ "disk space. Location: " + store.getWALDir().toString());
System.err.println(e.toString());
}
}


@ -107,10 +107,10 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
private void tearDownProcedureStore() {
store.stop(false);
try {
store.getFileSystem().delete(store.getLogDir(), true);
store.getFileSystem().delete(store.getWALDir(), true);
} catch (IOException e) {
System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up "
+ "disk space. Location: " + store.getLogDir().toString());
+ "disk space. Location: " + store.getWALDir().toString());
e.printStackTrace();
}
}


@ -31,6 +31,7 @@ import java.lang.reflect.Proxy;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.URI;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -57,16 +58,19 @@ import org.apache.hadoop.util.Progressable;
import edu.umd.cs.findbugs.annotations.Nullable;
import static org.apache.hadoop.hbase.HConstants.HBASE_DIR;
/**
* An encapsulation for the FileSystem object that hbase uses to access
data. This class allows the flexibility of using
* separate filesystem objects for reading and writing hfiles and wals.
* In future, if we want to make wals be in a different filesystem,
* this is the place to make it happen.
*/
public class HFileSystem extends FilterFileSystem {
public static final Log LOG = LogFactory.getLog(HFileSystem.class);
/** Parameter name for HBase WAL directory */
public static final String HBASE_WAL_DIR = "hbase.wal.dir";
private final FileSystem noChecksumFs; // read hfile data from storage
private final boolean useHBaseChecksum;
private static volatile byte unspecifiedStoragePolicyId = Byte.MIN_VALUE;
@ -86,7 +90,7 @@ public class HFileSystem extends FilterFileSystem {
// the underlying filesystem that has checksums switched on.
this.fs = FileSystem.get(conf);
this.useHBaseChecksum = useHBaseChecksum;
fs.initialize(getDefaultUri(conf), conf);
// disable checksum verification for local fileSystem, see HBASE-11218


@ -45,17 +45,17 @@ public class WALLink extends FileLink {
*/
public WALLink(final Configuration conf,
final String serverName, final String logName) throws IOException {
this(FSUtils.getRootDir(conf), serverName, logName);
this(FSUtils.getWALRootDir(conf), serverName, logName);
}
/**
* @param rootDir Path to the root directory where hbase files are stored
* @param walRootDir Path to the root directory where WAL files are stored
* @param serverName Region Server owner of the log
* @param logName WAL file name
*/
public WALLink(final Path rootDir, final String serverName, final String logName) {
final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
final Path logDir = new Path(new Path(rootDir, HConstants.HREGION_LOGDIR_NAME), serverName);
public WALLink(final Path walRootDir, final String serverName, final String logName) {
final Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
final Path logDir = new Path(new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME), serverName);
setLocations(new Path(logDir, logName), new Path(oldLogDir, logName));
}

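The rewritten constructor probes two candidate locations under the WAL root rather than the HBase root. A sketch of the same path math, with hypothetical server and log names:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;

public class WALLinkPathsSketch {
  public static void main(String[] args) {
    Path walRootDir = new Path("hdfs://wal-nn:8020/hbasewal");   // hypothetical
    String serverName = "rs1.example.org,16020,1484000000000";   // hypothetical
    String logName = serverName + ".1484000100000";              // hypothetical
    // Active WALs live under <walRoot>/WALs/<server>; archived ones under <walRoot>/oldWALs.
    Path logDir = new Path(new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME), serverName);
    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    System.out.println("active   : " + new Path(logDir, logName));
    System.out.println("archived : " + new Path(oldLogDir, logName));
  }
}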

@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.ipc.FailedServerException;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
@ -511,15 +512,15 @@ public class AssignmentManager {
Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
if (!queuedDeadServers.isEmpty()) {
Configuration conf = server.getConfiguration();
Path rootdir = FSUtils.getRootDir(conf);
FileSystem fs = rootdir.getFileSystem(conf);
Path walRootDir = FSUtils.getWALRootDir(conf);
FileSystem walFs = FSUtils.getWALFileSystem(conf);
for (ServerName serverName: queuedDeadServers) {
// In the case of a clean exit, the shutdown handler would have presplit any WALs and
// removed empty directories.
Path logDir = new Path(rootdir,
Path walDir = new Path(walRootDir,
AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
Path splitDir = logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT);
if (checkWals(fs, logDir) || checkWals(fs, splitDir)) {
Path splitDir = walDir.suffix(AbstractFSWALProvider.SPLITTING_EXT);
if (checkWals(walFs, walDir) || checkWals(walFs, splitDir)) {
LOG.debug("Found queued dead server " + serverName);
failover = true;
break;


@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.exceptions.MergeRegionException;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
@ -164,6 +165,7 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.util.ZKDataMigrator;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker;
import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
@ -1084,10 +1086,10 @@ public class HMaster extends HRegionServer implements MasterServices {
private void startProcedureExecutor() throws IOException {
final MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
final Path logDir = new Path(fileSystemManager.getRootDir(),
final Path walDir = new Path(FSUtils.getWALRootDir(this.conf),
MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);
procedureStore = new WALProcedureStore(conf, fileSystemManager.getFileSystem(), logDir,
procedureStore = new WALProcedureStore(conf, walDir.getFileSystem(conf), walDir,
new MasterProcedureEnv.WALStoreLeaseRecovery(this));
procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
procedureExecutor = new ProcedureExecutor(conf, procEnv, procedureStore,


@ -55,16 +55,26 @@ import org.apache.hadoop.ipc.RemoteException;
public class MasterFileSystem {
private static final Log LOG = LogFactory.getLog(MasterFileSystem.class);
/** Parameter name for HBase instance root directory permission*/
public static final String HBASE_DIR_PERMS = "hbase.rootdir.perms";
/** Parameter name for HBase WAL directory permission*/
public static final String HBASE_WAL_DIR_PERMS = "hbase.wal.dir.perms";
// HBase configuration
private final Configuration conf;
// Persisted unique cluster ID
private ClusterId clusterId;
// Keep around for convenience.
private final FileSystem fs;
// Keep around for convenience.
private final FileSystem walFs;
// root hbase directory on the FS
private final Path rootdir;
// hbase temp directory used for table construction and deletion
private final Path tempdir;
// root WAL directory on the FS
private final Path walRootDir;
/*
@ -99,6 +109,10 @@ public class MasterFileSystem {
// Cover both bases, the old way of setting default fs and the new.
// We're supposed to run on 0.20 and 0.21 anyways.
this.fs = this.rootdir.getFileSystem(conf);
this.walRootDir = FSUtils.getWALRootDir(conf);
this.walFs = FSUtils.getWALFileSystem(conf);
FSUtils.setFsDefault(conf, new Path(this.walFs.getUri()));
walFs.setConf(conf);
FSUtils.setFsDefault(conf, new Path(this.fs.getUri()));
// make sure the fs has the same conf
fs.setConf(conf);
@ -123,20 +137,33 @@ public class MasterFileSystem {
final String[] protectedSubDirs = new String[] {
HConstants.BASE_NAMESPACE_DIR,
HConstants.HFILE_ARCHIVE_DIRECTORY,
HConstants.HREGION_LOGDIR_NAME,
HConstants.HREGION_OLDLOGDIR_NAME,
MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR,
HConstants.CORRUPT_DIR_NAME,
HConstants.HBCK_SIDELINEDIR_NAME,
MobConstants.MOB_DIR_NAME
};
final String[] protectedSubLogDirs = new String[] {
HConstants.HREGION_LOGDIR_NAME,
HConstants.HREGION_OLDLOGDIR_NAME,
HConstants.CORRUPT_DIR_NAME,
MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR
};
// check if the root directory exists
checkRootDir(this.rootdir, conf, this.fs);
// Check the directories under rootdir.
checkTempDir(this.tempdir, conf, this.fs);
for (String subDir : protectedSubDirs) {
checkSubDir(new Path(this.rootdir, subDir));
checkSubDir(new Path(this.rootdir, subDir), HBASE_DIR_PERMS);
}
final String perms;
if (!this.walRootDir.equals(this.rootdir)) {
perms = HBASE_WAL_DIR_PERMS;
} else {
perms = HBASE_DIR_PERMS;
}
for (String subDir : protectedSubLogDirs) {
checkSubDir(new Path(this.walRootDir, subDir), perms);
}
checkStagingDir();
@ -165,6 +192,8 @@ public class MasterFileSystem {
return this.fs;
}
protected FileSystem getWALFileSystem() { return this.walFs; }
public Configuration getConfiguration() {
return this.conf;
}
@ -176,6 +205,11 @@ public class MasterFileSystem {
return this.rootdir;
}
/**
* @return HBase WAL root dir.
*/
public Path getWALRootDir() { return this.walRootDir; }
/**
* @return HBase temp dir.
*/
@ -296,7 +330,9 @@ public class MasterFileSystem {
* @param p
* @throws IOException
*/
private void checkSubDir(final Path p) throws IOException {
private void checkSubDir(final Path p, final String dirPermsConfName) throws IOException {
FileSystem fs = p.getFileSystem(conf);
FsPermission dirPerms = new FsPermission(conf.get(dirPermsConfName, "700"));
if (!fs.exists(p)) {
if (isSecurityEnabled) {
if (!fs.mkdirs(p, secureRootSubDirPerms)) {
@ -309,14 +345,14 @@ public class MasterFileSystem {
}
}
else {
if (isSecurityEnabled && !secureRootSubDirPerms.equals(fs.getFileStatus(p).getPermission())) {
if (isSecurityEnabled && !dirPerms.equals(fs.getFileStatus(p).getPermission())) {
// check whether the permission match
LOG.warn("Found HBase directory permissions NOT matching expected permissions for "
+ p.toString() + " permissions=" + fs.getFileStatus(p).getPermission()
+ ", expecting " + secureRootSubDirPerms + ". Automatically setting the permissions. "
+ "You can change the permissions by setting \"hbase.rootdir.perms\" in hbase-site.xml "
+ ", expecting " + dirPerms + ". Automatically setting the permissions. "
+ "You can change the permissions by setting \"" + dirPermsConfName + "\" in hbase-site.xml "
+ "and restarting the master");
fs.setPermission(p, secureRootSubDirPerms);
fs.setPermission(p, dirPerms);
}
}
}

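The parameterized checkSubDir reduces to: create the directory with the configured permissions, or repair them if they have drifted. A self-contained sketch of that logic against a local filesystem (the path is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class DirPermsRepairSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path p = new Path("/tmp/hbase-wal-perms-demo");              // hypothetical
    FileSystem fs = p.getFileSystem(conf);
    // Same default as the new code path: fall back to 700 if unset.
    FsPermission dirPerms = new FsPermission(conf.get("hbase.wal.dir.perms", "700"));
    if (!fs.exists(p)) {
      fs.mkdirs(p, dirPerms);        // create with the expected permissions
    } else if (!dirPerms.equals(fs.getFileStatus(p).getPermission())) {
      fs.setPermission(p, dirPerms); // repair a mismatch, as the master does
    }
    System.out.println(p + " -> " + fs.getFileStatus(p).getPermission());
  }
}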

@ -90,8 +90,8 @@ public class MasterWalManager {
private volatile boolean fsOk = true;
public MasterWalManager(MasterServices services) throws IOException {
this(services.getConfiguration(), services.getMasterFileSystem().getFileSystem(),
services.getMasterFileSystem().getRootDir(), services);
this(services.getConfiguration(), services.getMasterFileSystem().getWALFileSystem(),
services.getMasterFileSystem().getWALRootDir(), services);
}
public MasterWalManager(Configuration conf, FileSystem fs, Path rootDir, MasterServices services)


@ -262,7 +262,7 @@ public class SplitLogManager {
// recover-lease is done. totalSize will be under in most cases and the
// metrics that it drives will also be under-reported.
totalSize += lf.getLen();
String pathToLog = FSUtils.removeRootPath(lf.getPath(), conf);
String pathToLog = FSUtils.removeWALRootPath(lf.getPath(), conf);
if (!enqueueSplitTask(pathToLog, batch)) {
throw new IOException("duplicate log split scheduled for " + lf.getPath());
}


@ -311,6 +311,7 @@ public class HRegionServer extends HasThread implements
// If false, the file system has become unavailable
protected volatile boolean fsOk;
protected HFileSystem fs;
protected HFileSystem walFs;
// Set when a report to the master comes back with a message asking us to
// shutdown. Also set by call to stop when debugging or running unit tests
@ -332,6 +333,7 @@ public class HRegionServer extends HasThread implements
protected final Configuration conf;
private Path rootDir;
private Path walRootDir;
protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@ -643,13 +645,16 @@ public class HRegionServer extends HasThread implements
}
private void initializeFileSystem() throws IOException {
// Get fs instance used by this RS. Do we use checksum verification in the hbase? If hbase
// checksum verification enabled, then automatically switch off hdfs checksum verification.
boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
FSUtils.setFsDefault(this.conf, FSUtils.getWALRootDir(this.conf));
this.walFs = new HFileSystem(this.conf, useHBaseChecksum);
this.walRootDir = FSUtils.getWALRootDir(this.conf);
// Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else
// underlying hadoop hdfs accessors will be going against wrong filesystem
// (unless all is set to defaults).
FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf));
// Get fs instance used by this RS. Do we use checksum verification in the hbase? If hbase
// checksum verification enabled, then automatically switch off hdfs checksum verification.
boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
this.fs = new HFileSystem(this.conf, useHBaseChecksum);
this.rootDir = FSUtils.getRootDir(this.conf);
this.tableDescriptors = getFsTableDescriptors();
@ -1726,19 +1731,19 @@ public class HRegionServer extends HasThread implements
*/
private WALFactory setupWALAndReplication() throws IOException {
// TODO Replication make assumptions here based on the default filesystem impl
final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
final Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
final String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
Path logdir = new Path(rootDir, logName);
if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
if (this.fs.exists(logdir)) {
Path logDir = new Path(walRootDir, logName);
if (LOG.isDebugEnabled()) LOG.debug("logDir=" + logDir);
if (this.walFs.exists(logDir)) {
throw new RegionServerRunningException("Region server has already " +
"created directory at " + this.serverName.toString());
}
// Instantiate replication manager if replication enabled. Pass it the
// log directories.
createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir);
createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir);
// listeners the wal factory will add to wals it creates.
final List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
@ -2719,6 +2724,20 @@ public class HRegionServer extends HasThread implements
return fs;
}
/**
* @return the walRootDir.
*/
protected Path getWALRootDir() {
return walRootDir;
}
/**
* @return the walFs.
*/
protected FileSystem getWALFileSystem() {
return walFs;
}
@Override
public String toString() {
return getServerName().toString();
@ -2785,7 +2804,7 @@ public class HRegionServer extends HasThread implements
* Load the replication service objects, if any
*/
static private void createNewReplicationInstance(Configuration conf,
HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException{
HRegionServer server, FileSystem walFs, Path walDir, Path oldWALDir) throws IOException{
if ((server instanceof HMaster) &&
(!BaseLoadBalancer.userTablesOnMaster(conf))) {
@ -2805,21 +2824,21 @@ public class HRegionServer extends HasThread implements
if (sourceClassname.equals(sinkClassname)) {
server.replicationSourceHandler = (ReplicationSourceService)
newReplicationInstance(sourceClassname,
conf, server, fs, logDir, oldLogDir);
conf, server, walFs, walDir, oldWALDir);
server.replicationSinkHandler = (ReplicationSinkService)
server.replicationSourceHandler;
} else {
server.replicationSourceHandler = (ReplicationSourceService)
newReplicationInstance(sourceClassname,
conf, server, fs, logDir, oldLogDir);
conf, server, walFs, walDir, oldWALDir);
server.replicationSinkHandler = (ReplicationSinkService)
newReplicationInstance(sinkClassname,
conf, server, fs, logDir, oldLogDir);
conf, server, walFs, walDir, oldWALDir);
}
}
static private ReplicationService newReplicationInstance(String classname,
Configuration conf, HRegionServer server, FileSystem fs, Path logDir,
Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
Path oldLogDir) throws IOException{
Class<?> clazz = null;
@ -2833,7 +2852,7 @@ public class HRegionServer extends HasThread implements
// create an instance of the replication object.
ReplicationService service = (ReplicationService)
ReflectionUtils.newInstance(clazz, conf);
service.initialize(server, fs, logDir, oldLogDir);
service.initialize(server, walFs, logDir, oldLogDir);
return service;
}

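With this change the region server tracks two filesystem/root pairs: fs/rootDir for data and walFs/walRootDir for WALs. A sketch of resolving both from configuration with the helpers this patch adds (local file: URIs, hypothetical paths):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.FSUtils;

public class DualRootsSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.rootdir", "file:///tmp/hbase");       // hypothetical
    conf.set("hbase.wal.dir", "file:///tmp/hbasewal");    // hypothetical
    Path rootDir = FSUtils.getRootDir(conf);       // backs hfiles/table data
    Path walRootDir = FSUtils.getWALRootDir(conf); // backs WALs; may be a different FS
    FileSystem fs = rootDir.getFileSystem(conf);
    FileSystem walFs = FSUtils.getWALFileSystem(conf);
    System.out.println(rootDir + " on " + fs.getUri());
    System.out.println(walRootDir + " on " + walFs.getUri());
  }
}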

@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALSplitter;
@ -88,11 +89,11 @@ public class SplitLogWorker implements Runnable {
this(server, conf, server, new TaskExecutor() {
@Override
public Status exec(String filename, RecoveryMode mode, CancelableProgressable p) {
Path rootdir;
Path walDir;
FileSystem fs;
try {
rootdir = FSUtils.getRootDir(conf);
fs = rootdir.getFileSystem(conf);
walDir = FSUtils.getWALRootDir(conf);
fs = walDir.getFileSystem(conf);
} catch (IOException e) {
LOG.warn("could not find root dir or fs", e);
return Status.RESIGNED;
@ -101,7 +102,7 @@ public class SplitLogWorker implements Runnable {
// interrupted or has encountered a transient error and when it has
// encountered a bad non-retry-able persistent error.
try {
if (!WALSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)),
if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, filename)),
fs, conf, p, sequenceIdChecker, server.getCoordinatedStateManager(), mode, factory)) {
return Status.PREEMPTED;
}


@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FSUtils;
@ -780,7 +781,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
+ Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
private static void split(final Configuration conf, final Path p) throws IOException {
FileSystem fs = FileSystem.get(conf);
FileSystem fs = FSUtils.getWALFileSystem(conf);
if (!fs.exists(p)) {
throw new FileNotFoundException(p.toString());
}
@ -788,7 +789,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
throw new IOException(p + " is not a directory");
}
final Path baseDir = FSUtils.getRootDir(conf);
final Path baseDir = FSUtils.getWALRootDir(conf);
final Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
}


@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
@ -962,9 +963,9 @@ public class ReplicationSource extends Thread
// to look at)
List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
LOG.info("NB dead servers : " + deadRegionServers.size());
final Path rootDir = FSUtils.getRootDir(conf);
final Path walDir = FSUtils.getWALRootDir(conf);
for (String curDeadServerName : deadRegionServers) {
final Path deadRsDirectory = new Path(rootDir,
final Path deadRsDirectory = new Path(walDir,
AbstractFSWALProvider.getWALDirectoryName(curDeadServerName));
Path[] locs = new Path[] { new Path(deadRsDirectory, currentPath.getName()),
new Path(deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT),
@ -984,7 +985,7 @@ public class ReplicationSource extends Thread
// In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data
// from .logs to .oldlogs. Loop into .logs folders and check whether a match exists
if (stopper instanceof ReplicationSyncUp.DummyServer) {
// N.B. the ReplicationSyncUp tool sets the manager.getLogDir to the root of the wal
// N.B. the ReplicationSyncUp tool sets the manager.getWALDir to the root of the wal
// area rather than to the wal area for a particular region server.
FileStatus[] rss = fs.listStatus(manager.getLogDir());
for (FileStatus rs : rss) {


@ -76,7 +76,7 @@ public class ReplicationSyncUp extends Configured implements Tool {
Replication replication;
ReplicationSourceManager manager;
FileSystem fs;
Path oldLogDir, logDir, rootDir;
Path oldLogDir, logDir, walRootDir;
ZooKeeperWatcher zkw;
Abortable abortable = new Abortable() {
@ -94,10 +94,10 @@ public class ReplicationSyncUp extends Configured implements Tool {
new ZooKeeperWatcher(conf, "syncupReplication" + System.currentTimeMillis(), abortable,
true);
rootDir = FSUtils.getRootDir(conf);
fs = FileSystem.get(conf);
oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
walRootDir = FSUtils.getWALRootDir(conf);
fs = FSUtils.getWALFileSystem(conf);
oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
System.out.println("Start Replication Server start");
replication = new Replication(new DummyServer(zkw), fs, logDir, oldLogDir);


@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.util;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
@ -99,6 +100,9 @@ import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import static org.apache.hadoop.hbase.HConstants.HBASE_DIR;
/**
* Utility methods for interacting with the underlying file system.
*/
@ -106,6 +110,9 @@ import org.apache.hadoop.util.StringUtils;
public abstract class FSUtils {
private static final Log LOG = LogFactory.getLog(FSUtils.class);
/** Parameter name for HBase WAL directory */
public static final String HBASE_WAL_DIR = "hbase.wal.dir";
/** Full access permissions (starting point for a umask) */
public static final String FULL_RWX_PERMISSIONS = "777";
private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
@ -1013,22 +1020,22 @@ public abstract class FSUtils {
return root;
} catch (URISyntaxException e) {
IOException io = new IOException("Root directory path is not a valid " +
"URI -- check your " + HConstants.HBASE_DIR + " configuration");
"URI -- check your " + HBASE_DIR + " configuration");
io.initCause(e);
throw io;
}
}
/**
* Checks for the presence of the root path (using the provided conf object) in the given path. If
* Checks for the presence of the WAL root path (using the provided conf object) in the given path. If
* it exists, this method removes it and returns the String representation of the remaining relative path.
* @param path
* @param conf
* @return String representation of the remaining relative path
* @throws IOException
*/
public static String removeRootPath(Path path, final Configuration conf) throws IOException {
Path root = FSUtils.getRootDir(conf);
public static String removeWALRootPath(Path path, final Configuration conf) throws IOException {
Path root = getWALRootDir(conf);
String pathStr = path.toString();
// check that the path is absolute... it has the root path in it.
if (!pathStr.startsWith(root.toString())) return pathStr;
@ -1075,24 +1082,65 @@ public abstract class FSUtils {
/**
* @param c configuration
* @return Path to hbase root directory: i.e. <code>hbase.rootdir</code> from
* @return {@link Path} to hbase root directory: i.e. {@value org.apache.hadoop.hbase.HConstants#HBASE_DIR} from
* configuration as a qualified Path.
* @throws IOException e
*/
public static Path getRootDir(final Configuration c) throws IOException {
Path p = new Path(c.get(HConstants.HBASE_DIR));
Path p = new Path(c.get(HBASE_DIR));
FileSystem fs = p.getFileSystem(c);
return p.makeQualified(fs);
}
public static void setRootDir(final Configuration c, final Path root) throws IOException {
c.set(HConstants.HBASE_DIR, root.toString());
c.set(HBASE_DIR, root.toString());
}
public static void setFsDefault(final Configuration c, final Path root) throws IOException {
c.set("fs.defaultFS", root.toString()); // for hadoop 0.21+
}
public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException {
Path p = getRootDir(c);
return p.getFileSystem(c);
}
/**
* @param c configuration
* @return {@link Path} to hbase log root directory: i.e. {@value #HBASE_WAL_DIR} from
* configuration as a qualified Path. Defaults to {@value org.apache.hadoop.hbase.HConstants#HBASE_DIR}
* @throws IOException e
*/
public static Path getWALRootDir(final Configuration c) throws IOException {
Path p = new Path(c.get(HBASE_WAL_DIR, c.get(HBASE_DIR)));
if (!isValidWALRootDir(p, c)) {
return FSUtils.getRootDir(c);
}
FileSystem fs = p.getFileSystem(c);
return p.makeQualified(fs);
}
@VisibleForTesting
public static void setWALRootDir(final Configuration c, final Path root) throws IOException {
c.set(HBASE_WAL_DIR, root.toString());
}
public static FileSystem getWALFileSystem(final Configuration c) throws IOException {
Path p = getWALRootDir(c);
return p.getFileSystem(c);
}
private static boolean isValidWALRootDir(Path walDir, final Configuration c) throws IOException {
Path rootDir = FSUtils.getRootDir(c);
if (!walDir.equals(rootDir)) {
if (walDir.toString().startsWith(rootDir.toString() + "/")) {
throw new IllegalStateException("Illegal WAL directory specified. " +
"WAL directories are not permitted to be under the root directory if set.");
}
}
return true;
}
/**
* Checks if meta region exists
*

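Putting the new helper's behavior in one place: with hbase.wal.dir unset, getWALRootDir falls back to hbase.rootdir, and a WAL dir nested under the root is rejected by isValidWALRootDir. A sketch (local file: URIs, hypothetical paths):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.FSUtils;

public class WALRootDirSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.rootdir", "file:///tmp/hbase");              // hypothetical
    // No hbase.wal.dir set: getWALRootDir falls back to the root dir.
    System.out.println(FSUtils.getWALRootDir(conf));             // file:/tmp/hbase
    // A separate, non-nested WAL dir is accepted.
    conf.set("hbase.wal.dir", "file:///tmp/hbasewal");
    System.out.println(FSUtils.getWALRootDir(conf));             // file:/tmp/hbasewal
    // Nesting the WAL dir under the root dir trips isValidWALRootDir.
    conf.set("hbase.wal.dir", "file:///tmp/hbase/wals");
    try {
      FSUtils.getWALRootDir(conf);
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}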

@ -40,8 +40,8 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import com.google.common.annotations.VisibleForTesting;
/**
* Base class of a WAL Provider that returns a single thread safe WAL that writes to HDFS. By
* default, this implementation picks a directory in HDFS based on a combination of
* Base class of a WAL Provider that returns a single thread safe WAL that writes to Hadoop FS. By
* default, this implementation picks a directory in Hadoop FS based on a combination of
* <ul>
* <li>the HBase root directory
* <li>HConstants.HREGION_LOGDIR_NAME

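Concretely, a provider's WAL directory is the WAL root (after this patch, no longer necessarily the HBase root) joined with HConstants.HREGION_LOGDIR_NAME and the server/factory name. A sketch using the same helper (names hypothetical):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;

public class WALDirNameSketch {
  public static void main(String[] args) {
    Path walRootDir = new Path("file:///tmp/hbasewal");          // hypothetical
    String serverName = "rs1.example.org,16020,1484000000000";   // hypothetical
    // getWALDirectoryName prepends HConstants.HREGION_LOGDIR_NAME ("WALs").
    Path serverWALDir =
        new Path(walRootDir, AbstractFSWALProvider.getWALDirectoryName(serverName));
    System.out.println(serverWALDir); // file:/tmp/hbasewal/WALs/rs1.example.org,16020,1484000000000
  }
}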

@ -56,7 +56,7 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
@Override
protected AsyncFSWAL createWAL() throws IOException {
return new AsyncFSWAL(FileSystem.get(conf), FSUtils.getRootDir(conf),
return new AsyncFSWAL(FSUtils.getWALFileSystem(conf), FSUtils.getWALRootDir(conf),
getWALDirectoryName(factory.factoryId), HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners,
true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null,
eventLoopGroup.next());


@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
// imports for things that haven't moved from regionserver.wal yet.
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@ -63,7 +64,7 @@ class DisabledWALProvider implements WALProvider {
if (null == providerId) {
providerId = "defaultDisabled";
}
disabled = new DisabledWAL(new Path(FSUtils.getRootDir(conf), providerId), conf, null);
disabled = new DisabledWAL(new Path(FSUtils.getWALRootDir(conf), providerId), conf, null);
}
@Override


@ -75,7 +75,7 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> {
@Override
protected FSHLog createWAL() throws IOException {
return new FSHLog(FileSystem.get(conf), FSUtils.getRootDir(conf),
return new FSHLog(FSUtils.getWALFileSystem(conf), FSUtils.getWALRootDir(conf),
getWALDirectoryName(factory.factoryId), HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners,
true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
}


@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@ -432,7 +433,7 @@ public class WALSplitter {
*/
public static void finishSplitLogFile(String logfile,
Configuration conf) throws IOException {
Path rootdir = FSUtils.getRootDir(conf);
Path rootdir = FSUtils.getWALRootDir(conf);
Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
Path logPath;
if (FSUtils.isStartingWithPath(rootdir, logfile)) {
@ -475,7 +476,7 @@ public class WALSplitter {
final List<Path> corruptedLogs,
final List<Path> processedLogs, final Path oldLogDir,
final FileSystem fs, final Configuration conf) throws IOException {
final Path corruptDir = new Path(FSUtils.getRootDir(conf), HConstants.CORRUPT_DIR_NAME);
final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) {
LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to "
+ corruptDir);


@ -867,6 +867,16 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
return startMiniCluster(1, 1);
}
/**
* Start up a minicluster of hbase, dfs, and zookeeper where the WAL dir is created
* separately from the root dir.
* @throws Exception
* @return Mini hbase cluster instance created.
* @see #shutdownMiniDFSCluster()
*/
public MiniHBaseCluster startMiniCluster(boolean withWALDir) throws Exception {
return startMiniCluster(1, 1, 1, null, null, null, false, withWALDir);
}
/**
* Start up a minicluster of hbase, dfs, and zookeeper.
* Set the <code>create</code> flag to create root or data directory path or not
@ -898,6 +908,11 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
return startMiniCluster(1, numSlaves, false);
}
public MiniHBaseCluster startMiniCluster(final int numSlaves, boolean create, boolean withWALDir)
throws Exception {
return startMiniCluster(1, numSlaves, numSlaves, null, null, null, create, withWALDir);
}
/**
* Start minicluster. Whether to create a new root or data dir path even if such a path
* has been created earlier is decided based on flag <code>create</code>
@ -927,7 +942,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
final int numSlaves, final String[] dataNodeHosts, boolean create)
throws Exception {
return startMiniCluster(numMasters, numSlaves, numSlaves, dataNodeHosts,
null, null, create);
null, null, create, false);
}
/**
@ -1010,7 +1025,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
throws Exception {
return startMiniCluster(numMasters, numSlaves, numDataNodes, dataNodeHosts,
masterClass, regionserverClass, false);
masterClass, regionserverClass, false, false);
}
/**
@ -1024,7 +1039,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
final int numSlaves, int numDataNodes, final String[] dataNodeHosts,
Class<? extends HMaster> masterClass,
Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass,
boolean create)
boolean create, boolean withWALDir)
throws Exception {
if (dataNodeHosts != null && dataNodeHosts.length != 0) {
numDataNodes = dataNodeHosts.length;
@ -1055,12 +1070,12 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
// Start the MiniHBaseCluster
return startMiniHBaseCluster(numMasters, numSlaves, masterClass,
regionserverClass, create);
regionserverClass, create, withWALDir);
}
public MiniHBaseCluster startMiniHBaseCluster(final int numMasters, final int numSlaves)
throws IOException, InterruptedException{
return startMiniHBaseCluster(numMasters, numSlaves, null, null, false);
return startMiniHBaseCluster(numMasters, numSlaves, null, null, false, false);
}
/**
@ -1079,11 +1094,13 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
public MiniHBaseCluster startMiniHBaseCluster(final int numMasters,
final int numSlaves, Class<? extends HMaster> masterClass,
Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass,
boolean create)
boolean create, boolean withWALDir)
throws IOException, InterruptedException {
// Now do the mini hbase cluster. Set the hbase.rootdir in config.
createRootDir(create);
if (withWALDir) {
createWALRootDir();
}
// Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
// for tests that do not read hbase-defaults.xml
setHBaseFsTmpDir();
@ -1273,6 +1290,22 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
return createRootDir(false);
}
/**
* Creates an hbase WAL root dir on the test filesystem. Normally you won't make use
* of this method; the WAL root dir is created for you as part of mini cluster startup
* when a separate WAL dir is requested. You'd only use this method if you were doing
* manual operation.
*
* @return Fully qualified path to the hbase WAL root dir
* @throws IOException
*/
public Path createWALRootDir() throws IOException {
FileSystem fs = FileSystem.get(this.conf);
Path walDir = getNewDataTestDirOnTestFS();
FSUtils.setWALRootDir(this.conf, walDir);
fs.mkdirs(walDir);
return walDir;
}
private void setHBaseFsTmpDir() throws IOException {
String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");

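Tests opt into the split layout through the new boolean overload, which calls createWALRootDir() before the cluster starts. A usage sketch (hypothetical driver, not an actual test from this patch):

import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.util.FSUtils;

public class WALDirMiniClusterSketch {
  public static void main(String[] args) throws Exception {
    HBaseTestingUtility util = new HBaseTestingUtility();
    // true => createWALRootDir() is called, so WALs get their own root dir.
    util.startMiniCluster(true);
    try {
      System.out.println("root    = " + FSUtils.getRootDir(util.getConfiguration()));
      System.out.println("walRoot = " + FSUtils.getWALRootDir(util.getConfiguration()));
    } finally {
      util.shutdownMiniCluster();
    }
  }
}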

@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
@ -100,6 +101,7 @@ public class TestWALObserver {
private FileSystem fs;
private Path dir;
private Path hbaseRootDir;
private Path hbaseWALRootDir;
private String logName;
private Path oldLogDir;
private Path logDir;
@ -117,8 +119,11 @@ public class TestWALObserver {
TEST_UTIL.startMiniCluster(1);
Path hbaseRootDir = TEST_UTIL.getDFSCluster().getFileSystem()
.makeQualified(new Path("/hbase"));
Path hbaseWALRootDir = TEST_UTIL.getDFSCluster().getFileSystem()
.makeQualified(new Path("/hbaseLogRoot"));
LOG.info("hbase.rootdir=" + hbaseRootDir);
FSUtils.setRootDir(conf, hbaseRootDir);
FSUtils.setWALRootDir(conf, hbaseWALRootDir);
}
@AfterClass
@ -132,16 +137,20 @@ public class TestWALObserver {
// this.cluster = TEST_UTIL.getDFSCluster();
this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
this.hbaseRootDir = FSUtils.getRootDir(conf);
this.hbaseWALRootDir = FSUtils.getWALRootDir(conf);
this.dir = new Path(this.hbaseRootDir, TestWALObserver.class.getName());
this.oldLogDir = new Path(this.hbaseRootDir,
this.oldLogDir = new Path(this.hbaseWALRootDir,
HConstants.HREGION_OLDLOGDIR_NAME);
this.logDir = new Path(this.hbaseRootDir,
this.logDir = new Path(this.hbaseWALRootDir,
AbstractFSWALProvider.getWALDirectoryName(currentTest.getMethodName()));
this.logName = HConstants.HREGION_LOGDIR_NAME;
if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
}
if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseWALRootDir)) {
TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseWALRootDir, true);
}
this.wals = new WALFactory(conf, null, currentTest.getMethodName());
}
@ -155,6 +164,7 @@ public class TestWALObserver {
LOG.debug("details of failure to close wal factory.", exception);
}
TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseWALRootDir, true);
}
/**


@ -30,6 +30,7 @@ import java.io.PrintStream;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@ -43,8 +44,10 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.WALPlayer.WALKeyValueMapper;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
@ -69,15 +72,27 @@ import org.mockito.stubbing.Answer;
public class TestWALPlayer {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static MiniHBaseCluster cluster;
private static Path rootDir;
private static Path walRootDir;
private static FileSystem fs;
private static FileSystem logFs;
private static Configuration conf;
@BeforeClass
public static void beforeClass() throws Exception {
conf= TEST_UTIL.getConfiguration();
rootDir = TEST_UTIL.createRootDir();
walRootDir = TEST_UTIL.createWALRootDir();
fs = FSUtils.getRootDirFileSystem(conf);
logFs = FSUtils.getWALFileSystem(conf);
cluster = TEST_UTIL.startMiniCluster();
}
@AfterClass
public static void afterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
fs.delete(rootDir, true);
logFs.delete(walRootDir, true);
}
/**
@ -109,7 +124,7 @@ public class TestWALPlayer {
WAL log = cluster.getRegionServer(0).getWAL(null);
log.rollWriter();
String walInputDir = new Path(cluster.getMaster().getMasterFileSystem()
.getRootDir(), HConstants.HREGION_LOGDIR_NAME).toString();
.getWALRootDir(), HConstants.HREGION_LOGDIR_NAME).toString();
Configuration configuration= TEST_UTIL.getConfiguration();
WALPlayer player = new WALPlayer(configuration);


@ -29,7 +29,6 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@ -44,6 +43,7 @@ import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALKeyRecordReader;
import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
@ -68,6 +68,8 @@ public class TestWALRecordReader {
private static Configuration conf;
private static FileSystem fs;
private static Path hbaseDir;
private static FileSystem walFs;
private static Path walRootDir;
// visible for TestHLogRecordReader
static final TableName tableName = TableName.valueOf(getName());
private static final byte [] rowName = tableName.getName();
@ -88,12 +90,9 @@ public class TestWALRecordReader {
@Before
public void setUp() throws Exception {
fs.delete(hbaseDir, true);
walFs.delete(walRootDir, true);
mvcc = new MultiVersionConcurrencyControl();
FileStatus[] entries = fs.listStatus(hbaseDir);
for (FileStatus dir : entries) {
fs.delete(dir.getPath(), true);
}
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
@ -107,8 +106,9 @@ public class TestWALRecordReader {
fs = TEST_UTIL.getDFSCluster().getFileSystem();
hbaseDir = TEST_UTIL.createRootDir();
logDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
walRootDir = TEST_UTIL.createWALRootDir();
walFs = FSUtils.getWALFileSystem(conf);
logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor(family));
@ -116,6 +116,8 @@ public class TestWALRecordReader {
@AfterClass
public static void tearDownAfterClass() throws Exception {
fs.delete(hbaseDir, true);
walFs.delete(walRootDir, true);
TEST_UTIL.shutdownMiniCluster();
}


@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import static org.junit.Assert.assertEquals;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Test the master filesystem in a local cluster with a separate WAL root directory
*/
@Category({MasterTests.class, MediumTests.class})
public class TestMasterFileSystemWithWALDir {
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
@BeforeClass
public static void setupTest() throws Exception {
UTIL.startMiniCluster(true);
}
@AfterClass
public static void teardownTest() throws Exception {
UTIL.shutdownMiniCluster();
}
@Test
public void testFsUriSetProperly() throws Exception {
HMaster master = UTIL.getMiniHBaseCluster().getMaster();
MasterFileSystem fs = master.getMasterFileSystem();
Path masterRoot = FSUtils.getRootDir(fs.getConfiguration());
Path rootDir = FSUtils.getRootDir(fs.getFileSystem().getConf());
assertEquals(masterRoot, rootDir);
assertEquals(FSUtils.getWALRootDir(UTIL.getConfiguration()), fs.getWALRootDir());
}
}


@ -109,7 +109,7 @@ public class TestMasterProcedureWalLease {
Mockito.doReturn(true).when(backupMaster3).isActiveMaster();
final WALProcedureStore backupStore3 = new WALProcedureStore(firstMaster.getConfiguration(),
firstMaster.getMasterFileSystem().getFileSystem(),
((WALProcedureStore)masterStore).getLogDir(),
((WALProcedureStore)masterStore).getWALDir(),
new MasterProcedureEnv.WALStoreLeaseRecovery(backupMaster3));
// Abort Latch for the test store
final CountDownLatch backupStore3Abort = new CountDownLatch(1);
@ -189,7 +189,7 @@ public class TestMasterProcedureWalLease {
Mockito.doReturn(true).when(backupMaster3).isActiveMaster();
final WALProcedureStore procStore2 = new WALProcedureStore(firstMaster.getConfiguration(),
firstMaster.getMasterFileSystem().getFileSystem(),
((WALProcedureStore)procStore).getLogDir(),
((WALProcedureStore)procStore).getWALDir(),
new MasterProcedureEnv.WALStoreLeaseRecovery(backupMaster3));
// start a second store which should fence the first one out


@ -84,7 +84,7 @@ public class TestWALProcedureStoreOnHDFS {
public void tearDown() throws Exception {
store.stop(false);
UTIL.getDFSCluster().getFileSystem().delete(store.getLogDir(), true);
UTIL.getDFSCluster().getFileSystem().delete(store.getWALDir(), true);
try {
UTIL.shutdownMiniCluster();


@ -354,7 +354,7 @@ public class TestHRegionServerBulkLoad {
int millisToRun = 30000;
int numScanners = 50;
UTIL.startMiniCluster(1);
UTIL.startMiniCluster(1, false, true);
try {
WAL log = UTIL.getHBaseCluster().getRegionServer(0).getWAL(null);
FindBulkHBaseListener listener = new FindBulkHBaseListener();


@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertNotEquals;
import java.io.IOException;
import java.util.ArrayList;
@ -53,6 +54,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.util.Bytes;
@ -89,7 +91,9 @@ public abstract class AbstractTestFSWAL {
FS.delete(dir.getPath(), true);
}
final Path hbaseDir = TEST_UTIL.createRootDir();
DIR = new Path(hbaseDir, currentTest.getMethodName());
final Path hbaseWALDir = TEST_UTIL.createWALRootDir();
DIR = new Path(hbaseWALDir, currentTest.getMethodName());
assertNotEquals(hbaseDir, hbaseWALDir);
}
@BeforeClass
@ -118,11 +122,11 @@ public abstract class AbstractTestFSWAL {
TEST_UTIL.shutdownMiniCluster();
}
protected abstract AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String logDir,
protected abstract AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String walDir,
String archiveDir, Configuration conf, List<WALActionsListener> listeners,
boolean failIfWALExists, String prefix, String suffix) throws IOException;
protected abstract AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String logDir,
protected abstract AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String walDir,
String archiveDir, Configuration conf, List<WALActionsListener> listeners,
boolean failIfWALExists, String prefix, String suffix, Runnable action) throws IOException;
@ -132,16 +136,16 @@ public abstract class AbstractTestFSWAL {
@Test
public void testWALCoprocessorLoaded() throws Exception {
// test to see whether the coprocessor is loaded or not.
AbstractFSWAL<?> log = null;
AbstractFSWAL<?> wal = null;
try {
log = newWAL(FS, FSUtils.getRootDir(CONF), DIR.toString(), HConstants.HREGION_OLDLOGDIR_NAME,
wal = newWAL(FS, FSUtils.getWALRootDir(CONF), DIR.toString(), HConstants.HREGION_OLDLOGDIR_NAME,
CONF, null, true, null, null);
WALCoprocessorHost host = log.getCoprocessorHost();
WALCoprocessorHost host = wal.getCoprocessorHost();
Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName());
assertNotNull(c);
} finally {
if (log != null) {
log.close();
if (wal != null) {
wal.close();
}
}
}
@ -182,7 +186,7 @@ public abstract class AbstractTestFSWAL {
AbstractFSWAL<?> wal1 = null;
AbstractFSWAL<?> walMeta = null;
try {
wal1 = newWAL(FS, FSUtils.getRootDir(CONF), DIR.toString(), HConstants.HREGION_OLDLOGDIR_NAME,
wal1 = newWAL(FS, FSUtils.getWALRootDir(CONF), DIR.toString(), HConstants.HREGION_OLDLOGDIR_NAME,
CONF, null, true, null, null);
LOG.debug("Log obtained is: " + wal1);
Comparator<Path> comp = wal1.LOG_NAME_COMPARATOR;
@ -193,7 +197,7 @@ public abstract class AbstractTestFSWAL {
// comparing with different filenum.
assertTrue(comp.compare(p1, p2) < 0);
walMeta =
newWAL(FS, FSUtils.getRootDir(CONF), DIR.toString(), HConstants.HREGION_OLDLOGDIR_NAME,
newWAL(FS, FSUtils.getWALRootDir(CONF), DIR.toString(), HConstants.HREGION_OLDLOGDIR_NAME,
CONF, null, true, null, AbstractFSWALProvider.META_WAL_PROVIDER_ID);
Comparator<Path> compMeta = walMeta.LOG_NAME_COMPARATOR;
@ -240,7 +244,7 @@ public abstract class AbstractTestFSWAL {
LOG.debug("testFindMemStoresEligibleForFlush");
Configuration conf1 = HBaseConfiguration.create(CONF);
conf1.setInt("hbase.regionserver.maxlogs", 1);
AbstractFSWAL<?> wal = newWAL(FS, FSUtils.getRootDir(conf1), DIR.toString(),
AbstractFSWAL<?> wal = newWAL(FS, FSUtils.getWALRootDir(conf1), DIR.toString(),
HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null);
HTableDescriptor t1 =
new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row"));
@ -325,16 +329,16 @@ public abstract class AbstractTestFSWAL {
@Test(expected = IOException.class)
public void testFailedToCreateWALIfParentRenamed() throws IOException {
final String name = "testFailedToCreateWALIfParentRenamed";
AbstractFSWAL<?> log = newWAL(FS, FSUtils.getRootDir(CONF), name,
AbstractFSWAL<?> wal = newWAL(FS, FSUtils.getWALRootDir(CONF), name,
HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
long filenum = System.currentTimeMillis();
Path path = log.computeFilename(filenum);
log.createWriterInstance(path);
Path path = wal.computeFilename(filenum);
wal.createWriterInstance(path);
Path parent = path.getParent();
path = log.computeFilename(filenum + 1);
path = wal.computeFilename(filenum + 1);
Path newPath = new Path(parent.getParent(), parent.getName() + "-splitting");
FS.rename(parent, newPath);
log.createWriterInstance(path);
wal.createWriterInstance(path);
fail("It should fail to create the new WAL");
}
@ -364,7 +368,7 @@ public abstract class AbstractTestFSWAL {
scopes.put(fam, 0);
}
// subclass and doctor a method.
AbstractFSWAL<?> wal = newSlowWAL(FS, FSUtils.getRootDir(CONF), DIR.toString(), testName, CONF,
AbstractFSWAL<?> wal = newSlowWAL(FS, FSUtils.getWALRootDir(CONF), DIR.toString(), testName, CONF,
null, true, null, null, new Runnable() {
@Override
@ -425,7 +429,7 @@ public abstract class AbstractTestFSWAL {
@Test
public void testSyncNoAppend() throws IOException {
String testName = currentTest.getMethodName();
AbstractFSWAL<?> wal = newWAL(FS, FSUtils.getRootDir(CONF), DIR.toString(), testName, CONF,
AbstractFSWAL<?> wal = newWAL(FS, FSUtils.getWALRootDir(CONF), DIR.toString(), testName, CONF,
null, true, null, null);
try {
wal.sync();


@ -58,19 +58,19 @@ import static org.junit.Assert.assertEquals;
public class TestFSHLog extends AbstractTestFSWAL {
@Override
protected AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
protected AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String walDir, String archiveDir,
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
String prefix, String suffix) throws IOException {
return new FSHLog(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
return new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, failIfWALExists, prefix,
suffix);
}
@Override
protected AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String logDir,
protected AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String walDir,
String archiveDir, Configuration conf, List<WALActionsListener> listeners,
boolean failIfWALExists, String prefix, String suffix, final Runnable action)
throws IOException {
return new FSHLog(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
return new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, failIfWALExists, prefix,
suffix) {
@Override


@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -71,7 +72,8 @@ public class TestLogRollAbort {
/* For the split-then-roll test */
private static final Path HBASEDIR = new Path("/hbase");
private static final Path OLDLOGDIR = new Path(HBASEDIR, HConstants.HREGION_OLDLOGDIR_NAME);
private static final Path HBASELOGDIR = new Path("/hbaselog");
private static final Path OLDLOGDIR = new Path(HBASELOGDIR, HConstants.HREGION_OLDLOGDIR_NAME);
// Need to override this setup so we can edit the config before it gets sent
// to the HDFS & HBase cluster startup.
@ -111,6 +113,7 @@ public class TestLogRollAbort {
// disable region rebalancing (interferes with log watching)
cluster.getMaster().balanceSwitch(false);
FSUtils.setRootDir(conf, HBASEDIR);
FSUtils.setWALRootDir(conf, HBASELOGDIR);
}
@After
@ -176,7 +179,7 @@ public class TestLogRollAbort {
public void testLogRollAfterSplitStart() throws IOException {
LOG.info("Verify wal roll after split starts will fail.");
String logName = "testLogRollAfterSplitStart";
Path thisTestsDir = new Path(HBASEDIR, AbstractFSWALProvider.getWALDirectoryName(logName));
Path thisTestsDir = new Path(HBASELOGDIR, AbstractFSWALProvider.getWALDirectoryName(logName));
final WALFactory wals = new WALFactory(conf, null, logName);
try {
@ -218,7 +221,7 @@ public class TestLogRollAbort {
LOG.debug("Renamed region directory: " + rsSplitDir);
LOG.debug("Processing the old log files.");
WALSplitter.split(HBASEDIR, rsSplitDir, OLDLOGDIR, fs, conf, wals);
WALSplitter.split(HBASELOGDIR, rsSplitDir, OLDLOGDIR, fs, conf, wals);
LOG.debug("Trying to roll the WAL.");
try {

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -59,21 +60,27 @@ public class TestWALActionsListener {
new HBaseTestingUtility();
private final static byte[] SOME_BYTES = Bytes.toBytes("t");
private static FileSystem fs;
private static Configuration conf;
private static Path rootDir;
private static Path walRootDir;
private static FileSystem fs;
private static FileSystem logFs;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf = TEST_UTIL.getConfiguration();
conf.setInt("hbase.regionserver.maxlogs", 5);
fs = FileSystem.get(conf);
FSUtils.setRootDir(conf, TEST_UTIL.getDataTestDir());
rootDir = TEST_UTIL.createRootDir();
walRootDir = TEST_UTIL.createWALRootDir();
fs = FSUtils.getRootDirFileSystem(conf);
logFs = FSUtils.getWALFileSystem(conf);
}
@Before
public void setUp() throws Exception {
fs.delete(new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_LOGDIR_NAME), true);
fs.delete(new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME), true);
fs.delete(rootDir, true);
logFs.delete(new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME), true);
logFs.delete(new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME), true);
}
@After

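Worth spelling out about the pair of handles introduced above: once hbase.wal.dir can point at a different filesystem than hbase.rootdir, one FileSystem.get(conf) no longer covers both trees, which is why setUp now deletes through logFs rather than fs. A hedged sketch of the distinction (paths are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.FSUtils;

public class TwoFileSystemsSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FSUtils.setRootDir(conf, new Path("file:///tmp/hbase-root"));   // data root
    FSUtils.setWALRootDir(conf, new Path("file:///tmp/hbase-wal")); // WAL root
    FileSystem rootFs = FSUtils.getRootDirFileSystem(conf); // backs hbase.rootdir
    FileSystem walFs = FSUtils.getWALFileSystem(conf);      // backs hbase.wal.dir
    // Cleanup must go through the matching handle; when the two URIs name
    // different clusters, deleting WAL directories via rootFs hits the wrong one.
    System.out.println(rootFs.getUri() + " vs " + walFs.getUri());
  }
}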
View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -384,6 +385,54 @@ public class TestFSUtils {
verifyFileInDirWithStoragePolicy("1772");
}
@Test
public void testSetWALRootDir() throws Exception {
HBaseTestingUtility htu = new HBaseTestingUtility();
Configuration conf = htu.getConfiguration();
Path p = new Path("file:///hbase/root");
FSUtils.setWALRootDir(conf, p);
assertEquals(p.toString(), conf.get(HFileSystem.HBASE_WAL_DIR));
}
@Test
public void testGetWALRootDir() throws IOException {
HBaseTestingUtility htu = new HBaseTestingUtility();
Configuration conf = htu.getConfiguration();
Path root = new Path("file:///hbase/root");
Path walRoot = new Path("file:///hbase/logroot");
FSUtils.setRootDir(conf, root);
assertEquals(FSUtils.getRootDir(conf), root);
assertEquals(FSUtils.getWALRootDir(conf), root);
FSUtils.setWALRootDir(conf, walRoot);
assertEquals(FSUtils.getWALRootDir(conf), walRoot);
}
@Test(expected=IllegalStateException.class)
public void testGetWALRootDirIllegalWALDir() throws IOException {
HBaseTestingUtility htu = new HBaseTestingUtility();
Configuration conf = htu.getConfiguration();
Path root = new Path("file:///hbase/root");
Path invalidWALDir = new Path("file:///hbase/root/logroot");
FSUtils.setRootDir(conf, root);
FSUtils.setWALRootDir(conf, invalidWALDir);
FSUtils.getWALRootDir(conf);
}
@Test
public void testRemoveWALRootPath() throws Exception {
HBaseTestingUtility htu = new HBaseTestingUtility();
Configuration conf = htu.getConfiguration();
FSUtils.setRootDir(conf, new Path("file:///user/hbase"));
Path testFile = new Path(FSUtils.getRootDir(conf), "test/testfile");
Path tmpFile = new Path("file:///test/testfile");
assertEquals(FSUtils.removeWALRootPath(testFile, conf), "test/testfile");
assertEquals(FSUtils.removeWALRootPath(tmpFile, conf), tmpFile.toString());
FSUtils.setWALRootDir(conf, new Path("file:///user/hbaseLogDir"));
assertEquals(FSUtils.removeWALRootPath(testFile, conf), testFile.toString());
Path logFile = new Path(FSUtils.getWALRootDir(conf), "test/testlog");
assertEquals(FSUtils.removeWALRootPath(logFile, conf), "test/testlog");
}
/**
* Ugly test that ensures we can get at the hedged read counters in dfsclient.
* Does a bit of preading with hedged reads enabled using code taken from hdfs TestPread.

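The four new tests above pin down the contract of the WAL-root helpers. Restated as a compact, runnable sketch (values mirror the tests; the commented lines show the failure mode):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.FSUtils;

public class WALRootDirContractSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FSUtils.setRootDir(conf, new Path("file:///hbase/root"));
    // With hbase.wal.dir unset, the WAL root falls back to the data root.
    System.out.println(FSUtils.getWALRootDir(conf));  // file:///hbase/root
    FSUtils.setWALRootDir(conf, new Path("file:///hbase/logroot"));
    System.out.println(FSUtils.getWALRootDir(conf));  // file:///hbase/logroot
    // removeWALRootPath strips only its own prefix; unrelated paths come back whole.
    System.out.println(FSUtils.removeWALRootPath(
        new Path("file:///hbase/logroot/test/testlog"), conf)); // test/testlog
    // A WAL dir nested under the root dir is rejected; after
    //   FSUtils.setWALRootDir(conf, new Path("file:///hbase/root/logroot"));
    // the next getWALRootDir call throws IllegalStateException.
  }
}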
View File

@ -100,7 +100,7 @@ public class IOTestProvider implements WALProvider {
providerId = DEFAULT_PROVIDER_ID;
}
final String logPrefix = factory.factoryId + WAL_FILE_NAME_DELIMITER + providerId;
log = new IOTestWAL(FileSystem.get(conf), FSUtils.getRootDir(conf),
log = new IOTestWAL(FSUtils.getWALFileSystem(conf), FSUtils.getWALRootDir(conf),
AbstractFSWALProvider.getWALDirectoryName(factory.factoryId),
HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners, true, logPrefix,
META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);

View File

@ -83,6 +83,7 @@ public class TestWALFactory {
private static MiniDFSCluster cluster;
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
protected static Path hbaseDir;
protected static Path hbaseWALDir;
protected FileSystem fs;
protected Path dir;
@ -141,6 +142,7 @@ public class TestWALFactory {
cluster = TEST_UTIL.getDFSCluster();
hbaseDir = TEST_UTIL.createRootDir();
hbaseWALDir = TEST_UTIL.createWALRootDir();
}
@AfterClass
@ -163,12 +165,12 @@ public class TestWALFactory {
final TableName tableName = TableName.valueOf(currentTest.getMethodName());
final byte [] rowName = tableName.getName();
final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
final Path logdir = new Path(hbaseDir,
final Path logdir = new Path(hbaseWALDir,
AbstractFSWALProvider.getWALDirectoryName(currentTest.getMethodName()));
Path oldLogDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
Path oldLogDir = new Path(hbaseWALDir, HConstants.HREGION_OLDLOGDIR_NAME);
final int howmany = 3;
HRegionInfo[] infos = new HRegionInfo[3];
Path tabledir = FSUtils.getTableDir(hbaseDir, tableName);
Path tabledir = FSUtils.getTableDir(hbaseWALDir, tableName);
fs.mkdirs(tabledir);
for(int i = 0; i < howmany; i++) {
infos[i] = new HRegionInfo(tableName,
@ -207,7 +209,7 @@ public class TestWALFactory {
}
}
wals.shutdown();
List<Path> splits = WALSplitter.split(hbaseDir, logdir, oldLogDir, fs, conf, wals);
List<Path> splits = WALSplitter.split(hbaseWALDir, logdir, oldLogDir, fs, conf, wals);
verifySplits(splits, howmany);
}

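Log splitting follows the same resolution rule, which is why the split call above now takes hbaseWALDir as its base: per-server WAL directories and the oldWALs archive both hang off the WAL root. A sketch of the shape, using the WALSplitter.split signature exercised in this test (the server name is made up):

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALSplitter;

public class WALSplitSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Path walRoot = FSUtils.getWALRootDir(conf);
    FileSystem walFs = FSUtils.getWALFileSystem(conf);
    WALFactory wals = new WALFactory(conf, null, "split-sketch");
    // Both the dead server's WAL dir and the archive resolve against the WAL root.
    Path serverWALDir = new Path(walRoot,
        AbstractFSWALProvider.getWALDirectoryName("rs1.example.com,16020,1"));
    Path oldLogDir = new Path(walRoot, HConstants.HREGION_OLDLOGDIR_NAME);
    List<Path> recovered = WALSplitter.split(walRoot, serverWALDir, oldLogDir,
        walFs, conf, wals);
    System.out.println("Recovered-edits outputs: " + recovered.size());
  }
}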
View File

@ -0,0 +1,148 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.wal;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
@Category(MediumTests.class)
public class TestWALRootDir {
private static final Log LOG = LogFactory.getLog(TestWALRootDir.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static Configuration conf;
private static FileSystem fs;
private static FileSystem walFs;
static final TableName tableName = TableName.valueOf("TestWALWALDir");
private static final byte [] rowName = Bytes.toBytes("row");
private static final byte [] family = Bytes.toBytes("column");
private static HTableDescriptor htd;
private static Path walRootDir;
private static Path rootDir;
private static WALFactory wals;
@Before
public void setUp() throws Exception {
cleanup();
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf = TEST_UTIL.getConfiguration();
TEST_UTIL.startMiniDFSCluster(1);
rootDir = TEST_UTIL.createRootDir();
walRootDir = TEST_UTIL.createWALRootDir();
fs = FSUtils.getRootDirFileSystem(conf);
walFs = FSUtils.getWALFileSystem(conf);
htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor(family));
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
cleanup();
TEST_UTIL.shutdownMiniDFSCluster();
}
@Test
public void testWALRootDir() throws Exception {
HRegionInfo regionInfo = new HRegionInfo(tableName);
wals = new WALFactory(conf, null, "testWALRootDir");
WAL log = wals.getWAL(regionInfo.getEncodedNameAsBytes(), regionInfo.getTable().getNamespace());
assertEquals(1, getWALFiles(walFs, walRootDir).size());
byte [] value = Bytes.toBytes("value");
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
System.currentTimeMillis(), value));
long txid = log.append(regionInfo, getWalKey(System.currentTimeMillis(), regionInfo, 0), edit, true);
log.sync(txid);
assertEquals("Expect 1 log have been created", 1, getWALFiles(walFs, walRootDir).size());
log.rollWriter();
// The roll above should have created 1 more WAL file
assertEquals(2, getWALFiles(walFs, new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME)).size());
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
System.currentTimeMillis(), value));
txid = log.append(regionInfo, getWalKey(System.currentTimeMillis(), regionInfo, 1), edit, true);
log.sync(txid);
log.rollWriter();
log.shutdown();
assertEquals("Expect 3 logs in WALs dir", 3, getWALFiles(walFs, new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME)).size());
}
protected WALKey getWalKey(final long time, HRegionInfo hri, final long startPoint) {
return new WALKey(hri.getEncodedNameAsBytes(), tableName, time, new MultiVersionConcurrencyControl(startPoint));
}
private List<FileStatus> getWALFiles(FileSystem fs, Path dir)
throws IOException {
List<FileStatus> result = new ArrayList<FileStatus>();
LOG.debug("Scanning " + dir.toString() + " for WAL files");
FileStatus[] files = fs.listStatus(dir);
if (files == null) return Collections.emptyList();
for (FileStatus file : files) {
if (file.isDirectory()) {
// recurse into sub directories
result.addAll(getWALFiles(fs, file.getPath()));
} else {
// Skip hidden files; check the simple file name, since the fully
// qualified path string never starts with '.'.
String name = file.getPath().getName();
if (!name.startsWith(".")) {
result.add(file);
}
}
}
return result;
}
private static void cleanup() throws Exception {
walFs.delete(walRootDir, true);
fs.delete(rootDir, true);
}
}

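TestWALRootDir above is the end-to-end check: appends and rolls land every WAL file under walRootDir while rootDir stays WAL-free. For spot-checking that separation by hand, a small hypothetical helper (not in the patch) built from the same calls the test uses:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.FSUtils;

public class ListWALsSketch {
  // Hypothetical helper: print every entry under <WAL root>/WALs.
  static void printWALs(Configuration conf) throws IOException {
    FileSystem walFs = FSUtils.getWALFileSystem(conf);
    Path walDir = new Path(FSUtils.getWALRootDir(conf), HConstants.HREGION_LOGDIR_NAME);
    for (FileStatus status : walFs.listStatus(walDir)) {
      System.out.println(status.getPath());
    }
  }
}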
View File

@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufLogReader;
import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
@ -119,6 +120,7 @@ public class TestWALSplit {
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private Path HBASEDIR;
private Path HBASELOGDIR;
private Path WALDIR;
private Path OLDLOGDIR;
private Path CORRUPTDIR;
@ -181,8 +183,9 @@ public class TestWALSplit {
LOG.info("Cleaning up cluster for new test.");
fs = TEST_UTIL.getDFSCluster().getFileSystem();
HBASEDIR = TEST_UTIL.createRootDir();
OLDLOGDIR = new Path(HBASEDIR, HConstants.HREGION_OLDLOGDIR_NAME);
CORRUPTDIR = new Path(HBASEDIR, HConstants.CORRUPT_DIR_NAME);
HBASELOGDIR = TEST_UTIL.createWALRootDir();
OLDLOGDIR = new Path(HBASELOGDIR, HConstants.HREGION_OLDLOGDIR_NAME);
CORRUPTDIR = new Path(HBASELOGDIR, HConstants.CORRUPT_DIR_NAME);
TABLEDIR = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
REGIONS.clear();
Collections.addAll(REGIONS, "bbb", "ccc");
@ -190,7 +193,7 @@ public class TestWALSplit {
this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
wals = new WALFactory(conf, null, name.getMethodName());
WALDIR = new Path(HBASEDIR, AbstractFSWALProvider.getWALDirectoryName(name.getMethodName()));
WALDIR = new Path(HBASELOGDIR, AbstractFSWALProvider.getWALDirectoryName(name.getMethodName()));
//fs.mkdirs(WALDIR);
}
@ -206,6 +209,7 @@ public class TestWALSplit {
} finally {
wals = null;
fs.delete(HBASEDIR, true);
fs.delete(HBASELOGDIR, true);
}
}
@ -1114,7 +1118,7 @@ public class TestWALSplit {
useDifferentDFSClient();
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
final Path corruptDir = new Path(FSUtils.getRootDir(conf), HConstants.CORRUPT_DIR_NAME);
final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
assertEquals(1, fs.listStatus(corruptDir).length);
}
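One operational note on this last hunk: WALs that fail to split are quarantined relative to the WAL root, not the data root, so tooling that looks for the corrupt directory should resolve it the same way. A minimal sketch:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.FSUtils;

public class CorruptDirSketch {
  // The corrupt-WAL quarantine directory now hangs off the WAL root.
  static Path corruptDir(Configuration conf) throws IOException {
    return new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
  }
}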