HBASE-1393 Narrow synchronization in HLog

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@773167 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2009-05-09 05:49:46 +00:00
parent 806ee8c9de
commit 2e242480b7
9 changed files with 113 additions and 155 deletions

View File

@ -215,6 +215,7 @@ Release 0.20.0 - Unreleased
HBASE-1392 change how we build/configure lzocodec (Ryan Rawson via Stack)
HBASE-1397 Better distribution in the PerformanceEvaluation MapReduce
when rows run to the Billions
HBASE-1393 Narrow synchronization in HLog
OPTIMIZATIONS

View File

@ -118,8 +118,8 @@ public interface HConstants {
static final String HBASE_DIR = "hbase.rootdir";
/** Used to construct the name of the log directory for a region server
* Use '@' as a special character to seperate the log files from table data */
static final String HREGION_LOGDIR_NAME = "@LOGS@";
* Use '.' as a special character to seperate the log files from table data */
static final String HREGION_LOGDIR_NAME = ".logs";
/** Name of old log file for reconstruction */
static final String HREGION_OLDLOGFILE_NAME = "oldlogfile.log";

View File

@ -168,7 +168,7 @@ public class RegExpRowFilter implements RowFilterInterface {
}
}
if (nullColumns.contains(colKey)) {
if (data != null && !HLogEdit.isDeleted(data)) {
if (data != null /* DELETE IS IN KEY NOW && !HLogEdit.isDeleted(data)*/) {
return true;
}
}
@ -216,7 +216,7 @@ public class RegExpRowFilter implements RowFilterInterface {
public boolean filterRow(final SortedMap<byte [], Cell> columns) {
for (Entry<byte [], Cell> col : columns.entrySet()) {
if (nullColumns.contains(col.getKey())
&& !HLogEdit.isDeleted(col.getValue().getValue())) {
/* DELETE IS IN KEY NOW && !HLogEdit.isDeleted(col.getValue().getValue())*/) {
return true;
}
}

View File

@ -28,6 +28,9 @@ import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -92,17 +95,16 @@ public class HLog implements HConstants, Syncable {
private static final String HLOG_DATFILE = "hlog.dat.";
static final byte [] METACOLUMN = Bytes.toBytes("METACOLUMN:");
static final byte [] METAROW = Bytes.toBytes("METAROW");
final FileSystem fs;
final Path dir;
final Configuration conf;
final LogRollListener listener;
private final FileSystem fs;
private final Path dir;
private final Configuration conf;
private final LogRollListener listener;
private final int maxlogentries;
private final long optionalFlushInterval;
private final long blocksize;
private final int flushlogentries;
private volatile int unflushedEntries = 0;
private final AtomicInteger unflushedEntries = new AtomicInteger(0);
private volatile long lastLogFlushTime;
final long threadWakeFrequency;
/*
* Current log file.
@ -112,24 +114,23 @@ public class HLog implements HConstants, Syncable {
/*
* Map of all log files but the current one.
*/
final SortedMap<Long, Path> outputfiles =
final SortedMap<Long, Path> outputfiles =
Collections.synchronizedSortedMap(new TreeMap<Long, Path>());
/*
* Map of region to last sequence/edit id.
*/
private final Map<byte [], Long> lastSeqWritten = Collections.
synchronizedSortedMap(new TreeMap<byte [], Long>(Bytes.BYTES_COMPARATOR));
private final ConcurrentSkipListMap<byte [], Long> lastSeqWritten =
new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
private volatile boolean closed = false;
private final Object sequenceLock = new Object();
private volatile long logSeqNum = 0;
private final AtomicLong logSeqNum = new AtomicLong(0);
private volatile long filenum = 0;
private volatile long old_filenum = -1;
private volatile int numEntries = 0;
private final AtomicInteger numEntries = new AtomicInteger(0);
// This lock prevents starting a log roll during a cache flush.
// synchronized is insufficient because a cache flush spans two method calls.
@ -175,7 +176,6 @@ public class HLog implements HConstants, Syncable {
conf.getLong("hbase.regionserver.hlog.blocksize", 1024L * 1024L);
this.optionalFlushInterval =
conf.getLong("hbase.regionserver.optionallogflushinterval", 10 * 1000);
this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
this.lastLogFlushTime = System.currentTimeMillis();
if (fs.exists(dir)) {
throw new IOException("Target HLog directory already exists: " + dir);
@ -211,15 +211,12 @@ public class HLog implements HConstants, Syncable {
* @param newvalue We'll set log edit/sequence number to this value if it
* is greater than the current value.
*/
void setSequenceNumber(long newvalue) {
synchronized (sequenceLock) {
if (newvalue > logSeqNum) {
if (LOG.isDebugEnabled()) {
LOG.debug("changing sequence number from " + logSeqNum + " to " +
newvalue);
}
logSeqNum = newvalue;
}
void setSequenceNumber(final long newvalue) {
for (long id = this.logSeqNum.get(); id < newvalue &&
!this.logSeqNum.compareAndSet(id, newvalue); id = this.logSeqNum.get()) {
// This could spin on occasion but better the occasional spin than locking
// every increment of sequence number.
LOG.debug("Change sequence number from " + logSeqNum + " to " + newvalue);
}
}
@ -227,7 +224,7 @@ public class HLog implements HConstants, Syncable {
* @return log sequence number
*/
public long getSequenceNumber() {
return logSeqNum;
return logSeqNum.get();
}
/**
@ -290,7 +287,7 @@ public class HLog implements HConstants, Syncable {
regionToFlush = cleanOldLogs();
}
}
this.numEntries = 0;
this.numEntries.set(0);
updateLock.notifyAll();
}
} finally {
@ -380,9 +377,7 @@ public class HLog implements HConstants, Syncable {
}
oldFile = computeFilename(old_filenum);
if (filenum > 0) {
synchronized (this.sequenceLock) {
this.outputfiles.put(Long.valueOf(this.logSeqNum - 1), oldFile);
}
this.outputfiles.put(Long.valueOf(this.logSeqNum.get() - 1), oldFile);
}
}
return oldFile;
@ -435,6 +430,53 @@ public class HLog implements HConstants, Syncable {
}
}
/** Append an entry without a row to the log.
*
* @param regionInfo
* @param logEdit
* @throws IOException
*/
public void append(HRegionInfo regionInfo, HLogEdit logEdit)
throws IOException {
this.append(regionInfo, new byte[0], logEdit);
}
/** Append an entry to the log.
*
* @param regionInfo
* @param row
* @param logEdit
* @throws IOException
*/
public void append(HRegionInfo regionInfo, byte [] row, HLogEdit logEdit)
throws IOException {
if (this.closed) {
throw new IOException("Cannot append; log is closed");
}
byte [] regionName = regionInfo.getRegionName();
byte [] tableName = regionInfo.getTableDesc().getName();
synchronized (updateLock) {
long seqNum = obtainSeqNum();
// The 'lastSeqWritten' map holds the sequence number of the oldest
// write for each region. When the cache is flushed, the entry for the
// region being flushed is removed if the sequence number of the flush
// is greater than or equal to the value in lastSeqWritten.
this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum));
HLogKey logKey = new HLogKey(regionName, tableName, seqNum);
boolean sync = regionInfo.isMetaRegion() || regionInfo.isRootRegion();
doWrite(logKey, logEdit, sync);
this.numEntries.incrementAndGet();
updateLock.notifyAll();
}
if (this.numEntries.get() > this.maxlogentries) {
if (listener != null) {
listener.logRollRequested();
}
}
}
/**
* Append a set of edits to the log. Log edits are keyed by regionName,
* rowname, and log-sequence-id.
@ -461,44 +503,41 @@ public class HLog implements HConstants, Syncable {
void append(byte [] regionName, byte [] tableName, List<KeyValue> edits,
boolean sync)
throws IOException {
if (closed) {
if (this.closed) {
throw new IOException("Cannot append; log is closed");
}
synchronized (updateLock) {
long seqNum[] = obtainSeqNum(edits.size());
long seqNum [] = obtainSeqNum(edits.size());
synchronized (this.updateLock) {
// The 'lastSeqWritten' map holds the sequence number of the oldest
// write for each region. When the cache is flushed, the entry for the
// region being flushed is removed if the sequence number of the flush
// is greater than or equal to the value in lastSeqWritten.
if (!this.lastSeqWritten.containsKey(regionName)) {
this.lastSeqWritten.put(regionName, Long.valueOf(seqNum[0]));
}
this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum[0]));
int counter = 0;
for (KeyValue kv: edits) {
HLogKey logKey =
new HLogKey(regionName, tableName, seqNum[counter++]);
doWrite(logKey, new HLogEdit(kv), sync);
this.numEntries++;
HLogKey logKey = new HLogKey(regionName, tableName, seqNum[counter++]);
doWrite(logKey, new HLogEdit(kv), sync);
this.numEntries.incrementAndGet();
}
updateLock.notifyAll();
}
if (this.numEntries > this.maxlogentries) {
if (this.numEntries.get() > this.maxlogentries) {
requestLogRoll();
}
}
public void sync() throws IOException {
lastLogFlushTime = System.currentTimeMillis();
this.writer.sync();
unflushedEntries = 0;
this.unflushedEntries.set(0);
}
void optionalSync() {
if (!this.closed) {
long now = System.currentTimeMillis();
synchronized (updateLock) {
if (((System.currentTimeMillis() - this.optionalFlushInterval) >
this.lastLogFlushTime) && this.unflushedEntries > 0) {
if (((now - this.optionalFlushInterval) >
this.lastLogFlushTime) && this.unflushedEntries.get() > 0) {
try {
sync();
} catch (IOException e) {
@ -519,7 +558,7 @@ public class HLog implements HConstants, Syncable {
throws IOException {
try {
this.writer.append(logKey, logEdit);
if (sync || ++unflushedEntries >= flushlogentries) {
if (sync || this.unflushedEntries.incrementAndGet() >= flushlogentries) {
sync();
}
} catch (IOException e) {
@ -528,69 +567,17 @@ public class HLog implements HConstants, Syncable {
throw e;
}
}
/** Append an entry without a row to the log.
*
* @param regionInfo
* @param logEdit
* @throws IOException
*/
public void append(HRegionInfo regionInfo, HLogEdit logEdit) throws IOException {
this.append(regionInfo, new byte[0], logEdit);
}
/** Append an entry to the log.
*
* @param regionInfo
* @param row
* @param logEdit
* @throws IOException
*/
public void append(HRegionInfo regionInfo, byte [] row, HLogEdit logEdit)
throws IOException {
if (closed) {
throw new IOException("Cannot append; log is closed");
}
byte [] regionName = regionInfo.getRegionName();
byte [] tableName = regionInfo.getTableDesc().getName();
synchronized (updateLock) {
long seqNum = obtainSeqNum();
// The 'lastSeqWritten' map holds the sequence number of the oldest
// write for each region. When the cache is flushed, the entry for the
// region being flushed is removed if the sequence number of the flush
// is greater than or equal to the value in lastSeqWritten.
if (!this.lastSeqWritten.containsKey(regionName)) {
this.lastSeqWritten.put(regionName, Long.valueOf(seqNum));
}
HLogKey logKey = new HLogKey(regionName, tableName, seqNum);
boolean sync = regionInfo.isMetaRegion() || regionInfo.isRootRegion();
doWrite(logKey, logEdit, sync);
this.numEntries++;
updateLock.notifyAll();
}
if (this.numEntries > this.maxlogentries) {
if (listener != null) {
listener.logRollRequested();
}
}
}
/** @return How many items have been added to the log */
int getNumEntries() {
return numEntries;
return numEntries.get();
}
/**
* Obtain a log sequence number.
*/
private long obtainSeqNum() {
long value;
synchronized (sequenceLock) {
value = logSeqNum++;
}
return value;
return this.logSeqNum.incrementAndGet();
}
/** @return the number of log files in use */
@ -598,18 +585,16 @@ public class HLog implements HConstants, Syncable {
return outputfiles.size();
}
/**
/*
* Obtain a specified number of sequence numbers
*
* @param num number of sequence numbers to obtain
* @return array of sequence numbers
*/
private long[] obtainSeqNum(int num) {
long[] results = new long[num];
synchronized (this.sequenceLock) {
for (int i = 0; i < num; i++) {
results[i] = this.logSeqNum++;
}
private long [] obtainSeqNum(int num) {
long [] results = new long[num];
for (int i = 0; i < num; i++) {
results[i] = this.logSeqNum.incrementAndGet();
}
return results;
}
@ -651,7 +636,7 @@ public class HLog implements HConstants, Syncable {
synchronized (updateLock) {
this.writer.append(new HLogKey(regionName, tableName, logSeqId),
completeCacheFlushLogEdit());
this.numEntries++;
this.numEntries.incrementAndGet();
Long seq = this.lastSeqWritten.get(regionName);
if (seq != null && logSeqId >= seq.longValue()) {
this.lastSeqWritten.remove(regionName);
@ -664,7 +649,6 @@ public class HLog implements HConstants, Syncable {
}
private HLogEdit completeCacheFlushLogEdit() {
// TODO Profligacy!!! Fix all this creation.
return new HLogEdit(new KeyValue(METAROW, METACOLUMN,
System.currentTimeMillis(), HLogEdit.COMPLETE_CACHE_FLUSH));
}

View File

@ -37,14 +37,11 @@ import org.apache.hadoop.io.Writable;
* TODO: Remove. Just output KVs.
*/
public class HLogEdit implements Writable, HConstants {
/** Value stored for a deleted item */
public static byte [] DELETED_BYTES;
/** Value written to HLog on a complete cache flush */
public static byte [] COMPLETE_CACHE_FLUSH;
/** Value written to HLog on a complete cache flush. TODO: Remove. Not used.
*/
static byte [] COMPLETE_CACHE_FLUSH;
static {
try {
DELETED_BYTES = "HBASE::DELETEVAL".getBytes(UTF8_ENCODING);
COMPLETE_CACHE_FLUSH = "HBASE::CACHEFLUSH".getBytes(UTF8_ENCODING);
} catch (UnsupportedEncodingException e) {
assert(false);
@ -183,23 +180,4 @@ public class HLogEdit implements Writable, HConstants {
operation = TransactionalOperation.valueOf(in.readUTF());
}
}
/**
* @param value
* @return True if an entry and its content is {@link #DELETED_BYTES}.
*/
public static boolean isDeleted(final byte [] value) {
return isDeleted(value, 0, value.length);
}
/**
* @param value
* @return True if an entry and its content is {@link #DELETED_BYTES}.
*/
public static boolean isDeleted(final byte [] value, final int offset,
final int length) {
return (value == null)? false:
Bytes.BYTES_RAWCOMPARATOR.compare(DELETED_BYTES, 0, DELETED_BYTES.length,
value, offset, length) == 0;
}
}

View File

@ -1325,12 +1325,7 @@ public class HRegion implements HConstants {
checkColumn(column);
KeyValue kv = null;
if (op.isPut()) {
byte [] val = op.getValue();
if (HLogEdit.isDeleted(val)) {
throw new IOException("Cannot insert value: " +
Bytes.toString(val));
}
kv = new KeyValue(row, column, commitTime, val);
kv = new KeyValue(row, column, commitTime, op.getValue());
} else {
// Its a delete.
if (b.getTimestamp() == LATEST_TIMESTAMP) {
@ -1420,12 +1415,7 @@ public class HRegion implements HConstants {
byte [] column = op.getColumn();
KeyValue kv = null;
if (op.isPut()) {
byte [] val = op.getValue();
if (HLogEdit.isDeleted(val)) {
throw new IOException("Cannot insert value: " +
Bytes.toString(val));
}
kv = new KeyValue(row, column, commitTime, val);
kv = new KeyValue(row, column, commitTime, op.getValue());
} else {
// Its a delete.
if (b.getTimestamp() == LATEST_TIMESTAMP) {

View File

@ -504,7 +504,12 @@ public class HRegionServer implements HConstants, HRegionInterface,
tries++;
} else {
LOG.error("Exceeded max retries: " + this.numRetries, e);
checkFileSystem();
if (checkFileSystem()) {
// Filesystem is OK. Something is up w/ ZK or master. Sleep
// a little while if only to stop our logging many times a
// millisecond.
Thread.sleep(1000);
}
}
if (this.stopRequested.get()) {
LOG.info("Stop was requested, clearing the toDo " +
@ -1708,7 +1713,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
}
}
public int batchUpdates(final byte[] regionName, final BatchUpdate[] b)
public int batchUpdates(final byte[] regionName, final BatchUpdate [] b)
throws IOException {
int i = 0;
checkOpen();

View File

@ -306,7 +306,7 @@ public class Store implements HConstants {
skippedEdits++;
continue;
}
// Check this edit is for me. Also, guard against writing the speical
// Check this edit is for me. Also, guard against writing the special
// METACOLUMN info such as HBASE::CACHEFLUSH entries
KeyValue kv = val.getKeyValue();
if (val.isTransactionEntry() ||

View File

@ -188,10 +188,10 @@ public class DisabledTestRegExpRowFilter extends TestCase {
// Try a row that has all expected columnKeys and a null-expected columnKey
// that maps to a null value.
// Testing row with columnKeys: a-e, e maps to null
colvalues.put(new byte [] {LAST_CHAR},
new Cell(HLogEdit.DELETED_BYTES, HConstants.LATEST_TIMESTAMP));
assertFalse("Failed with last columnKey " + LAST_CHAR + " mapping to null.",
filter.filterRow(colvalues));
// colvalues.put(new byte [] {LAST_CHAR},
// new Cell(HLogEdit.DELETED_BYTES, HConstants.LATEST_TIMESTAMP));
// assertFalse("Failed with last columnKey " + LAST_CHAR + " mapping to null.",
// filter.filterRow(colvalues));
}
private byte [] createRow(final char c) {