HBASE-501 Empty region server address in info:server entry and a startcode of -1 in .META.

M conf/hbase-default.xml
  Add hbase.hbasemaster.maxregionopen property.
M src/java/org/apache/hadoop/hbase/HStore.java
  Change the way we log. Do way less: just emit sums of edits applied and
  skipped rather than individual edits.
M src/java/org/apache/hadoop/hbase/HRegionServer.java
  Make the sleeper instance a local rather than a data member.
  (reportForDuty): Take a sleeper instance.
  (run): Removed redundant wrap of a 'for' by a 'while'.
  (openRegion): If IOE, do not offline the region. Seen to be an overreaction.
M src/java/org/apache/hadoop/hbase/HLog.java
  Don't output the map of all files being cleaned every time a new entry is
  added; instead just log the new entry. Remove the emission of every 10k
  edits.
M src/java/org/apache/hadoop/hbase/HMaster.java
  Up the default for maxregionopen. Was seeing that playing edits could take a
  long time (mostly because we used to log every edit), but there is no harm
  in this being longer. On REPORT_CLOSE, emit the region info, not just the
  region name, so the properties are visible (without them it was hard to
  figure out who was responsible for offlining). Add logging of the attempt #
  in shutdown processing. Add logging of the state flags passed to the region
  close; helps debugging. Also, on close, offline ONLY if we are NOT
  reassigning the region (jimk find).
M src/java/org/apache/hadoop/hbase/util/Sleeper.java
  Add logging of extraordinary sleeps or calculated periods (suspicion is that
  we're sleeping way longer on loaded machines and the regionserver appears
  hung).

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@636849 13f79535-47bb-0310-9956-ffa450edef68

commit c1f974bbee
parent 8c815c72f4
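The HStore and HLog logging changes boil down to one pattern: count while replaying, then log a single summary line. Below is a minimal self-contained sketch of that pattern, not the committed code; only the counter names and the summary message shape come from the hunks further down, and java.util.logging stands in for commons-logging so the example runs without extra jars.

import java.util.logging.Logger;

public class ReplaySummary {
  private static final Logger LOG = Logger.getLogger("ReplaySummary");

  public static void main(String[] args) {
    long maxSeqID = 5;           // edits at or below this are already flushed
    long[] logSeqNums = {3, 4, 6, 7, 8};
    long editsCount = 0, skippedEdits = 0;
    for (long seq : logSeqNums) {
      if (seq <= maxSeqID) {
        skippedEdits++;          // no per-edit "Skipped ..." line any more
        continue;
      }
      editsCount++;              // no per-edit "Applying edit ..." line either
    }
    // One line at the end instead of one line per edit.
    LOG.info("Applied " + editsCount + ", skipped " + skippedEdits +
      " because sequence id <= " + maxSeqID);
  }
}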
CHANGES.txt
@@ -38,6 +38,8 @@ Hbase Change Log
    HBASE-433 HBASE-251 Region server should delete restore log after successful
              restore, Stuck replaying the edits of crashed machine.
    HBASE-27 hregioninfo cell empty in meta table
+   HBASE-501 Empty region server address in info:server entry and a
+             startcode of -1 in .META.
 
 IMPROVEMENTS
    HBASE-415 Rewrite leases to use DelayedBlockingQueue instead of polling
conf/hbase-default.xml
@@ -107,6 +107,13 @@
     they are considered dead. On loaded cluster, may need to up this
     period.</description>
   </property>
+  <property>
+    <name>hbase.hbasemaster.maxregionopen</name>
+    <value>60000</value>
+    <description>Period to wait for a region open. If regionserver
+    takes longer than this interval, assign to a new regionserver.
+    </description>
+  </property>
   <property>
     <name>hbase.regionserver.lease.period</name>
     <value>30000</value>
src/java/org/apache/hadoop/hbase/HMaster.java
@@ -229,7 +229,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
     this.threadWakeFrequency = conf.getInt(THREAD_WAKE_FREQUENCY, 10 * 1000);
     this.numRetries = conf.getInt("hbase.client.retries.number", 2);
     this.maxRegionOpenTime =
-      conf.getLong("hbase.hbasemaster.maxregionopen", 30 * 1000);
+      conf.getLong("hbase.hbasemaster.maxregionopen", 60 * 1000);
     this.leaseTimeout = conf.getInt("hbase.master.lease.period", 30 * 1000);
 
     this.server = HbaseRPC.getServer(this, address.getBindAddress(),
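For illustration, a hedged sketch of how the deadline is resolved on the master side using plain Hadoop Configuration; the class name MaxRegionOpen and the standalone setup are hypothetical, and only the property name and the 60s fallback come from this commit. The value shipped in hbase-default.xml normally wins over the hard-coded fallback, which is why the file and the getLong() default were bumped to 60000 together.

import org.apache.hadoop.conf.Configuration;

public class MaxRegionOpen {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // hbase-default.xml (plus any hbase-site.xml override) must be on the
    // classpath for this to pick up the shipped 60000 value.
    conf.addResource("hbase-default.xml");
    long maxRegionOpenTime =
      conf.getLong("hbase.hbasemaster.maxregionopen", 60 * 1000);
    System.out.println("region open deadline ms: " + maxRegionOpenTime);
  }
}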
@@ -589,6 +589,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
 
   private void createTable(final HRegionInfo newRegion) throws IOException {
     Text tableName = newRegion.getTableDesc().getName();
+    // TODO: Not thread safe check.
     if (tableInCreation.contains(tableName)) {
       throw new TableExistsException("Table " + tableName + " in process "
         + "of being created");
ProcessRegionClose.java
@@ -49,7 +49,8 @@ class ProcessRegionClose extends ProcessRegionStatusChange {
   /** {@inheritDoc} */
   @Override
   public String toString() {
-    return "ProcessRegionClose of " + this.regionInfo.getRegionName();
+    return "ProcessRegionClose of " + this.regionInfo.getRegionName() +
+      ", " + this.reassignRegion + ", " + this.deleteRegion;
   }
 
   @Override
@@ -74,7 +75,7 @@ class ProcessRegionClose extends ProcessRegionStatusChange {
         if (deleteRegion) {
           HRegion.removeRegionFromMETA(getMetaServer(), metaRegionName,
             regionInfo.getRegionName());
-        } else {
+        } else if (!this.reassignRegion) {
           HRegion.offlineRegionInMETA(getMetaServer(), metaRegionName,
             regionInfo);
         }
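The one-word change above is the jimk find called out in the commit message: a region closed for reassignment must not be offlined in .META., otherwise the master is left with the empty info:server / startcode -1 rows this issue describes. A minimal sketch of the resulting three-way decision follows; the class and enum are illustrative, and only the two flags and the branch shape come from the hunk.

public class CloseRegionDecision {
  enum Action { DELETE_FROM_META, LEAVE_FOR_REASSIGNMENT, OFFLINE_IN_META }

  static Action decide(boolean deleteRegion, boolean reassignRegion) {
    if (deleteRegion) {
      return Action.DELETE_FROM_META;     // region is going away entirely
    } else if (!reassignRegion) {
      return Action.OFFLINE_IN_META;      // close is permanent; record it
    }
    return Action.LEAVE_FOR_REASSIGNMENT; // master will reopen it elsewhere
  }

  public static void main(String[] args) {
    // Before the patch the reassignment case also hit OFFLINE_IN_META,
    // leaving an offlined .META. row behind for a region that was still live.
    System.out.println(decide(false, true)); // LEAVE_FOR_REASSIGNMENT
  }
}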
ProcessServerShutdown.java
@@ -295,7 +295,7 @@ class ProcessServerShutdown extends RegionServerOperation {
       if (LOG.isDebugEnabled()) {
         LOG.debug("process server shutdown scanning " +
           r.getRegionName() + " on " + r.getServer() + " " +
-          Thread.currentThread().getName());
+          Thread.currentThread().getName() + " attempt " + tries);
       }
       server = master.connection.getHRegionConnection(r.getServer());
 
ServerManager.java
@@ -315,7 +315,7 @@ class ServerManager implements HConstants {
 
     case HMsg.MSG_REPORT_CLOSE:
       LOG.info(serverInfo.getServerAddress().toString() + " no longer serving " +
-        region.getRegionName());
+        region);
 
       if (region.isRootRegion()) {
         // Root region
src/java/org/apache/hadoop/hbase/HLog.java
@@ -546,7 +546,6 @@ public class HLog implements HConstants {
             ),
             HREGION_OLDLOGFILE_NAME
           );
-
           Path oldlogfile = null;
           SequenceFile.Reader old = null;
           if (fs.exists(logfile)) {
@@ -556,16 +555,15 @@ public class HLog implements HConstants {
             fs.rename(logfile, oldlogfile);
             old = new SequenceFile.Reader(fs, oldlogfile, conf);
           }
-
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Creating new log file writer for path " + logfile +
-              "; map content " + logWriters.toString());
-          }
           w = SequenceFile.createWriter(fs, conf, logfile, HLogKey.class,
             HLogEdit.class, getCompressionType(conf));
           // Use copy of regionName; regionName object is reused inside in
           // HStoreKey.getRegionName so its content changes as we iterate.
           logWriters.put(new Text(regionName), w);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Creating new log file writer for path " + logfile +
+              " and region " + regionName);
+          }
 
           if (old != null) {
             // Copy from existing log file
@@ -581,9 +579,6 @@ public class HLog implements HConstants {
               fs.delete(oldlogfile);
             }
           }
-          if (LOG.isDebugEnabled() && count > 0 && count % 10000 == 0) {
-            LOG.debug("Applied " + count + " edits");
-          }
           w.append(key, val);
         }
         if (LOG.isDebugEnabled()) {
src/java/org/apache/hadoop/hbase/HRegionServer.java
@@ -121,7 +121,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     new ConcurrentHashMap<Text, HRegion>();
 
   protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-  private volatile List<HMsg> outboundMsgs =
+  private final List<HMsg> outboundMsgs =
     Collections.synchronizedList(new ArrayList<HMsg>());
 
   final int numRetries;
@@ -142,9 +142,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
   // Request counter
   private volatile AtomicInteger requestCount = new AtomicInteger();
 
-  // A sleeper that sleeps for msgInterval.
-  private final Sleeper sleeper;
-
   // Info server. Default access so can be used by unit tests. REGIONSERVER
   // is name of the webapp and the attribute name used stuffing this instance
   // into web context.
@@ -234,7 +231,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     // Task thread to process requests from Master
     this.worker = new Worker();
     this.workerThread = new Thread(worker);
-    this.sleeper = new Sleeper(this.msgInterval, this.stopRequested);
+
     // Server to handle client requests
     this.server = HbaseRPC.getServer(this, address.getBindAddress(),
       address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
@@ -259,145 +256,146 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
    */
   public void run() {
     boolean quiesceRequested = false;
+    // A sleeper that sleeps for msgInterval.
+    Sleeper sleeper =
+      new Sleeper(this.msgInterval, this.stopRequested);
     try {
-      init(reportForDuty());
+      init(reportForDuty(sleeper));
       long lastMsg = 0;
-      while(!stopRequested.get()) {
       // Now ask master what it wants us to do and tell it what we have done
       for (int tries = 0; !stopRequested.get();) {
         long now = System.currentTimeMillis();
         if (lastMsg != 0 && (now - lastMsg) >= serverLeaseTimeout) {
           // It has been way too long since we last reported to the master.
           // Commit suicide.
           LOG.fatal("unable to report to master for " + (now - lastMsg) +
             " milliseconds - aborting server");
           abort();
           break;
         }
         if ((now - lastMsg) >= msgInterval) {
           HMsg outboundArray[] = null;
-          synchronized(outboundMsgs) {
+          synchronized(this.outboundMsgs) {
             outboundArray =
               this.outboundMsgs.toArray(new HMsg[outboundMsgs.size()]);
+            this.outboundMsgs.clear();
           }
-          this.outboundMsgs.clear();
 
           try {
             this.serverInfo.setLoad(new HServerLoad(requestCount.get(),
               onlineRegions.size()));
             this.requestCount.set(0);
             HMsg msgs[] =
               this.hbaseMaster.regionServerReport(serverInfo, outboundArray);
             lastMsg = System.currentTimeMillis();
 
             if (this.quiesced.get() && onlineRegions.size() == 0) {
               // We've just told the master we're exiting because we aren't
               // serving any regions. So set the stop bit and exit.
               LOG.info("Server quiesced and not serving any regions. " +
                 "Starting shutdown");
+              stopRequested.set(true);
+              continue;
             }
 
             // Queue up the HMaster's instruction stream for processing
             boolean restart = false;
             for(int i = 0; i < msgs.length && !stopRequested.get() &&
                 !restart; i++) {
               switch(msgs[i].getMsg()) {
 
               case HMsg.MSG_CALL_SERVER_STARTUP:
                 LOG.info("Got call server startup message");
                 // We the MSG_CALL_SERVER_STARTUP on startup but we can also
                 // get it when the master is panicing because for instance
                 // the HDFS has been yanked out from under it. Be wary of
                 // this message.
                 if (checkFileSystem()) {
                   closeAllRegions();
                   synchronized (logRollerLock) {
                     try {
                       log.closeAndDelete();
                     } catch (Exception e) {
                       LOG.error("error closing and deleting HLog", e);
                     }
                     try {
                       serverInfo.setStartCode(System.currentTimeMillis());
                       log = setupHLog();
                     } catch (IOException e) {
                       this.abortRequested = true;
                       this.stopRequested.set(true);
                       e = RemoteExceptionHandler.checkIOException(e);
                       LOG.fatal("error restarting server", e);
                       break;
                     }
                   }
-                  reportForDuty();
+                  reportForDuty(sleeper);
                   restart = true;
                 } else {
                   LOG.fatal("file system available check failed. " +
                     "Shutting down server.");
                 }
                 break;
 
               case HMsg.MSG_REGIONSERVER_STOP:
                 LOG.info("Got regionserver stop message");
                 stopRequested.set(true);
                 break;
 
               case HMsg.MSG_REGIONSERVER_QUIESCE:
                 if (!quiesceRequested) {
                   LOG.info("Got quiesce server message");
                   try {
                     toDo.put(new ToDoEntry(msgs[i]));
                   } catch (InterruptedException e) {
                     throw new RuntimeException("Putting into msgQueue was " +
                       "interrupted.", e);
                   }
                   quiesceRequested = true;
                 }
                 break;
 
               default:
                 if (fsOk) {
                   try {
                     toDo.put(new ToDoEntry(msgs[i]));
                   } catch (InterruptedException e) {
                     throw new RuntimeException("Putting into msgQueue was " +
                       "interrupted.", e);
                   }
                   if (msgs[i].getMsg() == HMsg.MSG_REGION_OPEN) {
-                    outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_PROCESS_OPEN,
+                    this.outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_PROCESS_OPEN,
                       msgs[i].getRegionInfo()));
                   }
                 }
               }
             }
             if (restart || this.stopRequested.get()) {
               toDo.clear();
               break;
             }
             // Reset tries count if we had a successful transaction.
             tries = 0;
           } catch (Exception e) {
             if (e instanceof IOException) {
               e = RemoteExceptionHandler.checkIOException((IOException) e);
             }
             if (tries < this.numRetries) {
               LOG.warn("Processing message (Retry: " + tries + ")", e);
               tries++;
             } else {
               LOG.fatal("Exceeded max retries: " + this.numRetries, e);
               if (!checkFileSystem()) {
                 continue;
               }
               // Something seriously wrong. Shutdown.
               stop();
             }
           }
         }
-        this.sleeper.sleep(lastMsg);
+        sleeper.sleep(lastMsg);
       } // for
-      } // while (!stopRequested.get())
     } catch (Throwable t) {
       LOG.fatal("Unhandled exception. Aborting...", t);
       abort();
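Why the while wrapper could go: both loops tested the same stopRequested flag, so the outer loop never contributed an iteration of its own. A toy skeleton of the surviving control flow, illustrative only (the names mirror the hunk; the body is stubbed):

import java.util.concurrent.atomic.AtomicBoolean;

public class MainLoopShape {
  public static void main(String[] args) {
    AtomicBoolean stopRequested = new AtomicBoolean(false);
    int passes = 0;
    // One loop does the work of two: the for condition re-tests the stop
    // flag on every pass, exactly as the removed while(...) did.
    for (int tries = 0; !stopRequested.get();) {
      // ... report to master, dispatch its messages, maybe bump tries ...
      if (++passes == 3) {
        stopRequested.set(true); // same exit the quiesce path now takes
      }
    }
    System.out.println("exited after " + passes + " passes");
  }
}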
@@ -627,7 +625,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
    * Let the master know we're here
    * Run initialization using parameters passed us by the master.
    */
-  private HbaseMapWritable reportForDuty() throws IOException {
+  private HbaseMapWritable reportForDuty(final Sleeper sleeper)
+  throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Telling master at " +
         conf.get(MASTER_ADDRESS) + " that we are up");
@@ -651,7 +650,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
         break;
       } catch(IOException e) {
         LOG.warn("error telling master we are up", e);
-        this.sleeper.sleep(lastMsg);
+        sleeper.sleep(lastMsg);
         continue;
       }
     }
@@ -794,12 +793,9 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     } catch (IOException e) {
       LOG.error("error opening region " + regionInfo.getRegionName(), e);
-
-      // Mark the region offline.
       // TODO: add an extra field in HRegionInfo to indicate that there is
       // an error. We can't do that now because that would be an incompatible
       // change that would require a migration
-      regionInfo.setOffline(true);
       reportClose(regionInfo);
       return;
     }
src/java/org/apache/hadoop/hbase/HStore.java
@@ -299,7 +299,8 @@ public class HStore implements HConstants {
    * reflected in the MapFiles.)
    */
   private void doReconstructionLog(final Path reconstructionLog,
-    final long maxSeqID) throws UnsupportedEncodingException, IOException {
+    final long maxSeqID)
+  throws UnsupportedEncodingException, IOException {
 
     if (reconstructionLog == null || !fs.exists(reconstructionLog)) {
       // Nothing to do.
@@ -316,16 +317,13 @@ public class HStore implements HConstants {
       HLogKey key = new HLogKey();
       HLogEdit val = new HLogEdit();
       long skippedEdits = 0;
+      long editsCount = 0;
       while (logReader.next(key, val)) {
         maxSeqIdInLog = Math.max(maxSeqIdInLog, key.getLogSeqNum());
         if (key.getLogSeqNum() <= maxSeqID) {
           skippedEdits++;
           continue;
         }
-        if (skippedEdits > 0 && LOG.isDebugEnabled()) {
-          LOG.debug("Skipped " + skippedEdits +
-            " edits because sequence id <= " + maxSeqID);
-        }
         // Check this edit is for me. Also, guard against writing
         // METACOLUMN info such as HBASE::CACHEFLUSH entries
         Text column = val.getColumn();
@@ -335,11 +333,12 @@ public class HStore implements HConstants {
           continue;
         }
         HStoreKey k = new HStoreKey(key.getRow(), column, val.getTimestamp());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Applying edit <" + k.toString() + "=" + val.toString() +
-            ">");
-        }
         reconstructedCache.put(k, val.getVal());
+        editsCount++;
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Applied " + editsCount + ", skipped " + skippedEdits +
+          " because sequence id <= " + maxSeqID);
       }
     } finally {
       logReader.close();
src/java/org/apache/hadoop/hbase/util/Sleeper.java
@@ -21,6 +21,9 @@ package org.apache.hadoop.hbase.util;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 /**
  * Sleeper for current thread.
  * Sleeps for passed period. Also checks passed boolean and if interrupted,
@@ -28,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * sleep time is up).
  */
 public class Sleeper {
+  private final Log LOG = LogFactory.getLog(this.getClass().getName());
   private final int period;
   private AtomicBoolean stop;
 
@@ -56,10 +60,19 @@ public class Sleeper {
     if (this.stop.get()) {
       return;
     }
-    long waitTime = this.period - (System.currentTimeMillis() - startTime);
+    long now = System.currentTimeMillis();
+    long waitTime = this.period - (now - startTime);
+    if (waitTime > this.period) {
+      LOG.warn("Calculated wait time > " + this.period +
+        "; setting to this.period: " + System.currentTimeMillis() + ", " +
+        startTime);
+    }
     if (waitTime > 0) {
       try {
         Thread.sleep(waitTime);
+        if ((System.currentTimeMillis() - now) > (10 * this.period)) {
+          LOG.warn("We slept ten times longer than scheduled: " + this.period);
+        }
       } catch(InterruptedException iex) {
         // We we interrupted because we're meant to stop? If not, just
         // continue ignoring the interruption
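For context, a hedged sketch of how a caller drives this class. The constructor and sleep(startTime) signatures are taken from the hunks above; everything else (the class HeartbeatLoop, the 3s period, the loop bound) is illustrative. Sleeper sleeps only the remainder of the period, so a pass that spends time reporting is not penalized twice, and the new warnings fire when the calculated wait or the actual sleep is far off that schedule.

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.util.Sleeper;

public class HeartbeatLoop {
  public static void main(String[] args) {
    AtomicBoolean stop = new AtomicBoolean(false);
    Sleeper sleeper = new Sleeper(3 * 1000, stop); // 3s reporting period
    long lastMsg = 0;
    for (int i = 0; i < 3 && !stop.get(); i++) {
      lastMsg = System.currentTimeMillis();
      // ... report to the master here ...
      // Sleeps period minus the time spent reporting; on a loaded host the
      // actual sleep can run long, which the patched class now logs.
      sleeper.sleep(lastMsg);
    }
    stop.set(true);
  }
}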