diff --git a/CHANGES.txt b/CHANGES.txt
index 291671e0d8f..522951b4cc1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -81,6 +81,7 @@ Trunk (unreleased changes)
HADOOP-1884 Remove useless debugging log messages from hbase.mapred
HADOOP-1856 Add Jar command to hbase shell using Hadoop RunJar util
(Edward Yoon via Stack)
+ HADOOP-1928 ] Have master pass the regionserver the filesystem to use
Below are the list of changes before 2007-08-18
diff --git a/build.xml b/build.xml
index a508f6dd9ff..c7890fd7184 100644
--- a/build.xml
+++ b/build.xml
@@ -94,6 +94,7 @@ to call at top-level: ant deploy-contrib compile-core-test
+
diff --git a/src/java/org/apache/hadoop/hbase/HMaster.java b/src/java/org/apache/hadoop/hbase/HMaster.java
index ec94df1aedd..dfbae9f5ffb 100644
--- a/src/java/org/apache/hadoop/hbase/HMaster.java
+++ b/src/java/org/apache/hadoop/hbase/HMaster.java
@@ -937,12 +937,11 @@ HMasterRegionInterface {
/**
* Checks to see if the file system is still accessible.
* If not, sets closed
- *
* @return false if file system is not available
*/
protected boolean checkFileSystem() {
if (fsOk) {
- if (!FSUtils.isFileSystemAvailable(fs)) {
+ if (!FSUtils.isFileSystemAvailable(fs, closed)) {
LOG.fatal("Shutting down HBase cluster: file system not available");
closed.set(true);
fsOk = false;
@@ -1127,9 +1126,9 @@ HMasterRegionInterface {
* HMasterRegionInterface
*/
- /** {@inheritDoc} */
@SuppressWarnings("unused")
- public void regionServerStartup(HServerInfo serverInfo) throws IOException {
+ public MapWritable regionServerStartup(HServerInfo serverInfo)
+ throws IOException {
String s = serverInfo.getServerAddress().toString().trim();
HServerInfo storedInfo = null;
LOG.info("received start message from: " + s);
@@ -1137,11 +1136,9 @@ HMasterRegionInterface {
// If we get the startup message but there's an old server by that
// name, then we can timeout the old one right away and register
// the new one.
-
synchronized (serversToServerInfo) {
storedInfo = serversToServerInfo.remove(s);
HServerLoad load = serversToLoad.remove(s);
-
if (load != null) {
Set servers = loadToServers.get(load);
if (servers != null) {
@@ -1160,7 +1157,6 @@ HMasterRegionInterface {
}
// Either way, record the new server
-
synchronized (serversToServerInfo) {
HServerLoad load = new HServerLoad();
serverInfo.setLoad(load);
@@ -1178,6 +1174,22 @@ HMasterRegionInterface {
long serverLabel = getServerLabel(s);
serverLeases.createLease(serverLabel, serverLabel, new ServerExpirer(s));
}
+
+ return createConfigurationSubset();
+ }
+
+ /**
+ * @return Subset of configuration to pass initializing regionservers: e.g.
+ * the filesystem to use and root directory to use.
+ */
+ protected MapWritable createConfigurationSubset() {
+ MapWritable mw = addConfig(new MapWritable(), HConstants.HBASE_DIR);
+ return addConfig(mw, "fs.default.name");
+ }
+
+ private MapWritable addConfig(final MapWritable mw, final String key) {
+ mw.put(new Text(key), new Text(this.conf.get(key)));
+ return mw;
}
private long getServerLabel(final String s) {
diff --git a/src/java/org/apache/hadoop/hbase/HMasterRegionInterface.java b/src/java/org/apache/hadoop/hbase/HMasterRegionInterface.java
index 1ba392d8a2a..126e8bed356 100644
--- a/src/java/org/apache/hadoop/hbase/HMasterRegionInterface.java
+++ b/src/java/org/apache/hadoop/hbase/HMasterRegionInterface.java
@@ -20,6 +20,8 @@
package org.apache.hadoop.hbase;
import java.io.IOException;
+
+import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.ipc.VersionedProtocol;
/**
@@ -34,8 +36,10 @@ public interface HMasterRegionInterface extends VersionedProtocol {
* Called when a region server first starts
* @param info
* @throws IOException
+ * @return Configuration for the regionserver to use: e.g. filesystem,
+ * hbase rootdir, etc.
*/
- public void regionServerStartup(HServerInfo info) throws IOException;
+ public MapWritable regionServerStartup(HServerInfo info) throws IOException;
/**
* Called to renew lease, tell master what the region server is doing and to
diff --git a/src/java/org/apache/hadoop/hbase/HRegionServer.java b/src/java/org/apache/hadoop/hbase/HRegionServer.java
index 98010344296..bcf6d75dc59 100644
--- a/src/java/org/apache/hadoop/hbase/HRegionServer.java
+++ b/src/java/org/apache/hadoop/hbase/HRegionServer.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -55,6 +56,7 @@ import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.DNS;
@@ -67,22 +69,12 @@ import org.apache.hadoop.util.StringUtils;
public class HRegionServer implements HConstants, HRegionInterface, Runnable {
static final Log LOG = LogFactory.getLog(HRegionServer.class);
- /** {@inheritDoc} */
- public long getProtocolVersion(final String protocol,
- @SuppressWarnings("unused") final long clientVersion)
- throws IOException {
- if (protocol.equals(HRegionInterface.class.getName())) {
- return HRegionInterface.versionID;
- }
- throw new IOException("Unknown protocol to name node: " + protocol);
- }
-
// Set when a report to the master comes back with a message asking us to
// shutdown. Also set by call to stop when debugging or running unit tests
// of HRegionServer in isolation. We use AtomicBoolean rather than
// plain boolean so we can pass a reference to Chore threads. Otherwise,
// Chore threads need to know about the hosting class.
- protected AtomicBoolean stopRequested = new AtomicBoolean(false);
+ protected final AtomicBoolean stopRequested = new AtomicBoolean(false);
// Go down hard. Used if file system becomes unavailable and also in
// debugging and unit tests.
@@ -91,38 +83,35 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
// If false, the file system has become unavailable
protected volatile boolean fsOk;
- final Path rootDir;
protected final HServerInfo serverInfo;
protected final Configuration conf;
- private final Random rand;
+ private final Random rand = new Random();
// region name -> HRegion
- protected final SortedMap onlineRegions;
+ protected final SortedMap onlineRegions =
+ Collections.synchronizedSortedMap(new TreeMap());
protected final Map retiringRegions =
new HashMap();
protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- private final Vector outboundMsgs;
+ private final Vector outboundMsgs = new Vector();
int numRetries;
protected final int threadWakeFrequency;
private final int msgInterval;
-
- // File paths
- private FileSystem fs;
// Remote HMaster
- private HMasterRegionInterface hbaseMaster;
+ private final HMasterRegionInterface hbaseMaster;
// Server to handle client requests. Default access so can be accessed by
// unit tests.
- Server server;
+ final Server server;
// Leases
- private Leases leases;
+ private final Leases leases;
// Request counter
- private AtomicInteger requestCount;
+ private final AtomicInteger requestCount = new AtomicInteger();
// A sleeper that sleeps for msgInterval.
private final Sleeper sleeper;
@@ -134,7 +123,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
// interrupted.
protected final Integer splitOrCompactLock = new Integer(0);
- /**
+ /*
* Runs periodically to determine if regions need to be compacted or split
*/
class SplitOrCompactChecker extends Chore
@@ -150,7 +139,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
30 * 1000), stop);
}
- /** {@inheritDoc} */
public void closing(final Text regionName) {
lock.writeLock().lock();
try {
@@ -166,7 +154,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
}
}
- /** {@inheritDoc} */
public void closed(final Text regionName) {
lock.writeLock().lock();
try {
@@ -290,7 +277,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
super(period, stop);
}
- /** {@inheritDoc} */
@Override
protected void chore() {
synchronized(cacheFlusherLock) {
@@ -326,8 +312,9 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
}
}
- // HLog and HLog roller.
- protected final HLog log;
+ // HLog and HLog roller. log is protected rather than private to avoid
+ // eclipse warning when accessed by inner classes
+ protected HLog log;
private final Thread logRollerThread;
protected final Integer logRollerLock = new Integer(0);
@@ -375,32 +362,21 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
* @throws IOException
*/
public HRegionServer(Configuration conf) throws IOException {
- this(new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR)),
- new HServerAddress(conf.get(REGIONSERVER_ADDRESS,
- DEFAULT_REGIONSERVER_ADDRESS)),
- conf);
+ this(new HServerAddress(conf.get(REGIONSERVER_ADDRESS,
+ DEFAULT_REGIONSERVER_ADDRESS)), conf);
}
/**
* Starts a HRegionServer at the specified location
- * @param rootDir
* @param address
* @param conf
* @throws IOException
*/
- public HRegionServer(Path rootDir, HServerAddress address,
- Configuration conf)
+ public HRegionServer(HServerAddress address, Configuration conf)
throws IOException {
this.abortRequested = false;
this.fsOk = true;
- this.rootDir = rootDir;
this.conf = conf;
- this.rand = new Random();
- this.onlineRegions =
- Collections.synchronizedSortedMap(new TreeMap());
-
- this.outboundMsgs = new Vector();
- this.requestCount = new AtomicInteger();
// Config'ed params
this.numRetries = conf.getInt("hbase.client.retries.number", 2);
@@ -416,112 +392,26 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
new SplitOrCompactChecker(this.stopRequested);
// Task thread to process requests from Master
- this.toDo = new LinkedBlockingQueue();
this.worker = new Worker();
this.workerThread = new Thread(worker);
this.sleeper = new Sleeper(this.msgInterval, this.stopRequested);
-
- try {
- // Server to handle client requests
- this.server = RPC.getServer(this, address.getBindAddress(),
- address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
- false, conf);
-
- // Use interface to get the 'real' IP for this host.
- // 'serverInfo' is sent to master. Should have the real IP of this host
- // rather than 'localhost' or 0.0.0.0 or 127.0.0.1 in it.
- String realIP = DNS.getDefaultIP(
- conf.get("dfs.datanode.dns.interface","default"));
- this.serverInfo = new HServerInfo(new HServerAddress(
- new InetSocketAddress(realIP, server.getListenerAddress().getPort())),
- this.rand.nextLong());
- Path logdir = new Path(rootDir, "log" + "_" + realIP + "_" +
- this.serverInfo.getServerAddress().getPort());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Log dir " + logdir);
- }
-
- // Logging
- this.fs = FileSystem.get(conf);
- if(fs.exists(logdir)) {
- throw new RegionServerRunningException("region server already " +
- "running at " + this.serverInfo.getServerAddress().toString() +
- " because logdir " + logdir.toString() + " exists");
- }
-
- this.log = new HLog(fs, logdir, conf);
- this.logRollerThread =
- new LogRoller(this.threadWakeFrequency, stopRequested);
-
- // Remote HMaster
- this.hbaseMaster = (HMasterRegionInterface)RPC.waitForProxy(
- HMasterRegionInterface.class, HMasterRegionInterface.versionID,
- new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(),
- conf);
- } catch (IOException e) {
- this.stopRequested.set(true);
- throw RemoteExceptionHandler.checkIOException(e);
- }
- }
-
- /** @return the HLog */
- HLog getLog() {
- return log;
- }
-
- /**
- * Sets a flag that will cause all the HRegionServer threads to shut down
- * in an orderly fashion. Used by unit tests and called by {@link Flusher}
- * if it judges server needs to be restarted.
- */
- synchronized void stop() {
- this.stopRequested.set(true);
- notifyAll(); // Wakes run() if it is sleeping
- }
-
- /**
- * Cause the server to exit without closing the regions it is serving, the
- * log it is using and without notifying the master.
- * Used unit testing and on catastrophic events such as HDFS is yanked out
- * from under hbase or we OOME.
- */
- synchronized void abort() {
- this.abortRequested = true;
- stop();
- }
-
- /**
- * Wait on all threads to finish.
- * Presumption is that all closes and stops have already been called.
- */
- void join() {
- try {
- this.workerThread.join();
- } catch(InterruptedException iex) {
- // continue
- }
- try {
- this.logRollerThread.join();
- } catch(InterruptedException iex) {
- // continue
- }
- try {
- this.cacheFlusherThread.join();
- } catch(InterruptedException iex) {
- // continue
- }
- try {
- this.splitOrCompactCheckerThread.join();
- } catch(InterruptedException iex) {
- // continue
- }
- try {
- this.server.join();
- } catch(InterruptedException iex) {
- // continue
- }
- LOG.info("HRegionServer stopped at: " +
- serverInfo.getServerAddress().toString());
+ this.logRollerThread =
+ new LogRoller(this.threadWakeFrequency, stopRequested);
+ // Server to handle client requests
+ this.server = RPC.getServer(this, address.getBindAddress(),
+ address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
+ false, conf);
+ this.serverInfo = new HServerInfo(new HServerAddress(
+ new InetSocketAddress(getThisIP(),
+ this.server.getListenerAddress().getPort())), this.rand.nextLong());
+ this.leases = new Leases(
+ conf.getInt("hbase.regionserver.lease.period", 3 * 60 * 1000),
+ this.threadWakeFrequency);
+ // Remote HMaster
+ this.hbaseMaster = (HMasterRegionInterface)RPC.waitForProxy(
+ HMasterRegionInterface.class, HMasterRegionInterface.versionID,
+ new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(),
+ conf);
}
/**
@@ -530,21 +420,13 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
* load/unload instructions.
*/
public void run() {
- startAllServices();
-
// Set below if HMaster asked us stop.
boolean masterRequestedStop = false;
try {
+ init(reportForDuty());
while(!stopRequested.get()) {
long lastMsg = 0;
- try {
- reportForDuty();
- } catch(IOException e) {
- this.sleeper.sleep(lastMsg);
- continue;
- }
-
// Now ask master what it wants us to do and tell it what we have done
for (int tries = 0; !stopRequested.get();) {
if ((System.currentTimeMillis() - lastMsg) >= msgInterval) {
@@ -630,7 +512,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
LOG.fatal("Unhandled exception. Aborting...", t);
abort();
}
- leases.closeAfterLeasesExpire();
+ this.leases.closeAfterLeasesExpire();
this.worker.stop();
this.server.stop();
@@ -691,10 +573,52 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
serverInfo.getServerAddress().toString());
}
- join();
+ join();
LOG.info(Thread.currentThread().getName() + " exiting");
}
-
+
+ /*
+ * Run init. Sets up hlog and starts up all server threads.
+ * @param c Extra configuration.
+ */
+ private void init(final MapWritable c) {
+ try {
+ for (Map.Entry e: c.entrySet()) {
+ String key = e.getKey().toString();
+ String value = e.getValue().toString();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Config from master: " + key + "=" + value);
+ }
+ this.conf.set(key, value);
+ }
+ this.log = setupHLog();
+ startServiceThreads();
+ } catch (IOException e) {
+ this.stopRequested.set(true);
+ LOG.fatal("Failed init",
+ RemoteExceptionHandler.checkIOException(e));
+ }
+ }
+
+ private HLog setupHLog()
+ throws RegionServerRunningException, IOException {
+ String rootDir = this.conf.get(HConstants.HBASE_DIR);
+ LOG.info("Root dir: " + rootDir);
+ Path logdir = new Path(new Path(rootDir),
+ "log" + "_" + getThisIP() + "_" +
+ this.serverInfo.getServerAddress().getPort());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Log dir " + logdir);
+ }
+ FileSystem fs = FileSystem.get(this.conf);
+ if (fs.exists(logdir)) {
+ throw new RegionServerRunningException("region server already " +
+ "running at " + this.serverInfo.getServerAddress().toString() +
+ " because logdir " + logdir.toString() + " exists");
+ }
+ return new HLog(fs, logdir, conf);
+ }
+
/*
* Start Chore Threads, Server, Worker and lease checker threads. Install an
* UncaughtExceptionHandler that calls abort of RegionServer if we get
@@ -707,7 +631,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
* Chore, it keeps its own internal stop mechanism so needs to be stopped
* by this hosting server. Worker logs the exception and exits.
*/
- private void startAllServices() {
+ private void startServiceThreads() throws IOException {
String n = Thread.currentThread().getName();
UncaughtExceptionHandler handler = new UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
@@ -728,41 +652,105 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
Threads.setDaemonThreadRunning(this.workerThread, n + ".worker", handler);
// Leases is not a Thread. Internally it runs a daemon thread. If it gets
// an unhandled exception, it will just exit.
- this.leases = new Leases(
- conf.getInt("hbase.regionserver.lease.period", 3 * 60 * 1000),
- this.threadWakeFrequency);
this.leases.setName(n + ".leaseChecker");
this.leases.start();
// Start Server. This service is like leases in that it internally runs
// a thread.
- try {
- this.server.start();
- LOG.info("HRegionServer started at: " +
+ this.server.start();
+ LOG.info("HRegionServer started at: " +
serverInfo.getServerAddress().toString());
- } catch(IOException e) {
- this.stopRequested.set(true);
- LOG.fatal("Failed start Server",
- RemoteExceptionHandler.checkIOException(e));
+ }
+
+ /** @return the HLog */
+ HLog getLog() {
+ return this.log;
+ }
+
+ /*
+ * Use interface to get the 'real' IP for this host. 'serverInfo' is sent to
+ * master. Should have the real IP of this host rather than 'localhost' or
+ * 0.0.0.0 or 127.0.0.1 in it.
+ * @return This servers' IP.
+ */
+ private String getThisIP() throws UnknownHostException {
+ return DNS.getDefaultIP(conf.get("dfs.datanode.dns.interface","default"));
+ }
+
+ /**
+ * Sets a flag that will cause all the HRegionServer threads to shut down
+ * in an orderly fashion. Used by unit tests and called by {@link Flusher}
+ * if it judges server needs to be restarted.
+ */
+ synchronized void stop() {
+ this.stopRequested.set(true);
+ notifyAll(); // Wakes run() if it is sleeping
+ }
+
+ /**
+ * Cause the server to exit without closing the regions it is serving, the
+ * log it is using and without notifying the master.
+ * Used unit testing and on catastrophic events such as HDFS is yanked out
+ * from under hbase or we OOME.
+ */
+ synchronized void abort() {
+ this.abortRequested = true;
+ stop();
+ }
+
+ /**
+ * Wait on all threads to finish.
+ * Presumption is that all closes and stops have already been called.
+ */
+ void join() {
+ join(this.workerThread);
+ join(this.logRollerThread);
+ join(this.cacheFlusherThread);
+ join(this.splitOrCompactCheckerThread);
+ try {
+ this.server.join();
+ } catch (InterruptedException e) {
+ // No means of asking server if its done... .so just assume it is even
+ // if an interrupt.
+ }
+ }
+
+ private void join(final Thread t) {
+ while (t.isAlive()) {
+ try {
+ t.join();
+ } catch (InterruptedException e) {
+ // continue
+ }
}
}
/*
* Let the master know we're here
- * @throws IOException
+ * Run initialization using parameters passed us by the master.
*/
- private void reportForDuty() throws IOException {
+ private MapWritable reportForDuty() {
if (LOG.isDebugEnabled()) {
LOG.debug("Telling master we are up");
}
- this.requestCount.set(0);
- this.serverInfo.setLoad(new HServerLoad(0, onlineRegions.size()));
- this.hbaseMaster.regionServerStartup(serverInfo);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Done telling master we are up");
+ MapWritable result = null;
+ while(!stopRequested.get()) {
+ long lastMsg = 0;
+ try {
+ this.requestCount.set(0);
+ this.serverInfo.setLoad(new HServerLoad(0, onlineRegions.size()));
+ result = this.hbaseMaster.regionServerStartup(serverInfo);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Done telling master we are up");
+ }
+ break;
+ } catch(IOException e) {
+ this.sleeper.sleep(lastMsg);
+ continue;
+ }
}
+ return result;
}
-
/** Add to the outbound message buffer */
private void reportOpen(HRegion region) {
synchronized(outboundMsgs) {
@@ -808,7 +796,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
this.msg = msg;
}
}
- BlockingQueue toDo;
+ BlockingQueue toDo = new LinkedBlockingQueue();
private Worker worker;
private Thread workerThread;
@@ -886,7 +874,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
void openRegion(HRegionInfo regionInfo) throws IOException {
HRegion region = onlineRegions.get(regionInfo.regionName);
if(region == null) {
- region = new HRegion(rootDir, log, fs, conf, regionInfo, null);
+ region = new HRegion(new Path(this.conf.get(HConstants.HBASE_DIR)),
+ this.log, FileSystem.get(conf), conf, regionInfo, null);
this.lock.writeLock().lock();
try {
this.log.setSequenceNumber(region.getMaxSequenceId());
@@ -1275,7 +1264,13 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
*/
protected boolean checkFileSystem() {
if (this.fsOk) {
- if (!FSUtils.isFileSystemAvailable(fs)) {
+ FileSystem fs = null;
+ try {
+ fs = FileSystem.get(this.conf);
+ } catch (IOException e) {
+ LOG.error("Failed get of filesystem", e);
+ }
+ if (fs != null && !FSUtils.isFileSystemAvailable(fs, stopRequested)) {
LOG.fatal("Shutting down HRegionServer: file system not available");
this.abortRequested = true;
this.stopRequested.set(true);
@@ -1308,6 +1303,15 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
return regionsToCheck;
}
+ public long getProtocolVersion(final String protocol,
+ @SuppressWarnings("unused") final long clientVersion)
+ throws IOException {
+ if (protocol.equals(HRegionInterface.class.getName())) {
+ return HRegionInterface.versionID;
+ }
+ throw new IOException("Unknown protocol to name node: " + protocol);
+ }
+
//
// Main program and support routines
//
diff --git a/src/java/org/apache/hadoop/hbase/util/FSUtils.java b/src/java/org/apache/hadoop/hbase/util/FSUtils.java
index 39814059447..9b82514ce2b 100644
--- a/src/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/src/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -20,11 +20,13 @@
package org.apache.hadoop.hbase.util;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.dfs.DistributedFileSystem;
/**
@@ -36,23 +38,28 @@ public class FSUtils {
/**
* Not instantiable
*/
- private FSUtils() {}
+ private FSUtils() {super();}
/**
* Checks to see if the specified file system is available
*
* @param fs
+ * @param closed Optional flag. If non-null and set, will abort test of
+ * filesytem. Presumption is a flag shared by multiple threads. Another
+ * may have already determined the filesystem -- or something else -- bad.
* @return true if the specified file system is available.
*/
- public static boolean isFileSystemAvailable(FileSystem fs) {
+ public static boolean isFileSystemAvailable(final FileSystem fs,
+ final AtomicBoolean closed) {
if (!(fs instanceof DistributedFileSystem)) {
return true;
}
boolean available = false;
DistributedFileSystem dfs = (DistributedFileSystem) fs;
int maxTries = dfs.getConf().getInt("hbase.client.retries.number", 3);
- Path root = new Path(dfs.getConf().get("hbase.dir", "/"));
- for (int i = 0; i < maxTries; i++) {
+ Path root =
+ fs.makeQualified(new Path(dfs.getConf().get(HConstants.HBASE_DIR, "/")));
+ for (int i = 0; i < maxTries && (closed == null || !closed.get()); i++) {
IOException ex = null;
try {
if (dfs.exists(root)) {
@@ -62,12 +69,10 @@ public class FSUtils {
} catch (IOException e) {
ex = e;
}
- String exception = "";
- if (ex != null) {
- exception = ": " + ex.getMessage();
- }
- LOG.info("Failed exists test on " + root + " (Attempt " + i + ")" +
- exception);
+ String exception = (ex == null)? "": ": " + ex.getMessage();
+ LOG.info("Failed exists test on " + root + " by thread " +
+ Thread.currentThread().getName() + " (Attempt " + i + " of " +
+ maxTries +"): " + exception);
}
try {
if (!available) {
diff --git a/src/test/org/apache/hadoop/hbase/MultiRegionTable.java b/src/test/org/apache/hadoop/hbase/MultiRegionTable.java
index 589200a3bbd..a1962d79c79 100644
--- a/src/test/org/apache/hadoop/hbase/MultiRegionTable.java
+++ b/src/test/org/apache/hadoop/hbase/MultiRegionTable.java
@@ -63,10 +63,10 @@ public class MultiRegionTable extends HBaseTestCase {
HConstants.DEFAULT_MAX_FILE_SIZE) <= 1024 * 1024);
final int retries = 10;
- Path d = cluster.regionThreads.get(0).getRegionServer().rootDir;
FileSystem fs = (cluster.getDFSCluster() == null) ?
localFs : cluster.getDFSCluster().getFileSystem();
assertNotNull(fs);
+ Path d = fs.makeQualified(new Path(conf.get(HConstants.HBASE_DIR)));
// Get connection on the meta table and get count of rows.
diff --git a/src/test/org/apache/hadoop/hbase/OOMERegionServer.java b/src/test/org/apache/hadoop/hbase/OOMERegionServer.java
index 4700c8cfec2..a8832579f42 100644
--- a/src/test/org/apache/hadoop/hbase/OOMERegionServer.java
+++ b/src/test/org/apache/hadoop/hbase/OOMERegionServer.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.io.Text;
@@ -42,9 +41,9 @@ public class OOMERegionServer extends HRegionServer {
super(conf);
}
- public OOMERegionServer(Path rootDir, HServerAddress address,
- Configuration conf) throws IOException {
- super(rootDir, address, conf);
+ public OOMERegionServer(HServerAddress address, Configuration conf)
+ throws IOException {
+ super(address, conf);
}
@Override
diff --git a/src/test/org/apache/hadoop/hbase/TestDFSAbort.java b/src/test/org/apache/hadoop/hbase/TestDFSAbort.java
index 47bb56624e1..7f73ecaf739 100644
--- a/src/test/org/apache/hadoop/hbase/TestDFSAbort.java
+++ b/src/test/org/apache/hadoop/hbase/TestDFSAbort.java
@@ -33,8 +33,6 @@ public class TestDFSAbort extends HBaseClusterTestCase {
/** constructor */
public TestDFSAbort() {
super();
-// conf.setInt("ipc.client.timeout", 5000); // reduce ipc client timeout
-// conf.setInt("ipc.client.connect.max.retries", 5); // and number of retries
Logger.getRootLogger().setLevel(Level.WARN);
Logger.getLogger(this.getClass().getPackage().getName()).setLevel(Level.DEBUG);
}
@@ -66,4 +64,4 @@ public class TestDFSAbort extends HBaseClusterTestCase {
public static void main(@SuppressWarnings("unused") String[] args) {
TestRunner.run(new TestSuite(TestDFSAbort.class));
}
-}
+}
\ No newline at end of file