HBASE-505 Region assignments should never time out so long as the region
server reports that it is processing the open request This is patch reviewed with Jim but with the number of edits between reports made into a configurable. Have the HRegionServer pass down a Progressable implementation down into Region and then down int Store where edits are replayed. Call progress after every couple of thousand edits. M src/java/org/apache/hadoop/hbase/HStore.java Take a Progessable in the constructor. Call it when applying edits. M src/java/org/apache/hadoop/hbase/HMaster.java Update commment around MSG_REPORT_PROCESS_OPEN so its expected that we can get more than one of these messages during a region open. M src/java/org/apache/hadoop/hbase/HRegion.java New constructor that takes a Progressable. Pass it to Stores on construction. M src/java/org/apache/hadoop/hbase/HRegionServer.java On open of a region, pass in a Progressable that adds a MSG_REPORT_PROCESS_OPEN every time its called. git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@643223 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
198a156334
commit
f15048d4d0
|
@ -4,6 +4,8 @@ Hbase Change Log
|
|||
HBASE-550 EOF trying to read reconstruction log stops region deployment
|
||||
HBASE-551 Master stuck splitting server logs in shutdown loop; on each
|
||||
iteration, edits are aggregated up into the millions
|
||||
HBASE-505 Region assignments should never time out so long as the region
|
||||
server reports that it is processing the open request
|
||||
|
||||
Release 0.1.0
|
||||
|
||||
|
|
|
@ -494,10 +494,12 @@ class RegionManager implements HConstants {
|
|||
/** Update the deadline for a region assignment to be completed */
|
||||
public void updateAssignmentDeadline(HRegionInfo info) {
|
||||
synchronized (unassignedRegions) {
|
||||
// Region server has acknowledged request to open region.
|
||||
// Region server is reporting in that its working on region open
|
||||
// (We can get more than one of these messages if region is replaying
|
||||
// a bunch of edits and taking a while to open).
|
||||
// Extend region open time by max region open time.
|
||||
unassignedRegions.put(info,
|
||||
System.currentTimeMillis() + master.maxRegionOpenTime);
|
||||
this.unassignedRegions.put(info,
|
||||
Long.valueOf(System.currentTimeMillis() + this.master.maxRegionOpenTime));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.io.Cell;
|
|||
import org.apache.hadoop.hbase.util.Writables;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.WritableUtils;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
|
@ -373,12 +374,41 @@ public class HRegion implements HConstants {
|
|||
* @param initialFiles If there are initial files (implying that the HRegion
|
||||
* is new), then read them from the supplied path.
|
||||
* @param listener an object that implements CacheFlushListener or null
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf,
|
||||
HRegionInfo regionInfo, Path initialFiles, CacheFlushListener listener)
|
||||
throws IOException {
|
||||
this(basedir, log, fs, conf, regionInfo, initialFiles, listener, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* HRegion constructor.
|
||||
*
|
||||
* @param log The HLog is the outbound log for any updates to the HRegion
|
||||
* (There's a single HLog for all the HRegions on a single HRegionServer.)
|
||||
* The log file is a logfile from the previous execution that's
|
||||
* custom-computed for this HRegion. The HRegionServer computes and sorts the
|
||||
* appropriate log info for this HRegion. If there is a previous log file
|
||||
* (implying that the HRegion has been written-to before), then read it from
|
||||
* the supplied path.
|
||||
* @param basedir qualified path of directory where region should be located,
|
||||
* usually the table directory.
|
||||
* @param fs is the filesystem.
|
||||
* @param conf is global configuration settings.
|
||||
* @param regionInfo - HRegionInfo that describes the region
|
||||
* @param initialFiles If there are initial files (implying that the HRegion
|
||||
* is new), then read them from the supplied path.
|
||||
* @param listener an object that implements CacheFlushListener or null
|
||||
* @param reporter Call on a period so hosting server can report we're
|
||||
* making progress to master -- otherwise master might think region deploy
|
||||
* failed. Can be null.
|
||||
* @throws IOException
|
||||
*/
|
||||
public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf,
|
||||
HRegionInfo regionInfo, Path initialFiles, CacheFlushListener listener,
|
||||
final Progressable reporter)
|
||||
throws IOException {
|
||||
|
||||
this.basedir = basedir;
|
||||
this.log = log;
|
||||
|
@ -402,12 +432,9 @@ public class HRegion implements HConstants {
|
|||
long maxSeqId = -1;
|
||||
for(HColumnDescriptor c :
|
||||
this.regionInfo.getTableDesc().families().values()) {
|
||||
|
||||
HStore store = new HStore(this.basedir, this.regionInfo, c, this.fs,
|
||||
oldLogFile, this.conf);
|
||||
|
||||
oldLogFile, this.conf, reporter);
|
||||
stores.put(c.getFamilyName(), store);
|
||||
|
||||
long storeSeqId = store.getMaxSequenceId();
|
||||
if (storeSeqId > maxSeqId) {
|
||||
maxSeqId = storeSeqId;
|
||||
|
|
|
@ -82,6 +82,7 @@ import org.apache.hadoop.io.Text;
|
|||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.net.DNS;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
/**
|
||||
|
@ -785,7 +786,13 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
HTableDescriptor.getTableDir(rootDir,
|
||||
regionInfo.getTableDesc().getName()
|
||||
),
|
||||
this.log, this.fs, conf, regionInfo, null, this.cacheFlusher
|
||||
this.log, this.fs, conf, regionInfo, null, this.cacheFlusher,
|
||||
new Progressable() {
|
||||
public void progress() {
|
||||
getOutboundMsgs().add(new HMsg(HMsg.MSG_REPORT_PROCESS_OPEN,
|
||||
regionInfo));
|
||||
}
|
||||
}
|
||||
);
|
||||
// Startup a compaction early if one is needed.
|
||||
this.compactSplitThread.compactionRequested(region);
|
||||
|
@ -1337,6 +1344,13 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
throw new IOException("Unknown protocol to name node: " + protocol);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Queue to which you can add outbound messages.
|
||||
*/
|
||||
protected List<HMsg> getOutboundMsgs() {
|
||||
return this.outboundMsgs;
|
||||
}
|
||||
|
||||
//
|
||||
// Main program and support routines
|
||||
//
|
||||
|
|
|
@ -54,6 +54,7 @@ import org.apache.hadoop.io.SequenceFile;
|
|||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.io.WritableComparable;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.hbase.BloomFilterDescriptor;
|
||||
import org.onelab.filter.BloomFilter;
|
||||
|
@ -159,10 +160,14 @@ public class HStore implements HConstants {
|
|||
* @param fs file system object
|
||||
* @param reconstructionLog existing log file to apply if any
|
||||
* @param conf configuration object
|
||||
* @param reporter Call on a period so hosting server can report we're
|
||||
* making progress to master -- otherwise master might think region deploy
|
||||
* failed. Can be null.
|
||||
* @throws IOException
|
||||
*/
|
||||
HStore(Path basedir, HRegionInfo info, HColumnDescriptor family,
|
||||
FileSystem fs, Path reconstructionLog, HBaseConfiguration conf)
|
||||
FileSystem fs, Path reconstructionLog, HBaseConfiguration conf,
|
||||
final Progressable reporter)
|
||||
throws IOException {
|
||||
this.basedir = basedir;
|
||||
this.info = info;
|
||||
|
@ -235,7 +240,7 @@ public class HStore implements HConstants {
|
|||
}
|
||||
|
||||
try {
|
||||
doReconstructionLog(reconstructionLog, maxSeqId);
|
||||
doReconstructionLog(reconstructionLog, maxSeqId, reporter);
|
||||
} catch (IOException e) {
|
||||
// Presume we got here because of some HDFS issue or because of a lack of
|
||||
// HADOOP-1700; for now keep going but this is probably not what we want
|
||||
|
@ -308,7 +313,7 @@ public class HStore implements HConstants {
|
|||
* reflected in the MapFiles.)
|
||||
*/
|
||||
private void doReconstructionLog(final Path reconstructionLog,
|
||||
final long maxSeqID)
|
||||
final long maxSeqID, final Progressable reporter)
|
||||
throws UnsupportedEncodingException, IOException {
|
||||
if (reconstructionLog == null || !fs.exists(reconstructionLog)) {
|
||||
// Nothing to do.
|
||||
|
@ -332,6 +337,8 @@ public class HStore implements HConstants {
|
|||
HLogEdit val = new HLogEdit();
|
||||
long skippedEdits = 0;
|
||||
long editsCount = 0;
|
||||
// How many edits to apply before we send a progress report.
|
||||
int reportInterval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
|
||||
while (logReader.next(key, val)) {
|
||||
maxSeqIdInLog = Math.max(maxSeqIdInLog, key.getLogSeqNum());
|
||||
if (key.getLogSeqNum() <= maxSeqID) {
|
||||
|
@ -349,6 +356,11 @@ public class HStore implements HConstants {
|
|||
HStoreKey k = new HStoreKey(key.getRow(), column, val.getTimestamp());
|
||||
reconstructedCache.put(k, val.getVal());
|
||||
editsCount++;
|
||||
// Every 2k edits, tell the reporter we're making progress.
|
||||
// Have seen 60k edits taking 3minutes to complete.
|
||||
if (reporter != null && (editsCount % reportInterval) == 0) {
|
||||
reporter.progress();
|
||||
}
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Applied " + editsCount + ", skipped " + skippedEdits +
|
||||
|
|
Loading…
Reference in New Issue