HADOOP-1928 Have master pass the regionserver the filesystem to use
git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@580166 13f79535-47bb-0310-9956-ffa450edef68
parent ccf42acf70
commit 7acd7d074a
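The change in a nutshell: the master's regionServerStartup() now returns a MapWritable carrying the subset of configuration a regionserver must agree on (HConstants.HBASE_DIR and fs.default.name), and the regionserver overlays those values on its local Configuration in init() before it resolves its FileSystem and creates its HLog. Below is a minimal, self-contained sketch of that handshake, not the actual HBase classes; the class name ConfigHandoffSketch and the literal key "hbase.rootdir" (assumed here to be the value behind HConstants.HBASE_DIR) are illustrative only.

import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class ConfigHandoffSketch {
  // Master side: pack only the keys the regionserver must share with the master.
  static MapWritable createConfigurationSubset(Configuration conf) {
    MapWritable mw = new MapWritable();
    for (String key : new String[] {"hbase.rootdir", "fs.default.name"}) {
      String value = conf.get(key);
      if (value != null) {
        mw.put(new Text(key), new Text(value));
      }
    }
    return mw;
  }

  // Regionserver side: overlay the master's values on the local Configuration,
  // then resolve the filesystem from the now-authoritative settings.
  static FileSystem applyStartupConfig(Configuration conf, MapWritable fromMaster)
  throws IOException {
    for (Map.Entry<Writable, Writable> e : fromMaster.entrySet()) {
      conf.set(e.getKey().toString(), e.getValue().toString());
    }
    return FileSystem.get(conf);
  }
}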
@@ -81,6 +81,7 @@ Trunk (unreleased changes)
    HADOOP-1884 Remove useless debugging log messages from hbase.mapred
    HADOOP-1856 Add Jar command to hbase shell using Hadoop RunJar util
                (Edward Yoon via Stack)
+   HADOOP-1928 Have master pass the regionserver the filesystem to use

 Below are the list of changes before 2007-08-18

@@ -94,6 +94,7 @@ to call at top-level: ant deploy-contrib compile-core-test
     <pathelement location="${conf.dir}"/>
     <pathelement location="${hadoop.root}/build"/>
     <pathelement location="${src.test}"/>
+    <pathelement location="${root}/conf"/>
     <path refid="classpath"/>
 </path>

@@ -937,12 +937,11 @@ HMasterRegionInterface {
   /**
    * Checks to see if the file system is still accessible.
    * If not, sets closed
-   *
    * @return false if file system is not available
    */
   protected boolean checkFileSystem() {
     if (fsOk) {
-      if (!FSUtils.isFileSystemAvailable(fs)) {
+      if (!FSUtils.isFileSystemAvailable(fs, closed)) {
         LOG.fatal("Shutting down HBase cluster: file system not available");
         closed.set(true);
         fsOk = false;
@@ -1127,9 +1126,9 @@ HMasterRegionInterface {
    * HMasterRegionInterface
    */

-  /** {@inheritDoc} */
   @SuppressWarnings("unused")
-  public void regionServerStartup(HServerInfo serverInfo) throws IOException {
+  public MapWritable regionServerStartup(HServerInfo serverInfo)
+  throws IOException {
     String s = serverInfo.getServerAddress().toString().trim();
     HServerInfo storedInfo = null;
     LOG.info("received start message from: " + s);
@@ -1137,11 +1136,9 @@ HMasterRegionInterface {
     // If we get the startup message but there's an old server by that
     // name, then we can timeout the old one right away and register
     // the new one.
-
     synchronized (serversToServerInfo) {
       storedInfo = serversToServerInfo.remove(s);
       HServerLoad load = serversToLoad.remove(s);
-
       if (load != null) {
         Set<String> servers = loadToServers.get(load);
         if (servers != null) {
@@ -1160,7 +1157,6 @@ HMasterRegionInterface {
     }

     // Either way, record the new server
-
     synchronized (serversToServerInfo) {
       HServerLoad load = new HServerLoad();
       serverInfo.setLoad(load);
@@ -1178,6 +1174,22 @@ HMasterRegionInterface {
       long serverLabel = getServerLabel(s);
       serverLeases.createLease(serverLabel, serverLabel, new ServerExpirer(s));
     }
+
+    return createConfigurationSubset();
+  }
+
+  /**
+   * @return Subset of configuration to pass initializing regionservers: e.g.
+   * the filesystem to use and root directory to use.
+   */
+  protected MapWritable createConfigurationSubset() {
+    MapWritable mw = addConfig(new MapWritable(), HConstants.HBASE_DIR);
+    return addConfig(mw, "fs.default.name");
+  }
+
+  private MapWritable addConfig(final MapWritable mw, final String key) {
+    mw.put(new Text(key), new Text(this.conf.get(key)));
+    return mw;
   }

   private long getServerLabel(final String s) {

@@ -20,6 +20,8 @@
 package org.apache.hadoop.hbase;

 import java.io.IOException;
+
+import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.ipc.VersionedProtocol;

 /**
@@ -34,8 +36,10 @@ public interface HMasterRegionInterface extends VersionedProtocol {
    * Called when a region server first starts
    * @param info
    * @throws IOException
+   * @return Configuration for the regionserver to use: e.g. filesystem,
+   * hbase rootdir, etc.
    */
-  public void regionServerStartup(HServerInfo info) throws IOException;
+  public MapWritable regionServerStartup(HServerInfo info) throws IOException;

   /**
    * Called to renew lease, tell master what the region server is doing and to

@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.lang.reflect.Constructor;
 import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -55,6 +56,7 @@ import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.DNS;
@@ -67,22 +69,12 @@ import org.apache.hadoop.util.StringUtils;
 public class HRegionServer implements HConstants, HRegionInterface, Runnable {
   static final Log LOG = LogFactory.getLog(HRegionServer.class);

-  /** {@inheritDoc} */
-  public long getProtocolVersion(final String protocol,
-      @SuppressWarnings("unused") final long clientVersion)
-  throws IOException {
-    if (protocol.equals(HRegionInterface.class.getName())) {
-      return HRegionInterface.versionID;
-    }
-    throw new IOException("Unknown protocol to name node: " + protocol);
-  }
-
   // Set when a report to the master comes back with a message asking us to
   // shutdown. Also set by call to stop when debugging or running unit tests
   // of HRegionServer in isolation. We use AtomicBoolean rather than
   // plain boolean so we can pass a reference to Chore threads. Otherwise,
   // Chore threads need to know about the hosting class.
-  protected AtomicBoolean stopRequested = new AtomicBoolean(false);
+  protected final AtomicBoolean stopRequested = new AtomicBoolean(false);

   // Go down hard. Used if file system becomes unavailable and also in
   // debugging and unit tests.
@@ -91,38 +83,35 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
   // If false, the file system has become unavailable
   protected volatile boolean fsOk;

-  final Path rootDir;
   protected final HServerInfo serverInfo;
   protected final Configuration conf;
-  private final Random rand;
+  private final Random rand = new Random();

   // region name -> HRegion
-  protected final SortedMap<Text, HRegion> onlineRegions;
+  protected final SortedMap<Text, HRegion> onlineRegions =
+    Collections.synchronizedSortedMap(new TreeMap<Text, HRegion>());
   protected final Map<Text, HRegion> retiringRegions =
     new HashMap<Text, HRegion>();

   protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-  private final Vector<HMsg> outboundMsgs;
+  private final Vector<HMsg> outboundMsgs = new Vector<HMsg>();

   int numRetries;
   protected final int threadWakeFrequency;
   private final int msgInterval;

-  // File paths
-  private FileSystem fs;
-
   // Remote HMaster
-  private HMasterRegionInterface hbaseMaster;
+  private final HMasterRegionInterface hbaseMaster;

   // Server to handle client requests. Default access so can be accessed by
   // unit tests.
-  Server server;
+  final Server server;

   // Leases
-  private Leases leases;
+  private final Leases leases;

   // Request counter
-  private AtomicInteger requestCount;
+  private final AtomicInteger requestCount = new AtomicInteger();

   // A sleeper that sleeps for msgInterval.
   private final Sleeper sleeper;
@@ -134,7 +123,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
   // interrupted.
   protected final Integer splitOrCompactLock = new Integer(0);

-  /**
+  /*
    * Runs periodically to determine if regions need to be compacted or split
    */
   class SplitOrCompactChecker extends Chore
@@ -150,7 +139,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
         30 * 1000), stop);
     }

-    /** {@inheritDoc} */
     public void closing(final Text regionName) {
       lock.writeLock().lock();
       try {
@@ -166,7 +154,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
       }
     }

-    /** {@inheritDoc} */
     public void closed(final Text regionName) {
       lock.writeLock().lock();
       try {
@@ -290,7 +277,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
       super(period, stop);
     }

-    /** {@inheritDoc} */
     @Override
     protected void chore() {
       synchronized(cacheFlusherLock) {
@@ -326,8 +312,9 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     }
   }

-  // HLog and HLog roller.
-  protected final HLog log;
+  // HLog and HLog roller. log is protected rather than private to avoid
+  // eclipse warning when accessed by inner classes
+  protected HLog log;
   private final Thread logRollerThread;
   protected final Integer logRollerLock = new Integer(0);

@@ -375,32 +362,21 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
    * @throws IOException
    */
   public HRegionServer(Configuration conf) throws IOException {
-    this(new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR)),
-      new HServerAddress(conf.get(REGIONSERVER_ADDRESS,
-        DEFAULT_REGIONSERVER_ADDRESS)),
-      conf);
+    this(new HServerAddress(conf.get(REGIONSERVER_ADDRESS,
+      DEFAULT_REGIONSERVER_ADDRESS)), conf);
   }

   /**
    * Starts a HRegionServer at the specified location
-   * @param rootDir
    * @param address
    * @param conf
    * @throws IOException
    */
-  public HRegionServer(Path rootDir, HServerAddress address,
-    Configuration conf)
+  public HRegionServer(HServerAddress address, Configuration conf)
   throws IOException {
     this.abortRequested = false;
     this.fsOk = true;
-    this.rootDir = rootDir;
     this.conf = conf;
-    this.rand = new Random();
-    this.onlineRegions =
-      Collections.synchronizedSortedMap(new TreeMap<Text, HRegion>());
-
-    this.outboundMsgs = new Vector<HMsg>();
-    this.requestCount = new AtomicInteger();

     // Config'ed params
     this.numRetries = conf.getInt("hbase.client.retries.number", 2);
@@ -416,112 +392,26 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
       new SplitOrCompactChecker(this.stopRequested);

     // Task thread to process requests from Master
-    this.toDo = new LinkedBlockingQueue<ToDoEntry>();
     this.worker = new Worker();
     this.workerThread = new Thread(worker);
     this.sleeper = new Sleeper(this.msgInterval, this.stopRequested);
-    try {
+    this.logRollerThread =
+      new LogRoller(this.threadWakeFrequency, stopRequested);
     // Server to handle client requests
     this.server = RPC.getServer(this, address.getBindAddress(),
       address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
       false, conf);
-    // Use interface to get the 'real' IP for this host.
-    // 'serverInfo' is sent to master. Should have the real IP of this host
-    // rather than 'localhost' or 0.0.0.0 or 127.0.0.1 in it.
-    String realIP = DNS.getDefaultIP(
-      conf.get("dfs.datanode.dns.interface","default"));
-    this.serverInfo = new HServerInfo(new HServerAddress(
-      new InetSocketAddress(realIP, server.getListenerAddress().getPort())),
-      this.rand.nextLong());
-    Path logdir = new Path(rootDir, "log" + "_" + realIP + "_" +
-      this.serverInfo.getServerAddress().getPort());
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Log dir " + logdir);
-    }
-
-    // Logging
-    this.fs = FileSystem.get(conf);
-    if(fs.exists(logdir)) {
-      throw new RegionServerRunningException("region server already " +
-        "running at " + this.serverInfo.getServerAddress().toString() +
-        " because logdir " + logdir.toString() + " exists");
-    }
-
-    this.log = new HLog(fs, logdir, conf);
-    this.logRollerThread =
-      new LogRoller(this.threadWakeFrequency, stopRequested);
-
-    // Remote HMaster
-    this.hbaseMaster = (HMasterRegionInterface)RPC.waitForProxy(
-      HMasterRegionInterface.class, HMasterRegionInterface.versionID,
-      new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(),
-      conf);
-    } catch (IOException e) {
-      this.stopRequested.set(true);
-      throw RemoteExceptionHandler.checkIOException(e);
-    }
-  }
-
-  /** @return the HLog */
-  HLog getLog() {
-    return log;
-  }
-
-  /**
-   * Sets a flag that will cause all the HRegionServer threads to shut down
-   * in an orderly fashion. Used by unit tests and called by {@link Flusher}
-   * if it judges server needs to be restarted.
-   */
-  synchronized void stop() {
-    this.stopRequested.set(true);
-    notifyAll(); // Wakes run() if it is sleeping
-  }
-
-  /**
-   * Cause the server to exit without closing the regions it is serving, the
-   * log it is using and without notifying the master.
-   * Used unit testing and on catastrophic events such as HDFS is yanked out
-   * from under hbase or we OOME.
-   */
-  synchronized void abort() {
-    this.abortRequested = true;
-    stop();
-  }
-
-  /**
-   * Wait on all threads to finish.
-   * Presumption is that all closes and stops have already been called.
-   */
-  void join() {
-    try {
-      this.workerThread.join();
-    } catch(InterruptedException iex) {
-      // continue
-    }
-    try {
-      this.logRollerThread.join();
-    } catch(InterruptedException iex) {
-      // continue
-    }
-    try {
-      this.cacheFlusherThread.join();
-    } catch(InterruptedException iex) {
-      // continue
-    }
-    try {
-      this.splitOrCompactCheckerThread.join();
-    } catch(InterruptedException iex) {
-      // continue
-    }
-    try {
-      this.server.join();
-    } catch(InterruptedException iex) {
-      // continue
-    }
-    LOG.info("HRegionServer stopped at: " +
-      serverInfo.getServerAddress().toString());
+    this.serverInfo = new HServerInfo(new HServerAddress(
+      new InetSocketAddress(getThisIP(),
+      this.server.getListenerAddress().getPort())), this.rand.nextLong());
+    this.leases = new Leases(
+      conf.getInt("hbase.regionserver.lease.period", 3 * 60 * 1000),
+      this.threadWakeFrequency);
+    // Remote HMaster
+    this.hbaseMaster = (HMasterRegionInterface)RPC.waitForProxy(
+      HMasterRegionInterface.class, HMasterRegionInterface.versionID,
+      new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(),
+      conf);
   }

   /**
@@ -530,21 +420,13 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
    * load/unload instructions.
    */
   public void run() {
-    startAllServices();
-
     // Set below if HMaster asked us stop.
     boolean masterRequestedStop = false;

     try {
+      init(reportForDuty());
       while(!stopRequested.get()) {
         long lastMsg = 0;
-        try {
-          reportForDuty();
-        } catch(IOException e) {
-          this.sleeper.sleep(lastMsg);
-          continue;
-        }
-
         // Now ask master what it wants us to do and tell it what we have done
         for (int tries = 0; !stopRequested.get();) {
           if ((System.currentTimeMillis() - lastMsg) >= msgInterval) {
@@ -630,7 +512,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
       LOG.fatal("Unhandled exception. Aborting...", t);
       abort();
     }
-    leases.closeAfterLeasesExpire();
+    this.leases.closeAfterLeasesExpire();
     this.worker.stop();
     this.server.stop();

@@ -691,10 +573,52 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
       serverInfo.getServerAddress().toString());
     }

     join();
     LOG.info(Thread.currentThread().getName() + " exiting");
   }

+  /*
+   * Run init. Sets up hlog and starts up all server threads.
+   * @param c Extra configuration.
+   */
+  private void init(final MapWritable c) {
+    try {
+      for (Map.Entry<Writable, Writable> e: c.entrySet()) {
+        String key = e.getKey().toString();
+        String value = e.getValue().toString();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Config from master: " + key + "=" + value);
+        }
+        this.conf.set(key, value);
+      }
+      this.log = setupHLog();
+      startServiceThreads();
+    } catch (IOException e) {
+      this.stopRequested.set(true);
+      LOG.fatal("Failed init",
+        RemoteExceptionHandler.checkIOException(e));
+    }
+  }
+
+  private HLog setupHLog()
+  throws RegionServerRunningException, IOException {
+    String rootDir = this.conf.get(HConstants.HBASE_DIR);
+    LOG.info("Root dir: " + rootDir);
+    Path logdir = new Path(new Path(rootDir),
+      "log" + "_" + getThisIP() + "_" +
+      this.serverInfo.getServerAddress().getPort());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Log dir " + logdir);
+    }
+    FileSystem fs = FileSystem.get(this.conf);
+    if (fs.exists(logdir)) {
+      throw new RegionServerRunningException("region server already " +
+        "running at " + this.serverInfo.getServerAddress().toString() +
+        " because logdir " + logdir.toString() + " exists");
+    }
+    return new HLog(fs, logdir, conf);
+  }
+
   /*
    * Start Chore Threads, Server, Worker and lease checker threads. Install an
    * UncaughtExceptionHandler that calls abort of RegionServer if we get
@@ -707,7 +631,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
    * Chore, it keeps its own internal stop mechanism so needs to be stopped
    * by this hosting server. Worker logs the exception and exits.
    */
-  private void startAllServices() {
+  private void startServiceThreads() throws IOException {
     String n = Thread.currentThread().getName();
     UncaughtExceptionHandler handler = new UncaughtExceptionHandler() {
       public void uncaughtException(Thread t, Throwable e) {
@@ -728,41 +652,105 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     Threads.setDaemonThreadRunning(this.workerThread, n + ".worker", handler);
     // Leases is not a Thread. Internally it runs a daemon thread. If it gets
     // an unhandled exception, it will just exit.
-    this.leases = new Leases(
-      conf.getInt("hbase.regionserver.lease.period", 3 * 60 * 1000),
-      this.threadWakeFrequency);
     this.leases.setName(n + ".leaseChecker");
     this.leases.start();
     // Start Server. This service is like leases in that it internally runs
     // a thread.
-    try {
-      this.server.start();
-      LOG.info("HRegionServer started at: " +
-        serverInfo.getServerAddress().toString());
-    } catch(IOException e) {
-      this.stopRequested.set(true);
-      LOG.fatal("Failed start Server",
-        RemoteExceptionHandler.checkIOException(e));
+    this.server.start();
+    LOG.info("HRegionServer started at: " +
+      serverInfo.getServerAddress().toString());
+  }
+
+  /** @return the HLog */
+  HLog getLog() {
+    return this.log;
+  }
+
+  /*
+   * Use interface to get the 'real' IP for this host. 'serverInfo' is sent to
+   * master. Should have the real IP of this host rather than 'localhost' or
+   * 0.0.0.0 or 127.0.0.1 in it.
+   * @return This servers' IP.
+   */
+  private String getThisIP() throws UnknownHostException {
+    return DNS.getDefaultIP(conf.get("dfs.datanode.dns.interface","default"));
+  }
+
+  /**
+   * Sets a flag that will cause all the HRegionServer threads to shut down
+   * in an orderly fashion. Used by unit tests and called by {@link Flusher}
+   * if it judges server needs to be restarted.
+   */
+  synchronized void stop() {
+    this.stopRequested.set(true);
+    notifyAll(); // Wakes run() if it is sleeping
+  }
+
+  /**
+   * Cause the server to exit without closing the regions it is serving, the
+   * log it is using and without notifying the master.
+   * Used unit testing and on catastrophic events such as HDFS is yanked out
+   * from under hbase or we OOME.
+   */
+  synchronized void abort() {
+    this.abortRequested = true;
+    stop();
+  }
+
+  /**
+   * Wait on all threads to finish.
+   * Presumption is that all closes and stops have already been called.
+   */
+  void join() {
+    join(this.workerThread);
+    join(this.logRollerThread);
+    join(this.cacheFlusherThread);
+    join(this.splitOrCompactCheckerThread);
+    try {
+      this.server.join();
+    } catch (InterruptedException e) {
+      // No means of asking server if its done... .so just assume it is even
+      // if an interrupt.
+    }
+  }
+
+  private void join(final Thread t) {
+    while (t.isAlive()) {
+      try {
+        t.join();
+      } catch (InterruptedException e) {
+        // continue
+      }
     }
   }

   /*
    * Let the master know we're here
-   * @throws IOException
+   * Run initialization using parameters passed us by the master.
    */
-  private void reportForDuty() throws IOException {
+  private MapWritable reportForDuty() {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Telling master we are up");
     }
-    this.requestCount.set(0);
-    this.serverInfo.setLoad(new HServerLoad(0, onlineRegions.size()));
-    this.hbaseMaster.regionServerStartup(serverInfo);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Done telling master we are up");
+    MapWritable result = null;
+    while(!stopRequested.get()) {
+      long lastMsg = 0;
+      try {
+        this.requestCount.set(0);
+        this.serverInfo.setLoad(new HServerLoad(0, onlineRegions.size()));
+        result = this.hbaseMaster.regionServerStartup(serverInfo);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Done telling master we are up");
+        }
+        break;
+      } catch(IOException e) {
+        this.sleeper.sleep(lastMsg);
+        continue;
+      }
     }
+    return result;
   }

   /** Add to the outbound message buffer */
   private void reportOpen(HRegion region) {
     synchronized(outboundMsgs) {
@@ -808,7 +796,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
       this.msg = msg;
     }
   }
-  BlockingQueue<ToDoEntry> toDo;
+  BlockingQueue<ToDoEntry> toDo = new LinkedBlockingQueue<ToDoEntry>();
   private Worker worker;
   private Thread workerThread;

@@ -886,7 +874,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
   void openRegion(HRegionInfo regionInfo) throws IOException {
     HRegion region = onlineRegions.get(regionInfo.regionName);
     if(region == null) {
-      region = new HRegion(rootDir, log, fs, conf, regionInfo, null);
+      region = new HRegion(new Path(this.conf.get(HConstants.HBASE_DIR)),
+        this.log, FileSystem.get(conf), conf, regionInfo, null);
       this.lock.writeLock().lock();
       try {
         this.log.setSequenceNumber(region.getMaxSequenceId());
@@ -1275,7 +1264,13 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
    */
   protected boolean checkFileSystem() {
     if (this.fsOk) {
-      if (!FSUtils.isFileSystemAvailable(fs)) {
+      FileSystem fs = null;
+      try {
+        fs = FileSystem.get(this.conf);
+      } catch (IOException e) {
+        LOG.error("Failed get of filesystem", e);
+      }
+      if (fs != null && !FSUtils.isFileSystemAvailable(fs, stopRequested)) {
         LOG.fatal("Shutting down HRegionServer: file system not available");
         this.abortRequested = true;
         this.stopRequested.set(true);
@@ -1308,6 +1303,15 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     return regionsToCheck;
   }

+  public long getProtocolVersion(final String protocol,
+      @SuppressWarnings("unused") final long clientVersion)
+  throws IOException {
+    if (protocol.equals(HRegionInterface.class.getName())) {
+      return HRegionInterface.versionID;
+    }
+    throw new IOException("Unknown protocol to name node: " + protocol);
+  }
+
   //
   // Main program and support routines
   //

@@ -20,11 +20,13 @@
 package org.apache.hadoop.hbase.util;

 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.dfs.DistributedFileSystem;

 /**
@@ -36,23 +38,28 @@ public class FSUtils {
   /**
    * Not instantiable
    */
-  private FSUtils() {}
+  private FSUtils() {super();}

   /**
    * Checks to see if the specified file system is available
    *
    * @param fs
+   * @param closed Optional flag. If non-null and set, will abort test of
+   * filesytem. Presumption is a flag shared by multiple threads. Another
+   * may have already determined the filesystem -- or something else -- bad.
    * @return true if the specified file system is available.
    */
-  public static boolean isFileSystemAvailable(FileSystem fs) {
+  public static boolean isFileSystemAvailable(final FileSystem fs,
+      final AtomicBoolean closed) {
     if (!(fs instanceof DistributedFileSystem)) {
       return true;
     }
     boolean available = false;
     DistributedFileSystem dfs = (DistributedFileSystem) fs;
     int maxTries = dfs.getConf().getInt("hbase.client.retries.number", 3);
-    Path root = new Path(dfs.getConf().get("hbase.dir", "/"));
-    for (int i = 0; i < maxTries; i++) {
+    Path root =
+      fs.makeQualified(new Path(dfs.getConf().get(HConstants.HBASE_DIR, "/")));
+    for (int i = 0; i < maxTries && (closed == null || !closed.get()); i++) {
       IOException ex = null;
       try {
         if (dfs.exists(root)) {
@@ -62,12 +69,10 @@ public class FSUtils {
       } catch (IOException e) {
         ex = e;
       }
-      String exception = "";
-      if (ex != null) {
-        exception = ": " + ex.getMessage();
-      }
-      LOG.info("Failed exists test on " + root + " (Attempt " + i + ")" +
-        exception);
+      String exception = (ex == null)? "": ": " + ex.getMessage();
+      LOG.info("Failed exists test on " + root + " by thread " +
+        Thread.currentThread().getName() + " (Attempt " + i + " of " +
+        maxTries +"): " + exception);
     }
     try {
       if (!available) {
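FSUtils.isFileSystemAvailable now takes an optional AtomicBoolean so a caller that has already decided to shut down (or another thread that has) can cut the retry loop short. The fragment below is a simplified sketch of that retry-with-abort-flag pattern under assumed names (FsAvailabilitySketch, isAvailable); it is not the FSUtils implementation itself, which additionally restricts the check to DistributedFileSystem and logs each failed attempt.

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class FsAvailabilitySketch {
  private FsAvailabilitySketch() {}

  // Retry an existence probe on the root path, but stop early if the shared
  // 'closed' flag (null means "no flag supplied") has been set by any thread.
  public static boolean isAvailable(FileSystem fs, Path root, int maxTries,
      AtomicBoolean closed) {
    for (int i = 0; i < maxTries && (closed == null || !closed.get()); i++) {
      try {
        if (fs.exists(root)) {
          return true;
        }
      } catch (IOException e) {
        // swallow and retry; the real code records the exception for logging
      }
    }
    return false;
  }
}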
@@ -63,10 +63,10 @@ public class MultiRegionTable extends HBaseTestCase {
       HConstants.DEFAULT_MAX_FILE_SIZE) <= 1024 * 1024);

     final int retries = 10;
-    Path d = cluster.regionThreads.get(0).getRegionServer().rootDir;
     FileSystem fs = (cluster.getDFSCluster() == null) ?
       localFs : cluster.getDFSCluster().getFileSystem();
     assertNotNull(fs);
+    Path d = fs.makeQualified(new Path(conf.get(HConstants.HBASE_DIR)));

     // Get connection on the meta table and get count of rows.

@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.List;

 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.io.Text;

@@ -42,9 +41,9 @@ public class OOMERegionServer extends HRegionServer {
     super(conf);
   }

-  public OOMERegionServer(Path rootDir, HServerAddress address,
-      Configuration conf) throws IOException {
-    super(rootDir, address, conf);
+  public OOMERegionServer(HServerAddress address, Configuration conf)
+  throws IOException {
+    super(address, conf);
   }

   @Override
@@ -33,8 +33,6 @@ public class TestDFSAbort extends HBaseClusterTestCase {
   /** constructor */
   public TestDFSAbort() {
     super();
-    // conf.setInt("ipc.client.timeout", 5000); // reduce ipc client timeout
-    // conf.setInt("ipc.client.connect.max.retries", 5); // and number of retries
     Logger.getRootLogger().setLevel(Level.WARN);
     Logger.getLogger(this.getClass().getPackage().getName()).setLevel(Level.DEBUG);
   }
@@ -66,4 +64,4 @@ public class TestDFSAbort extends HBaseClusterTestCase {
   public static void main(@SuppressWarnings("unused") String[] args) {
     TestRunner.run(new TestSuite(TestDFSAbort.class));
   }
 }