HBASE-1394 Uploads sometimes fall to 0 requests/second (Binding up on HLog#append?)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@775418 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2009-05-16 06:10:44 +00:00
parent ac347387ab
commit cff0b8bb04
11 changed files with 173 additions and 136 deletions

View File: CHANGES.txt

@ -249,6 +249,8 @@ Release 0.20.0 - Unreleased
HBASE-1424 have shell print regioninfo and location on first load if
DEBUG enabled
HBASE-1008 [performance] The replay of logs on server crash takes way too long
HBASE-1394 Uploads sometimes fall to 0 requests/second (Binding up on
HLog#append?)
OPTIMIZATIONS
HBASE-1412 Change values for delete column and column family in KeyValue

View File: hbase-default.xml

@ -174,14 +174,6 @@
in milliseconds. Default is 3 seconds.
</description>
</property>
<property>
<name>hbase.regionserver.maxlogentries</name>
<value>100000</value>
<description>Rotate the HRegion HLogs when count of entries exceeds this
value. Default: 100,000. Value is checked by a thread that runs every
hbase.server.thread.wakefrequency.
</description>
</property>
<property>
<name>hbase.regionserver.flushlogentries</name>
<value>100</value>
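
The maxlogentries property removed above rotated logs by entry count; the rest of this commit replaces that trigger with a byte-size target of roughly 95% of an HDFS block. A minimal sketch, assuming a 64MB block size purely for illustration (HLog itself asks the filesystem for its default), of how the replacement settings combine:

import org.apache.hadoop.conf.Configuration;

public class RollSizeSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Assumed 64MB fallback; the real code uses fs.getDefaultBlockSize().
    long blocksize = conf.getLong("hbase.regionserver.hlog.blocksize",
        64 * 1024 * 1024);
    float multiplier =
        conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
    // Roll at ~95% of a block so a closed hlog fits inside one HDFS block.
    long logrollsize = (long)(blocksize * multiplier);
    System.out.println("request a roll once editsSize exceeds " + logrollsize);
  }
}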

View File: HMaster.java

@ -267,7 +267,6 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
this.rootdir, this.conf);
HRegion meta = HRegion.createHRegion(HRegionInfo.FIRST_META_REGIONINFO,
this.rootdir, this.conf);
// Add first region from the META table to the ROOT region.
HRegion.addRegionToMETA(root, meta);
root.close();

View File: HLog.java

@ -104,7 +104,6 @@ public class HLog implements HConstants, Syncable {
private final Path dir;
private final Configuration conf;
private final LogRollListener listener;
private final int maxlogentries;
private final long optionalFlushInterval;
private final long blocksize;
private final int flushlogentries;
@ -132,11 +131,17 @@ public class HLog implements HConstants, Syncable {
private final AtomicLong logSeqNum = new AtomicLong(0);
private volatile long filenum = 0;
private volatile long filenum = -1;
private volatile long old_filenum = -1;
private final AtomicInteger numEntries = new AtomicInteger(0);
// Size of edits written so far. Used in deciding when to roll the log.
private final AtomicLong editsSize = new AtomicLong(0);
// If bigger than this size, roll the log.
private final long logrollsize;
// This lock prevents starting a log roll during a cache flush.
// synchronized is insufficient because a cache flush spans two method calls.
private final Lock cacheFlushLock = new ReentrantLock();
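
A stripped-down sketch of the locking shape the comment above describes: the flush lock is taken in one method and released in another, which a synchronized block cannot span. Method names mirror HLog's, but the bodies are placeholders:

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class CacheFlushLockSketch {
  private final Lock cacheFlushLock = new ReentrantLock();

  void startCacheFlush() {
    cacheFlushLock.lock();       // held across the two-call flush sequence
  }

  void completeCacheFlush() {
    cacheFlushLock.unlock();     // log rolling may proceed again
  }

  void rollWriter() {
    cacheFlushLock.lock();       // blocks while a cache flush is in flight
    try {
      // ... swap in a new hlog writer ...
    } finally {
      cacheFlushLock.unlock();
    }
  }
}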
@ -144,7 +149,9 @@ public class HLog implements HConstants, Syncable {
// We synchronize on updateLock to prevent updates and to prevent a log roll
// during an update
private final Object updateLock = new Object();
private final boolean enabled;
/*
* If more than this many logs, force a flush of the oldest region so its
* oldest edit goes to disk. If too many accumulate and we crash, replaying
* will take forever.
@ -182,12 +189,13 @@ public class HLog implements HConstants, Syncable {
this.dir = dir;
this.conf = conf;
this.listener = listener;
this.maxlogentries =
conf.getInt("hbase.regionserver.maxlogentries", 100000);
this.flushlogentries =
conf.getInt("hbase.regionserver.flushlogentries", 100);
this.blocksize = conf.getLong("hbase.regionserver.hlog.blocksize",
this.fs.getDefaultBlockSize());
// Roll at 95% of block size.
float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
this.logrollsize = (long)(this.blocksize * multi);
this.optionalFlushInterval =
conf.getLong("hbase.regionserver.optionallogflushinterval", 10 * 1000);
this.lastLogFlushTime = System.currentTimeMillis();
@ -196,15 +204,16 @@ public class HLog implements HConstants, Syncable {
}
fs.mkdirs(dir);
this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 64);
this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
LOG.info("HLog configuration: blocksize=" + this.blocksize +
", maxlogentries=" + this.maxlogentries + ", flushlogentries=" +
this.flushlogentries + ", optionallogflushinternal=" +
this.optionalFlushInterval + "ms");
", rollsize=" + this.logrollsize +
", enabled=" + this.enabled +
", flushlogentries=" + this.flushlogentries +
", optionallogflushinterval=" + this.optionalFlushInterval + "ms");
rollWriter();
}
/**
* Accessor for tests. Not a part of the public API.
* @return Current state of the monotonically increasing file id.
*/
public long getFilenum() {
@ -212,17 +221,13 @@ public class HLog implements HConstants, Syncable {
}
/**
* Get the compression type for the hlog files.
* Commit logs SHOULD NOT be compressed. You'll lose edits if the compression
* record is not complete. In gzip, record is 32k so you could lose up to
* 32k of edits (All of this is moot till we have sync/flush in hdfs but
* still...).
* Get the compression type for the hlog files
* @param c Configuration to use.
* @return the kind of compression to use
*/
private static CompressionType getCompressionType(final Configuration c) {
String name = c.get("hbase.io.seqfile.compression.type");
return name == null? CompressionType.NONE: CompressionType.valueOf(name);
// Compression makes no sense for commit log. Always return NONE.
return CompressionType.NONE;
}
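
A before/after sketch of this change with hypothetical method names: the old code honored hbase.io.seqfile.compression.type, while the new code ignores the setting entirely because a torn compression record would silently drop edits:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.SequenceFile.CompressionType;

class CompressionTypeSketch {
  // Old behavior: honored whatever codec the configuration named.
  static CompressionType oldBehavior(Configuration c) {
    String name = c.get("hbase.io.seqfile.compression.type");
    return name == null ? CompressionType.NONE : CompressionType.valueOf(name);
  }

  // New behavior: commit logs are never compressed, whatever is configured.
  static CompressionType newBehavior(Configuration c) {
    return CompressionType.NONE;
  }
}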
/**
@ -277,23 +282,24 @@ public class HLog implements HConstants, Syncable {
}
synchronized (updateLock) {
// Clean up current writer.
Path oldFile = cleanupCurrentWriter();
// Create a new one.
this.old_filenum = this.filenum;
Path oldFile = cleanupCurrentWriter(this.filenum);
if (this.filenum >= 0) {
this.old_filenum = this.filenum;
}
this.filenum = System.currentTimeMillis();
Path newPath = computeFilename(this.filenum);
this.writer = SequenceFile.createWriter(this.fs, this.conf, newPath,
HLogKey.class, KeyValue.class,
fs.getConf().getInt("io.file.buffer.size", 4096),
fs.getDefaultReplication(), this.blocksize,
SequenceFile.CompressionType.NONE, new DefaultCodec(), null,
new Metadata());
LOG.info((oldFile != null?
"Closed " + oldFile + ", entries=" + this.numEntries.get() + ". ": "") +
"New log writer: " + FSUtils.getPath(newPath));
"Roll " + FSUtils.getPath(oldFile) + ", entries=" +
this.numEntries.get() +
", calcsize=" + this.editsSize.get() + ", filesize=" +
this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
"New hlog " + FSUtils.getPath(newPath));
// Can we delete any of the old log files?
if (this.outputfiles.size() > 0) {
if (this.lastSeqWritten.size() <= 0) {
@ -310,6 +316,7 @@ public class HLog implements HConstants, Syncable {
}
}
this.numEntries.set(0);
this.editsSize.set(0);
updateLock.notifyAll();
}
} finally {
@ -337,7 +344,7 @@ public class HLog implements HConstants, Syncable {
if (LOG.isDebugEnabled()) {
// Find region associated with oldest key -- helps debugging.
oldestRegion = getOldestRegion(oldestOutstandingSeqNum);
LOG.debug("Found " + sequenceNumbers.size() + " logs to remove " +
LOG.debug("Found " + sequenceNumbers.size() + " hlogs to remove " +
" out of total " + this.outputfiles.size() + "; " +
"oldest outstanding seqnum is " + oldestOutstandingSeqNum +
" from region " + Bytes.toString(oldestRegion));
@ -351,7 +358,7 @@ public class HLog implements HConstants, Syncable {
if (countOfLogs > this.maxLogs) {
regionToFlush = oldestRegion != null?
oldestRegion: getOldestRegion(oldestOutstandingSeqNum);
LOG.info("Too many logs: logs=" + countOfLogs + ", maxlogs=" +
LOG.info("Too many hlogs: logs=" + countOfLogs + ", maxlogs=" +
this.maxLogs + "; forcing flush of region with oldest edits: " +
Bytes.toString(regionToFlush));
}
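
A sketch of the retention heuristic above using simplified types; the map here is a hypothetical seqnum-to-region view, not HLog's actual lastSeqWritten layout. When more hlogs than maxlogs are outstanding, the region pinning the oldest hlog is flushed so that hlog becomes deletable:

import java.util.SortedMap;

class TooManyLogsSketch {
  static byte[] pickRegionToFlush(SortedMap<Long, byte[]> oldestEditSeqToRegion,
      int countOfLogs, int maxLogs) {
    if (countOfLogs <= maxLogs) {
      return null;              // under the cap: nothing to force
    }
    // The oldest outstanding edit pins the oldest hlog on disk; flushing
    // its region lets that hlog (and possibly several more) be removed.
    return oldestEditSeqToRegion.get(oldestEditSeqToRegion.firstKey());
  }
}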
@ -382,7 +389,8 @@ public class HLog implements HConstants, Syncable {
* @return Path to current writer or null if none.
* @throws IOException
*/
private Path cleanupCurrentWriter() throws IOException {
private Path cleanupCurrentWriter(final long currentfilenum)
throws IOException {
Path oldFile = null;
if (this.writer != null) {
// Close the current writer, get a new one.
@ -393,12 +401,12 @@ public class HLog implements HConstants, Syncable {
// shut ourselves down to minimize loss. Alternative is to try and
// keep going. See HBASE-930.
FailedLogCloseException flce =
new FailedLogCloseException("#" + this.filenum);
new FailedLogCloseException("#" + currentfilenum);
flce.initCause(e);
throw e;
}
oldFile = computeFilename(old_filenum);
if (filenum > 0) {
if (currentfilenum >= 0) {
oldFile = computeFilename(currentfilenum);
this.outputfiles.put(Long.valueOf(this.logSeqNum.get() - 1), oldFile);
}
}
@ -406,7 +414,7 @@ public class HLog implements HConstants, Syncable {
}
private void deleteLogFile(final Path p, final Long seqno) throws IOException {
LOG.info("removing old log file " + FSUtils.getPath(p) +
LOG.info("removing old hlog file " + FSUtils.getPath(p) +
" whose highest sequence/edit id is " + seqno);
this.fs.delete(p, true);
}
@ -418,6 +426,7 @@ public class HLog implements HConstants, Syncable {
* @return Path
*/
public Path computeFilename(final long fn) {
if (fn < 0) return null;
return new Path(dir, HLOG_DATFILE + fn);
}
@ -442,7 +451,7 @@ public class HLog implements HConstants, Syncable {
synchronized (updateLock) {
this.closed = true;
if (LOG.isDebugEnabled()) {
LOG.debug("closing log writer in " + this.dir.toString());
LOG.debug("closing hlog writer in " + this.dir.toString());
}
this.writer.close();
updateLock.notifyAll();
@ -457,11 +466,12 @@ public class HLog implements HConstants, Syncable {
*
* @param regionInfo
* @param logEdit
* @param now
* @throws IOException
*/
public void append(HRegionInfo regionInfo, KeyValue logEdit)
public void append(HRegionInfo regionInfo, KeyValue logEdit, final long now)
throws IOException {
this.append(regionInfo, new byte[0], logEdit);
this.append(regionInfo, new byte[0], logEdit, now);
}
/** Append an entry to the log.
@ -469,9 +479,11 @@ public class HLog implements HConstants, Syncable {
* @param regionInfo
* @param row
* @param logEdit
* @param now Time of this edit write.
* @throws IOException
*/
public void append(HRegionInfo regionInfo, byte [] row, KeyValue logEdit)
public void append(HRegionInfo regionInfo, byte [] row, KeyValue logEdit,
final long now)
throws IOException {
if (this.closed) {
throw new IOException("Cannot append; log is closed");
@ -485,14 +497,13 @@ public class HLog implements HConstants, Syncable {
// region being flushed is removed if the sequence number of the flush
// is greater than or equal to the value in lastSeqWritten.
this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum));
HLogKey logKey = new HLogKey(regionName, tableName, seqNum);
HLogKey logKey = new HLogKey(regionName, tableName, seqNum, now);
boolean sync = regionInfo.isMetaRegion() || regionInfo.isRootRegion();
doWrite(logKey, logEdit, sync);
doWrite(logKey, logEdit, sync, now);
this.numEntries.incrementAndGet();
updateLock.notifyAll();
}
if (this.numEntries.get() > this.maxlogentries) {
if (this.editsSize.get() > this.logrollsize) {
if (listener != null) {
listener.logRollRequested();
}
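
A sketch of the durability split visible above, with field names assumed to mirror the diff: ROOT and META edits are sync'd immediately, while user-region edits are group-committed once flushlogentries unflushed entries accumulate:

import java.util.concurrent.atomic.AtomicInteger;

class SyncPolicySketch {
  private final int flushlogentries = 100;        // default from this diff
  private final AtomicInteger unflushed = new AtomicInteger(0);

  void doWrite(boolean metaOrRoot) {
    // ... writer.append(key, edit) ...
    if (metaOrRoot || unflushed.incrementAndGet() >= flushlogentries) {
      sync();
    }
  }

  void sync() {
    unflushed.set(0);
    // ... writer.sync() ...
  }
}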
@ -520,10 +531,11 @@ public class HLog implements HConstants, Syncable {
* @param tableName
* @param edits
* @param sync
* @param now
* @throws IOException
*/
void append(byte [] regionName, byte [] tableName, List<KeyValue> edits,
boolean sync)
boolean sync, final long now)
throws IOException {
if (this.closed) {
throw new IOException("Cannot append; log is closed");
@ -537,13 +549,14 @@ public class HLog implements HConstants, Syncable {
this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum[0]));
int counter = 0;
for (KeyValue kv: edits) {
HLogKey logKey = new HLogKey(regionName, tableName, seqNum[counter++]);
doWrite(logKey, kv, sync);
HLogKey logKey =
new HLogKey(regionName, tableName, seqNum[counter++], now);
doWrite(logKey, kv, sync, now);
this.numEntries.incrementAndGet();
}
updateLock.notifyAll();
}
if (this.numEntries.get() > this.maxlogentries) {
if (this.editsSize.get() > this.logrollsize) {
requestLogRoll();
}
}
@ -558,19 +571,19 @@ public class HLog implements HConstants, Syncable {
if (!this.closed) {
long now = System.currentTimeMillis();
synchronized (updateLock) {
if (((now - this.optionalFlushInterval) >
this.lastLogFlushTime) && this.unflushedEntries.get() > 0) {
if (((now - this.optionalFlushInterval) > this.lastLogFlushTime) &&
this.unflushedEntries.get() > 0) {
try {
sync();
} catch (IOException e) {
LOG.error("Error flushing HLog", e);
LOG.error("Error flushing hlog", e);
}
}
}
long took = System.currentTimeMillis() - now;
if (took > 1000) {
LOG.warn(Thread.currentThread().getName() + " took " + took +
"ms optional sync'ing HLog; editcount=" + this.numEntries.get());
"ms optional sync'ing hlog; editcount=" + this.numEntries.get());
}
}
}
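
A minimal sketch of the time-based fallback above, assuming the field names in this diff: if edits sit unflushed longer than optionalFlushInterval, they are sync'd even though the entry-count threshold was never reached:

import java.util.concurrent.atomic.AtomicInteger;

class OptionalSyncSketch {
  private final long optionalFlushInterval = 10 * 1000;   // default from diff
  private final AtomicInteger unflushedEntries = new AtomicInteger(0);
  private volatile long lastLogFlushTime = System.currentTimeMillis();

  void optionalSync() {
    long now = System.currentTimeMillis();
    if ((now - optionalFlushInterval) > lastLogFlushTime
        && unflushedEntries.get() > 0) {
      sync();
    }
  }

  void sync() {
    unflushedEntries.set(0);
    lastLogFlushTime = System.currentTimeMillis();
    // ... writer.sync() would go here ...
  }
}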
@ -581,10 +594,14 @@ public class HLog implements HConstants, Syncable {
}
}
private void doWrite(HLogKey logKey, KeyValue logEdit, boolean sync)
private void doWrite(HLogKey logKey, KeyValue logEdit, boolean sync,
final long now)
throws IOException {
if (!this.enabled) {
return;
}
try {
long now = System.currentTimeMillis();
this.editsSize.addAndGet(logKey.heapSize() + logEdit.heapSize());
this.writer.append(logKey, logEdit);
if (sync || this.unflushedEntries.incrementAndGet() >= flushlogentries) {
sync();
@ -592,10 +609,10 @@ public class HLog implements HConstants, Syncable {
long took = System.currentTimeMillis() - now;
if (took > 1000) {
LOG.warn(Thread.currentThread().getName() + " took " + took +
"ms appending an edit to HLog; editcount=" + this.numEntries.get());
"ms appending an edit to hlog; editcount=" + this.numEntries.get());
}
} catch (IOException e) {
LOG.fatal("Could not append. Requesting close of log", e);
LOG.fatal("Could not append. Requesting close of hlog", e);
requestLogRoll();
throw e;
}
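
A sketch of the new accounting in doWrite, with heap sizes passed in directly instead of taken from HLogKey.heapSize() and KeyValue.heapSize(): each append grows editsSize, a roll is requested once the total passes the threshold, and rollWriter resets the tally:

import java.util.concurrent.atomic.AtomicLong;

class EditsSizeSketch {
  private final AtomicLong editsSize = new AtomicLong(0);
  private final long logrollsize;
  private boolean rollRequested = false;

  EditsSizeSketch(long logrollsize) {
    this.logrollsize = logrollsize;
  }

  void append(long keyHeapSize, long editHeapSize) {
    editsSize.addAndGet(keyHeapSize + editHeapSize);
    if (editsSize.get() > logrollsize) {
      rollRequested = true;     // stands in for listener.logRollRequested()
    }
  }

  void rollWriter() {
    editsSize.set(0);           // start the tally over for the new hlog
    rollRequested = false;
  }
}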
@ -667,8 +684,8 @@ public class HLog implements HConstants, Syncable {
return;
}
synchronized (updateLock) {
this.writer.append(new HLogKey(regionName, tableName, logSeqId),
completeCacheFlushLogEdit());
this.writer.append(new HLogKey(regionName, tableName, logSeqId,
System.currentTimeMillis()), completeCacheFlushLogEdit());
this.numEntries.incrementAndGet();
Long seq = this.lastSeqWritten.get(regionName);
if (seq != null && logSeqId >= seq.longValue()) {
@ -729,7 +746,7 @@ public class HLog implements HConstants, Syncable {
// Nothing to do
return;
}
LOG.info("Splitting " + logfiles.length + " log(s) in " +
LOG.info("Splitting " + logfiles.length + " hlog(s) in " +
srcDir.toString());
splitLog(rootDir, logfiles, fs, conf);
try {
@ -741,7 +758,7 @@ public class HLog implements HConstants, Syncable {
throw io;
}
long endMillis = System.currentTimeMillis();
LOG.info("log file splitting completed in " + (endMillis - millis) +
LOG.info("hlog file splitting completed in " + (endMillis - millis) +
" millis for " + srcDir.toString());
}
@ -762,8 +779,8 @@ public class HLog implements HConstants, Syncable {
try {
for (int i = 0; i < logfiles.length; i++) {
if (LOG.isDebugEnabled()) {
LOG.debug("Splitting " + (i + 1) + " of " + logfiles.length + ": " +
logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
": " + logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
}
// Check for possibly empty file. With appends, currently Hadoop reports
// a zero length even if the file has been sync'd. Revisit if
@ -777,7 +794,7 @@ public class HLog implements HConstants, Syncable {
try {
int count = 0;
while (in.next(key, val)) {
byte[] regionName = key.getRegionName();
byte [] regionName = key.getRegionName();
LinkedList<HLogEntry> queue = logEntries.get(regionName);
if (queue == null) {
queue = new LinkedList<HLogEntry>();
@ -787,7 +804,8 @@ public class HLog implements HConstants, Syncable {
queue.push(new HLogEntry(val, key));
count++;
}
LOG.debug("Pushed " + count + " entries");
LOG.debug("Pushed " + count + " entries from " +
logfiles[i].getPath());
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
if (!(e instanceof EOFException)) {
@ -797,7 +815,7 @@ public class HLog implements HConstants, Syncable {
}
} catch (IOException e) {
if (length <= 0) {
LOG.warn("Empty log, continuing: " + logfiles[i]);
LOG.warn("Empty hlog, continuing: " + logfiles[i]);
continue;
}
throw e;
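
A sketch of the split-time tolerance above with simplified signatures: under append-era HDFS a sync'd file can legitimately report zero length, so an unreadable zero-length hlog is skipped rather than failing the whole split:

import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;

class EmptyHlogSketch {
  // Stand-in for iterating the SequenceFile and queueing edits per region.
  static void readEntries(FileStatus log) throws IOException { /* ... */ }

  static void splitOne(FileStatus log) throws IOException {
    try {
      readEntries(log);
    } catch (IOException e) {
      if (log.getLen() <= 0) {
        return;                 // "Empty hlog, continuing"
      }
      throw e;                  // real corruption: surface it
    }
  }
}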
@ -838,7 +856,7 @@ public class HLog implements HConstants, Syncable {
Path oldlogfile = null;
SequenceFile.Reader old = null;
if (fs.exists(logfile)) {
LOG.warn("Old log file " + logfile
LOG.warn("Old hlog file " + logfile
+ " already exists. Copying existing file to new file");
oldlogfile = new Path(logfile.toString() + ".old");
fs.rename(logfile, oldlogfile);
@ -852,7 +870,7 @@ public class HLog implements HConstants, Syncable {
// iterate.
logWriters.put(key, w);
if (LOG.isDebugEnabled()) {
LOG.debug("Creating new log file writer for path "
LOG.debug("Creating new hlog file writer for path "
+ logfile + " and region " + Bytes.toString(key));
}
@ -893,10 +911,10 @@ public class HLog implements HConstants, Syncable {
// Wait for all threads to terminate
try {
for (int i = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); i++) {
LOG.debug("Waiting for log writers to terminate, iteration #" + i);
LOG.debug("Waiting for hlog writers to terminate, iteration #" + i);
}
} catch (InterruptedException ex) {
LOG.warn("Log writers were interrupted, possible data loss!");
LOG.warn("Hlog writers were interrupted, possible data loss!");
}
} finally {
for (SequenceFile.Writer w : logWriters.values()) {

View File: HLogKey.java

@ -19,6 +19,8 @@
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.*;
@ -34,14 +36,18 @@ import java.io.*;
* <p>Some Transactional edits (START, COMMIT, ABORT) will not have an
* associated row.
*/
public class HLogKey implements WritableComparable<HLogKey> {
public class HLogKey implements WritableComparable<HLogKey>, HeapSize {
private byte [] regionName;
private byte [] tablename;
private long logSeqNum;
// Time at which this edit was written.
private long writeTime;
private int HEAP_TAX = HeapSize.OBJECT + (2 * HeapSize.BYTE_ARRAY) +
(2 * HeapSize.LONG);
/** Create an empty key useful when deserializing */
/** Writable constructor -- Do not use. */
public HLogKey() {
this(null, null, 0L);
this(null, null, 0L, HConstants.LATEST_TIMESTAMP);
}
/**
@ -52,12 +58,14 @@ public class HLogKey implements WritableComparable<HLogKey> {
* @param regionName - name of region
* @param tablename - name of table
* @param logSeqNum - log sequence number
* @param now Time at which this edit was written.
*/
public HLogKey(final byte [] regionName, final byte [] tablename,
long logSeqNum) {
long logSeqNum, final long now) {
this.regionName = regionName;
this.tablename = tablename;
this.logSeqNum = logSeqNum;
this.writeTime = now;
}
//////////////////////////////////////////////////////////////////////////////
@ -78,7 +86,11 @@ public class HLogKey implements WritableComparable<HLogKey> {
public long getLogSeqNum() {
return logSeqNum;
}
public long getWriteTime() {
return this.writeTime;
}
@Override
public String toString() {
return Bytes.toString(tablename) + "/" + Bytes.toString(regionName) + "/" +
@ -100,38 +112,44 @@ public class HLogKey implements WritableComparable<HLogKey> {
public int hashCode() {
int result = this.regionName.hashCode();
result ^= this.logSeqNum;
result ^= this.writeTime;
return result;
}
//
// Comparable
//
public int compareTo(HLogKey o) {
int result = Bytes.compareTo(this.regionName, o.regionName);
if(result == 0) {
if (result == 0) {
if (this.logSeqNum < o.logSeqNum) {
result = -1;
} else if (this.logSeqNum > o.logSeqNum) {
result = 1;
}
if (result == 0) {
if (this.writeTime < o.writeTime) {
result = -1;
} else if (this.writeTime > o.writeTime) {
result = 1;
}
}
}
return result;
}
//
// Writable
//
public void write(DataOutput out) throws IOException {
Bytes.writeByteArray(out, this.regionName);
Bytes.writeByteArray(out, this.tablename);
out.writeLong(logSeqNum);
out.writeLong(this.writeTime);
}
public void readFields(DataInput in) throws IOException {
this.regionName = Bytes.readByteArray(in);
this.tablename = Bytes.readByteArray(in);
this.logSeqNum = in.readLong();
this.writeTime = in.readLong();
}
public long heapSize() {
return this.regionName.length + this.tablename.length + HEAP_TAX;
}
}
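
A worked example of the heapSize() estimate above; the OBJECT, BYTE_ARRAY and LONG values here are assumptions for illustration, not necessarily HeapSize's actual constants:

class HeapSizeSketch {
  static final int OBJECT = 8, BYTE_ARRAY = 16, LONG = 8;   // assumed sizes
  static final int HEAP_TAX = OBJECT + (2 * BYTE_ARRAY) + (2 * LONG);

  public static void main(String[] args) {
    byte[] regionName = new byte[32];
    byte[] tablename = new byte[16];
    // 32 + 16 + 56 = 104 bytes estimated for this HLogKey
    System.out.println(regionName.length + tablename.length + HEAP_TAX);
  }
}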

View File: HRegion.java

@ -1341,7 +1341,7 @@ public class HRegion implements HConstants {
edits.add(kv);
}
if (!edits.isEmpty()) {
update(edits, writeToWAL);
update(edits, writeToWAL, now);
}
if (latestTimestampDeletes != null &&
!latestTimestampDeletes.isEmpty()) {
@ -1349,7 +1349,7 @@ public class HRegion implements HConstants {
// as edits. Need to do individually after figuring which is latest
// timestamp to delete.
for (byte [] column: latestTimestampDeletes) {
deleteMultiple(row, column, LATEST_TIMESTAMP, 1);
deleteMultiple(row, column, LATEST_TIMESTAMP, 1, now);
}
}
} finally {
@ -1387,6 +1387,7 @@ public class HRegion implements HConstants {
splitsAndClosesLock.readLock().lock();
try {
byte[] row = b.getRow();
long now = System.currentTimeMillis();
Integer lid = getLock(lockid,row);
try {
NavigableSet<byte []> keySet =
@ -1404,7 +1405,7 @@ public class HRegion implements HConstants {
}
if (success) {
long commitTime = (b.getTimestamp() == LATEST_TIMESTAMP)?
System.currentTimeMillis(): b.getTimestamp();
now: b.getTimestamp();
Set<byte []> latestTimestampDeletes = null;
List<KeyValue> edits = new ArrayList<KeyValue>();
for (BatchOperation op: b) {
@ -1431,7 +1432,7 @@ public class HRegion implements HConstants {
edits.add(kv);
}
if (!edits.isEmpty()) {
update(edits, writeToWAL);
update(edits, writeToWAL, now);
}
if (latestTimestampDeletes != null &&
!latestTimestampDeletes.isEmpty()) {
@ -1439,7 +1440,7 @@ public class HRegion implements HConstants {
// as edits. Need to do individually after figuring which is latest
// timestamp to delete.
for (byte [] column: latestTimestampDeletes) {
deleteMultiple(row, column, LATEST_TIMESTAMP, 1);
deleteMultiple(row, column, LATEST_TIMESTAMP, 1, now);
}
}
}
@ -1530,7 +1531,7 @@ public class HRegion implements HConstants {
try {
// Delete ALL versions rather than column family VERSIONS. If we just did
// VERSIONS, then if 2*VERSIONS cells existed, subsequent gets would return old values.
deleteMultiple(row, column, ts, ALL_VERSIONS);
deleteMultiple(row, column, ts, ALL_VERSIONS, System.currentTimeMillis());
} finally {
if(lockid == null) releaseRowLock(lid);
}
@ -1547,9 +1548,10 @@ public class HRegion implements HConstants {
throws IOException {
checkReadOnly();
Integer lid = getLock(lockid, row);
long now = System.currentTimeMillis();
long time = ts;
if (ts == HConstants.LATEST_TIMESTAMP) {
time = System.currentTimeMillis();
time = now;
}
KeyValue kv = KeyValue.createFirstOnRow(row, time);
try {
@ -1561,7 +1563,7 @@ public class HRegion implements HConstants {
// This is UGLY. COPY OF KEY PART OF KeyValue.
edits.add(key.cloneDelete());
}
update(edits);
update(edits, now);
}
} finally {
if (lockid == null) releaseRowLock(lid);
@ -1594,7 +1596,7 @@ public class HRegion implements HConstants {
for (KeyValue key: keyvalues) {
edits.add(key.cloneDelete());
}
update(edits);
update(edits, now);
}
} finally {
if(lockid == null) releaseRowLock(lid);
@ -1629,7 +1631,7 @@ public class HRegion implements HConstants {
for (KeyValue kv: keyvalues) {
edits.add(kv.cloneDelete());
}
update(edits);
update(edits, now);
} finally {
if(lockid == null) releaseRowLock(lid);
}
@ -1668,7 +1670,7 @@ public class HRegion implements HConstants {
for (KeyValue k: keyvalues) {
edits.add(k.cloneDelete());
}
update(edits);
update(edits, now);
}
} finally {
if(lockid == null) releaseRowLock(lid);
@ -1684,10 +1686,11 @@ public class HRegion implements HConstants {
* @param ts Timestamp to start search on.
* @param versions How many versions to delete. Pass
* {@link HConstants#ALL_VERSIONS} to delete all.
* @param now
* @throws IOException
*/
private void deleteMultiple(final byte [] row, final byte [] column,
final long ts, final int versions)
final long ts, final int versions, final long now)
throws IOException {
checkReadOnly();
// We used to have a getKeys method that purportedly only got the keys and
@ -1704,7 +1707,7 @@ public class HRegion implements HConstants {
for (KeyValue key: keys) {
edits.add(key.cloneDelete());
}
update(edits);
update(edits, now);
}
}
@ -1748,10 +1751,12 @@ public class HRegion implements HConstants {
* Add updates first to the hlog and then add values to memcache.
* Warning: Assumption is caller has lock on passed in row.
* @param edits Cell updates by column
* @param now
* @throws IOException
*/
private void update(final List<KeyValue> edits) throws IOException {
this.update(edits, true);
private void update(final List<KeyValue> edits, final long now)
throws IOException {
this.update(edits, true, now);
}
/**
@ -1759,9 +1764,11 @@ public class HRegion implements HConstants {
* Warning: Assumption is caller has lock on passed in row.
* @param edits Cell updates by column
* @param writeToWAL if true, then we should write to the log
* @param now
* @throws IOException
*/
private void update(final List<KeyValue> edits, boolean writeToWAL)
private void update(final List<KeyValue> edits, boolean writeToWAL,
final long now)
throws IOException {
if (edits == null || edits.isEmpty()) {
return;
@ -1772,7 +1779,7 @@ public class HRegion implements HConstants {
if (writeToWAL) {
this.log.append(regionInfo.getRegionName(),
regionInfo.getTableDesc().getName(), edits,
(regionInfo.isMetaRegion() || regionInfo.isRootRegion()));
(regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now);
}
long size = 0;
for (KeyValue kv: edits) {
@ -2273,7 +2280,7 @@ public class HRegion implements HConstants {
List<KeyValue> edits = new ArrayList<KeyValue>();
edits.add(new KeyValue(row, COL_REGIONINFO, System.currentTimeMillis(),
Writables.getBytes(r.getRegionInfo())));
meta.update(edits);
meta.update(edits, System.currentTimeMillis());
} finally {
meta.releaseRowLock(lid);
}
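
A sketch of the pattern HRegion now follows throughout this file: read the clock once per operation and pass now down, so the WAL entry, any LATEST_TIMESTAMP resolution, and trailing deletes within one batch share a single timestamp. The names here are illustrative stand-ins, not HRegion's real signatures:

class BatchTimestampSketch {
  void commitBatch(boolean writeToWAL) {
    final long now = System.currentTimeMillis();  // one clock read per batch
    update(writeToWAL, now);    // WAL append and memcache share this time
    deleteMultiple(now);        // so do trailing LATEST_TIMESTAMP deletes
  }

  void update(boolean writeToWAL, long now) {
    // ... hlog.append(regionName, tableName, edits, sync, now) ...
  }

  void deleteMultiple(long now) {
    // ... create delete KeyValues stamped with now ...
  }
}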

View File: HRegionServer.java

@ -205,9 +205,9 @@ public class HRegionServer implements HConstants, HRegionInterface,
// HLog and HLog roller. hlog is protected rather than private to avoid an
// eclipse warning when accessed by inner classes
protected volatile HLog log;
LogRoller logRoller;
LogFlusher logFlusher;
protected volatile HLog hlog;
LogRoller hlogRoller;
LogFlusher hlogFlusher;
// limit compactions while starting up
CompactionLimitThread compactionLimitThread;
@ -344,10 +344,10 @@ public class HRegionServer implements HConstants, HRegionInterface,
this.compactSplitThread = new CompactSplitThread(this);
// Log rolling thread
this.logRoller = new LogRoller(this);
this.hlogRoller = new LogRoller(this);
// Log flushing thread
this.logFlusher =
this.hlogFlusher =
new LogFlusher(this.threadWakeFrequency, this.stopRequested);
// Background thread to check for major compactions; needed if region
@ -513,14 +513,14 @@ public class HRegionServer implements HConstants, HRegionInterface,
if (checkFileSystem()) {
closeAllRegions();
try {
log.closeAndDelete();
hlog.closeAndDelete();
} catch (Exception e) {
LOG.error("error closing and deleting HLog", e);
}
try {
serverInfo.setStartCode(System.currentTimeMillis());
log = setupHLog();
this.logFlusher.setHLog(log);
hlog = setupHLog();
this.hlogFlusher.setHLog(hlog);
} catch (IOException e) {
this.abortRequested = true;
this.stopRequested.set(true);
@ -620,17 +620,17 @@ public class HRegionServer implements HConstants, HRegionInterface,
// Send interrupts to wake up threads if sleeping so they notice shutdown.
// TODO: Should we check they are alive? If OOME could have exited already
cacheFlusher.interruptIfNecessary();
logFlusher.interrupt();
hlogFlusher.interrupt();
compactSplitThread.interruptIfNecessary();
logRoller.interruptIfNecessary();
hlogRoller.interruptIfNecessary();
this.majorCompactionChecker.interrupt();
if (abortRequested) {
if (this.fsOk) {
// Only try to clean up if the file system is available
try {
if (this.log != null) {
this.log.close();
if (this.hlog != null) {
this.hlog.close();
LOG.info("On abort, closed hlog");
}
} catch (Throwable e) {
@ -644,7 +644,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
} else {
ArrayList<HRegion> closedRegions = closeAllRegions();
try {
log.closeAndDelete();
hlog.closeAndDelete();
} catch (Throwable e) {
LOG.error("Close and delete failed",
RemoteExceptionHandler.checkThrowable(e));
@ -743,8 +743,8 @@ public class HRegionServer implements HConstants, HRegionInterface,
this.hdfsShutdownThread = suppressHdfsShutdownHook();
this.rootDir = new Path(this.conf.get(HConstants.HBASE_DIR));
this.log = setupHLog();
this.logFlusher.setHLog(log);
this.hlog = setupHLog();
this.hlogFlusher.setHLog(hlog);
// Init in here rather than in constructor after thread name has been set
this.metrics = new RegionServerMetrics();
startServiceThreads();
@ -1058,7 +1058,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
"running at " + this.serverInfo.getServerAddress().toString() +
" because logdir " + logdir.toString() + " exists");
}
HLog newlog = new HLog(fs, logdir, conf, logRoller);
HLog newlog = new HLog(fs, logdir, conf, hlogRoller);
return newlog;
}
@ -1127,9 +1127,9 @@ public class HRegionServer implements HConstants, HRegionInterface,
LOG.fatal("Set stop flag in " + t.getName(), e);
}
};
Threads.setDaemonThreadRunning(this.logRoller, n + ".logRoller",
Threads.setDaemonThreadRunning(this.hlogRoller, n + ".logRoller",
handler);
Threads.setDaemonThreadRunning(this.logFlusher, n + ".logFlusher",
Threads.setDaemonThreadRunning(this.hlogFlusher, n + ".logFlusher",
handler);
Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher",
handler);
@ -1199,7 +1199,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
}
// Verify that all threads are alive
if (!(leases.isAlive() && compactSplitThread.isAlive() &&
cacheFlusher.isAlive() && logRoller.isAlive() &&
cacheFlusher.isAlive() && hlogRoller.isAlive() &&
workerThread.isAlive() && this.majorCompactionChecker.isAlive())) {
// One or more threads are no longer alive - shut down
stop();
@ -1234,7 +1234,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
/** @return the HLog */
HLog getLog() {
return this.log;
return this.hlog;
}
/**
@ -1270,7 +1270,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
Threads.shutdown(this.workerThread);
Threads.shutdown(this.cacheFlusher);
Threads.shutdown(this.compactSplitThread);
Threads.shutdown(this.logRoller);
Threads.shutdown(this.hlogRoller);
}
private boolean getMaster() {
@ -1540,7 +1540,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
}
this.lock.writeLock().lock();
try {
this.log.setSequenceNumber(region.getMinSequenceId());
this.hlog.setSequenceNumber(region.getMinSequenceId());
this.onlineRegions.put(mapKey, region);
} finally {
this.lock.writeLock().unlock();
@ -1552,7 +1552,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
protected HRegion instantiateRegion(final HRegionInfo regionInfo)
throws IOException {
HRegion r = new HRegion(HTableDescriptor.getTableDir(rootDir, regionInfo
.getTableDesc().getName()), this.log, this.fs, conf, regionInfo,
.getTableDesc().getName()), this.hlog, this.fs, conf, regionInfo,
this.cacheFlusher);
r.initialize(null, new Progressable() {
public void progress() {

View File: IndexedRegionServer.java

@ -61,7 +61,7 @@ public class IndexedRegionServer extends TransactionalRegionServer implements
protected HRegion instantiateRegion(final HRegionInfo regionInfo)
throws IOException {
HRegion r = new IndexedRegion(HTableDescriptor.getTableDir(super
.getRootDir(), regionInfo.getTableDesc().getName()), super.log, super
.getRootDir(), regionInfo.getTableDesc().getName()), super.hlog, super
.getFileSystem(), super.conf, regionInfo, super.getFlushRequester());
r.initialize(null, new Progressable() {
public void progress() {

View File: TransactionalHLogManager.java

@ -100,7 +100,7 @@ class TransactionalHLogManager {
HLogEdit logEdit;
logEdit = new HLogEdit(transactionId, TransactionalOperation.START);
*/
hlog.append(regionInfo, null/*logEdit*/);
hlog.append(regionInfo, null/*logEdit*/, System.currentTimeMillis());
}
/**
@ -117,7 +117,7 @@ class TransactionalHLogManager {
for (BatchOperation op : update) {
// COMMENTED OUT HLogEdit logEdit = new HLogEdit(transactionId, update.getRow(), op, commitTime);
hlog.append(regionInfo, update.getRow(), null /*logEdit*/);
hlog.append(regionInfo, update.getRow(), null /*logEdit*/, System.currentTimeMillis());
}
}
@ -130,7 +130,7 @@ class TransactionalHLogManager {
logEdit = new HLogEdit(transactionId,
HLogEdit.TransactionalOperation.COMMIT);
*/
hlog.append(regionInfo, null /*logEdit*/);
hlog.append(regionInfo, null /*logEdit*/, System.currentTimeMillis());
}
/**
@ -141,7 +141,7 @@ class TransactionalHLogManager {
/*HLogEdit logEdit;
logEdit = new HLogEdit(transactionId, HLogEdit.TransactionalOperation.ABORT);
*/
hlog.append(regionInfo, null /*logEdit*/);
hlog.append(regionInfo, null /*logEdit*/, System.currentTimeMillis());
}
/**

View File: TransactionalRegionServer.java

@ -111,7 +111,7 @@ public class TransactionalRegionServer extends HRegionServer implements
protected HRegion instantiateRegion(final HRegionInfo regionInfo)
throws IOException {
HRegion r = new TransactionalRegion(HTableDescriptor.getTableDir(super
.getRootDir(), regionInfo.getTableDesc().getName()), super.log, super
.getRootDir(), regionInfo.getTableDesc().getName()), super.hlog, super
.getFileSystem(), super.conf, regionInfo, super.getFlushRequester());
r.initialize(null, new Progressable() {
public void progress() {

View File: TestHLog.java

@ -77,7 +77,8 @@ public class TestHLog extends HBaseTestCase implements HConstants {
byte [] column = Bytes.toBytes("column:" + Integer.toString(j));
edit.add(new KeyValue(rowName, column, System.currentTimeMillis(),
column));
log.append(Bytes.toBytes(Integer.toString(i)), tableName, edit, false);
log.append(Bytes.toBytes(Integer.toString(i)), tableName, edit,
false, System.currentTimeMillis());
}
}
log.rollWriter();
@ -110,7 +111,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {
cols.add(new KeyValue(row, Bytes.toBytes("column:" + Integer.toString(i)),
timestamp, new byte[] { (byte)(i + '0') }));
}
log.append(regionName, tableName, cols, false);
log.append(regionName, tableName, cols, false, System.currentTimeMillis());
long logSeqId = log.startCacheFlush();
log.completeCacheFlush(regionName, tableName, logSeqId);
log.close();