HBASE-5826 Improve sync of HLog edits (Todd)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1339671 13f79535-47bb-0310-9956-ffa450edef68
Zhihong Yu 2012-05-17 15:55:59 +00:00
parent c18eebd8a6
commit 022ef75950
3 changed files with 114 additions and 79 deletions
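
In brief: the patch replaces the unflushedEntries counter, the HLog-level flushLock, and the volatile syncedTillHere field with a small SyncInfo monitor that records the highest synced sequence number and wakes blocked writers, and it moves pending-write flushing into LogSyncer so a failed batch can be retried in order. A condensed, runnable sketch of the monitor pattern (SyncInfo mirrors the class added below; the demo threads are illustrative only, not part of the commit):

// Sketch of the SyncInfo monitor introduced by this patch; the demo
// writer/syncer threads below are illustrative only.
public class SyncInfoDemo {

  static class SyncInfo {
    private long syncedTillHere = 0;

    synchronized long getLastSyncedTxId() {
      return syncedTillHere;
    }

    // Called by whichever thread performed a successful flush + sync.
    synchronized void notifySynced(long txid) {
      if (txid > syncedTillHere) {
        syncedTillHere = txid;
      }
      notifyAll(); // wake every writer waiting on an earlier txid
    }

    // Called by writers that must not return until their txid is durable.
    synchronized void waitForSync(long txid) throws InterruptedException {
      while (syncedTillHere < txid) {
        wait();
      }
    }
  }

  public static void main(String[] args) throws Exception {
    final SyncInfo syncInfo = new SyncInfo();
    Thread writer = new Thread() {
      public void run() {
        try {
          syncInfo.waitForSync(42); // blocks until txid 42 is synced
          System.out.println("txid 42 is durable");
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    };
    writer.start();
    Thread.sleep(100);         // let the writer block first
    syncInfo.notifySynced(42); // the sync thread announces progress
    writer.join();
  }
}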

File: HLog.java

@@ -48,6 +48,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import com.google.common.base.Charsets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -139,8 +140,10 @@ public class HLog implements Syncable {
   private final long optionalFlushInterval;
   private final long blocksize;
   private final String prefix;
-  private final AtomicLong unflushedEntries = new AtomicLong(0);
-  private volatile long syncedTillHere = 0;
+  /** tracking information about what has been synced */
+  private SyncInfo syncInfo = new SyncInfo();
   private long lastDeferredTxid;
   private final Path oldLogDir;
   private volatile boolean logRollRunning;
@@ -230,7 +233,6 @@ public class HLog implements Syncable {
   // during an update
   // locked during appends
   private final Object updateLock = new Object();
-  private final Object flushLock = new Object();
   private final boolean enabled;
@@ -298,6 +300,7 @@ public class HLog implements Syncable {
   private static Metric writeSize = new Metric();
   // For measuring latency of syncs
   private static Metric syncTime = new Metric();
+  private static AtomicLong syncBatchSize = new AtomicLong();
   //For measuring slow HLog appends
   private static AtomicLong slowHLogAppendCount = new AtomicLong();
   private static Metric slowHLogAppendTime = new Metric();
@@ -314,6 +317,10 @@ public class HLog implements Syncable {
     return syncTime.get();
   }

+  public static long getSyncBatchSize() {
+    return syncBatchSize.getAndSet(0);
+  }
+
   public static long getSlowAppendCount() {
     return slowHLogAppendCount.get();
   }
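
Note the read-and-reset semantics: because getSyncBatchSize() does getAndSet(0), each call reports the number of edits flushed since the previous call. A hypothetical metrics poll loop consuming it (the poller is not part of the patch; only the getter above is):

import org.apache.hadoop.hbase.regionserver.wal.HLog;

// Hypothetical poller; only HLog.getSyncBatchSize() comes from this patch.
// Each poll returns the edits group-committed since the previous poll.
class SyncBatchSizePoller implements Runnable {
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      long batched = HLog.getSyncBatchSize(); // read-and-reset
      System.out.println("edits flushed since last poll: " + batched);
      try {
        Thread.sleep(10000); // poll every 10 seconds
      } catch (InterruptedException e) {
        return;
      }
    }
  }
}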
@@ -833,17 +840,11 @@ public class HLog implements Syncable {
     try {
       // Wait till all current transactions are written to the hlog.
       // No new transactions can occur because we have the updatelock.
-      if (this.unflushedEntries.get() != this.syncedTillHere) {
-        LOG.debug("cleanupCurrentWriter " +
-          " waiting for transactions to get synced " +
-          " total " + this.unflushedEntries.get() +
-          " synced till here " + syncedTillHere);
-        sync();
-      }
+      sync();
       this.writer.close();
       this.writer = null;
       closeErrorCount.set(0);
-    } catch (IOException e) {
+    } catch (Exception e) {
       LOG.error("Failed close of HLog writer", e);
       int errors = closeErrorCount.incrementAndGet();
       if (errors <= closeErrorsTolerated && !hasDeferredEntries()) {
@@ -1006,7 +1007,7 @@ public class HLog implements Syncable {
    * @param logEdit
    * @param logKey
    * @param doSync shall we sync after writing the transaction
-   * @return The txid of this transaction
+   * @return The seqnum of this transaction
    * @throws IOException
    */
   public long append(HRegionInfo regionInfo, HLogKey logKey, WALEdit logEdit,
@@ -1015,9 +1016,9 @@ public class HLog implements Syncable {
     if (this.closed) {
       throw new IOException("Cannot append; log is closed");
     }
-    long txid = 0;
+    long seqNum;
     synchronized (updateLock) {
-      long seqNum = obtainSeqNum();
+      seqNum = obtainSeqNum();
       logKey.setLogSeqNum(seqNum);
       // The 'lastSeqWritten' map holds the sequence number of the oldest
       // write for each region (i.e. the first edit added to the particular
@@ -1027,10 +1028,9 @@ public class HLog implements Syncable {
       this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(),
         Long.valueOf(seqNum));
       doWrite(regionInfo, logKey, logEdit, htd);
-      txid = this.unflushedEntries.incrementAndGet();
       this.numEntries.incrementAndGet();
       if (htd.isDeferredLogFlush()) {
-        lastDeferredTxid = txid;
+        lastDeferredTxid = seqNum;
       }
     }
@@ -1040,9 +1040,9 @@ public class HLog implements Syncable {
         (regionInfo.isMetaRegion() ||
         !htd.isDeferredLogFlush())) {
       // sync txn to file system
-      this.sync(txid);
+      this.sync(seqNum);
     }
-    return txid;
+    return seqNum;
   }

   /**
@@ -1090,13 +1090,13 @@ public class HLog implements Syncable {
   private long append(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId,
     final long now, HTableDescriptor htd, boolean doSync)
     throws IOException {
-    if (edits.isEmpty()) return this.unflushedEntries.get();;
+    if (edits.isEmpty()) return this.logSeqNum.get();
     if (this.closed) {
       throw new IOException("Cannot append; log is closed");
     }
-    long txid = 0;
+    long seqNum;
     synchronized (this.updateLock) {
-      long seqNum = obtainSeqNum();
+      seqNum = obtainSeqNum();
       // The 'lastSeqWritten' map holds the sequence number of the oldest
       // write for each region (i.e. the first edit added to the particular
       // memstore). . When the cache is flushed, the entry for the
@@ -1109,9 +1109,8 @@ public class HLog implements Syncable {
       HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId);
       doWrite(info, logKey, edits, htd);
       this.numEntries.incrementAndGet();
-      txid = this.unflushedEntries.incrementAndGet();
       if (htd.isDeferredLogFlush()) {
-        lastDeferredTxid = txid;
+        lastDeferredTxid = seqNum;
       }
     }
     // Sync if catalog region, and if not then check if that table supports
@@ -1120,9 +1119,9 @@ public class HLog implements Syncable {
         (info.isMetaRegion() ||
         !htd.isDeferredLogFlush())) {
       // sync txn to file system
-      this.sync(txid);
+      this.sync(seqNum);
     }
-    return txid;
+    return seqNum;
   }

   /**
@@ -1180,6 +1179,9 @@ public class HLog implements Syncable {
     // goal is to increase the batchsize for writing-to-hdfs as well as
     // sync-to-hdfs, so that we can get better system throughput.
     private List<Entry> pendingWrites = new LinkedList<Entry>();
+    long lastSeqAppended = -1;
+    long lastSeqFlushed = -1;
+    private Object flushLock = new Object();

     LogSyncer(long optionalFlushInterval) {
       this.optionalFlushInterval = optionalFlushInterval;
@@ -1193,7 +1195,7 @@ public class HLog implements Syncable {
       while(!this.isInterrupted() && !closeLogSyncer) {
         try {
-          if (unflushedEntries.get() <= syncedTillHere) {
+          if (syncInfo.getLastSyncedTxId() >= logSeqNum.get()) {
             Thread.sleep(this.optionalFlushInterval);
           }
           sync();
@@ -1213,24 +1215,45 @@ public class HLog implements Syncable {
     // our own queue rather than writing it to the HDFS output stream because
     // HDFSOutputStream.writeChunk is not lightweight at all.
     synchronized void append(Entry e) throws IOException {
+      long seq = e.getKey().getLogSeqNum();
+      assert seq > lastSeqAppended;
+      lastSeqAppended = seq;
       pendingWrites.add(e);
     }

     // Returns all currently pending writes. New writes
     // will accumulate in a new list.
-    synchronized List<Entry> getPendingWrites() {
-      List<Entry> save = this.pendingWrites;
-      this.pendingWrites = new LinkedList<Entry>();
-      return save;
-    }
-
-    // writes out pending entries to the HLog
-    void hlogFlush(Writer writer, List<Entry> pending) throws IOException {
-      if (pending == null) return;
-
-      // write out all accumulated Entries to hdfs.
-      for (Entry e : pending) {
-        writer.append(e);
+    long flushWritesTo(Writer writer) throws IOException {
+      synchronized (flushLock) {
+        List<Entry> pending;
+        synchronized (this) {
+          pending = pendingWrites;
+          pendingWrites = new LinkedList<Entry>();
+        }
+        boolean success = false;
+        try {
+          int numFlushed = 0;
+          for (Entry e : pending) {
+            writer.append(e);
+            long seq = e.getKey().getLogSeqNum();
+            assert seq > lastSeqFlushed;
+            lastSeqFlushed = seq;
+            numFlushed++;
+          }
+          syncBatchSize.addAndGet(numFlushed);
+          success = true;
+        } finally {
+          if (!success) {
+            // push back our batch into the pending list
+            synchronized (this) {
+              pending.addAll(pendingWrites);
+              pendingWrites = pending;
+            }
+          }
+        }
+        return lastSeqFlushed;
       }
     }
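
The subtle part of flushWritesTo() is its failure path: if writer.append() throws partway through, the finally block splices the drained batch back in ahead of any edits that arrived during the flush, so sequence numbers stay monotonic for the retry. The same invariant in miniature (plain integers standing in for WAL entries; illustrative only):

import java.util.LinkedList;
import java.util.List;

// Miniature of flushWritesTo()'s failure path: a drained batch that fails
// to flush is re-installed ahead of newer arrivals, keeping sequence order.
public class RequeueDemo {
  private List<Integer> pendingWrites = new LinkedList<Integer>();

  synchronized void append(int seq) {
    pendingWrites.add(seq);
  }

  void flushAndFail() {
    List<Integer> pending;
    synchronized (this) {
      pending = pendingWrites;                   // drain current batch
      pendingWrites = new LinkedList<Integer>();
    }
    boolean success = false;
    try {
      append(4);                                 // an edit arriving mid-flush
      failAppend();                              // simulated writer failure
      success = true;
    } finally {
      if (!success) {
        synchronized (this) {
          pending.addAll(pendingWrites);         // old batch first, then new
          pendingWrites = pending;
        }
      }
    }
  }

  private void failAppend() {
    throw new RuntimeException("simulated append failure");
  }

  public static void main(String[] args) {
    RequeueDemo d = new RequeueDemo();
    d.append(1); d.append(2); d.append(3);
    try {
      d.flushAndFail();
    } catch (RuntimeException expected) {
      // ignored: the point is what the retry will see
    }
    System.out.println(d.pendingWrites);         // prints [1, 2, 3, 4]
  }
}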
@@ -1239,9 +1262,31 @@ public class HLog implements Syncable {
     }
   }

+  private static class SyncInfo {
+    private long syncedTillHere = 0;
+
+    synchronized long getLastSyncedTxId() {
+      return syncedTillHere;
+    }
+
+    synchronized void notifySynced(long txid) {
+      if (txid > syncedTillHere) {
+        syncedTillHere = txid;
+      }
+      notifyAll();
+    }
+
+    synchronized void waitForSync(long txid) throws InterruptedException {
+      while (syncedTillHere < txid) {
+        wait();
+      }
+    }
+  }
+
   // sync all known transactions
   private void syncer() throws IOException {
-    syncer(this.unflushedEntries.get()); // sync all pending items
+    syncer(logSeqNum.get()); // sync all pending
   }

   // sync all transactions upto the specified txid
@@ -1253,47 +1298,26 @@ public class HLog implements Syncable {
     }
     // if the transaction that we are interested in is already
     // synced, then return immediately.
-    if (txid <= this.syncedTillHere) {
+    if (syncInfo.getLastSyncedTxId() >= txid) {
       return;
     }
     try {
-      long doneUpto;
       long now = System.currentTimeMillis();
-      // First flush all the pending writes to HDFS. Then
-      // issue the sync to HDFS. If sync is successful, then update
-      // syncedTillHere to indicate that transactions till this
-      // number has been successfully synced.
-      synchronized (flushLock) {
-        if (txid <= this.syncedTillHere) {
-          return;
-        }
-        doneUpto = this.unflushedEntries.get();
-        List<Entry> pending = logSyncerThread.getPendingWrites();
-        try {
-          logSyncerThread.hlogFlush(tempWriter, pending);
-        } catch(IOException io) {
-          synchronized (this.updateLock) {
-            // HBASE-4387, HBASE-5623, retry with updateLock held
-            tempWriter = this.writer;
-            logSyncerThread.hlogFlush(tempWriter, pending);
-          }
-        }
-      }
-      // another thread might have sync'ed avoid double-sync'ing
-      if (txid <= this.syncedTillHere) {
-        return;
-      }
+      long flushedSeqId;
       try {
+        flushedSeqId = logSyncerThread.flushWritesTo(tempWriter);
         tempWriter.sync();
       } catch(IOException io) {
         synchronized (this.updateLock) {
           // HBASE-4387, HBASE-5623, retry with updateLock held
           tempWriter = this.writer;
+          flushedSeqId = logSyncerThread.flushWritesTo(tempWriter);
           tempWriter.sync();
         }
       }
-      this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);
-      // We try to not acquire the updateLock just to update statistics.
-      // Make these statistics as AtomicLong.
+      syncInfo.notifySynced(flushedSeqId);
       syncTime.inc(System.currentTimeMillis() - now);
       if (!this.logRollRunning) {
         checkLowReplication();
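
The retry shape from HBASE-4387/HBASE-5623 is preserved: on an IOException the log may have rolled, so the code re-reads this.writer under updateLock and tries the flush and sync once more against the fresh writer. The pattern in isolation (the Writer interface here is a stand-in for HLog.Writer; a sketch, not the patch's code):

import java.io.IOException;

// The retry-once-under-updateLock shape used by syncer(). "Writer" is a
// stand-in for HLog.Writer; updateLock guards replacement during log rolls.
class SyncRetrySketch {
  interface Writer {
    void sync() throws IOException;
  }

  private final Object updateLock = new Object();
  private volatile Writer writer; // swapped by the log roller

  void syncWithRetry() throws IOException {
    Writer tempWriter = this.writer; // work against a snapshot
    try {
      tempWriter.sync();
    } catch (IOException io) {
      synchronized (updateLock) {
        // The writer may have been rolled; retry against the current one.
        tempWriter = this.writer;
        tempWriter.sync();
      }
    }
  }
}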
@@ -1548,14 +1572,15 @@ public class HLog implements Syncable {
       if (this.closed) {
         return;
       }
-      long txid = 0;
+      long seqNumOfCompletionEdit;
       synchronized (updateLock) {
+        seqNumOfCompletionEdit = obtainSeqNum();
         long now = System.currentTimeMillis();
-        WALEdit edit = completeCacheFlushLogEdit();
-        HLogKey key = makeKey(encodedRegionName, tableName, logSeqId,
+        WALEdit edit = completeCacheFlushLogEdit(logSeqId);
+        HLogKey key = makeKey(encodedRegionName, tableName, seqNumOfCompletionEdit,
           System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
         logSyncerThread.append(new Entry(key, edit));
-        txid = this.unflushedEntries.incrementAndGet();
         writeTime.inc(System.currentTimeMillis() - now);
         long len = 0;
         for (KeyValue kv : edit.getKeyValues()) {
@@ -1565,7 +1590,7 @@ public class HLog implements Syncable {
         this.numEntries.incrementAndGet();
       }
       // sync txn to file system
-      this.sync(txid);
+      this.sync(seqNumOfCompletionEdit);
     } finally {
       // updateLock not needed for removing snapshot's entry
@@ -1576,9 +1601,17 @@ public class HLog implements Syncable {
     }
   }

-  private WALEdit completeCacheFlushLogEdit() {
+  private WALEdit completeCacheFlushLogEdit(long seqIdOfFlush) {
+    // The data is not actually used here - we just need to write
+    // something to the log to make sure we're still the owner of the
+    // pipeline.
+    byte[] data = Bytes.add(
+        COMPLETE_CACHE_FLUSH,
+        ":".getBytes(Charsets.UTF_8),
+        Bytes.toBytes(String.valueOf(seqIdOfFlush)));
     KeyValue kv = new KeyValue(METAROW, METAFAMILY, null,
-      System.currentTimeMillis(), COMPLETE_CACHE_FLUSH);
+      System.currentTimeMillis(), data);
     WALEdit e = new WALEdit();
     e.add(kv);
     return e;
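
The cache-flush marker value is now COMPLETE_CACHE_FLUSH + ":" + seqId rather than the bare constant, which is why the test assertions below change from an exact compareTo to a prefix check. A small sanity sketch of how the two sides line up (the constant's literal value is assumed here, not taken from this diff):

import com.google.common.base.Charsets;
import org.apache.hadoop.hbase.util.Bytes;

// Sanity sketch of the new marker layout: value = prefix + ":" + seqId.
// The literal below is an assumption about COMPLETE_CACHE_FLUSH's value.
public class CacheFlushMarkerDemo {
  static final byte[] COMPLETE_CACHE_FLUSH =
      "HBASE::CACHEFLUSH".getBytes(Charsets.UTF_8);

  public static void main(String[] args) {
    long seqIdOfFlush = 1234L;
    byte[] data = Bytes.add(
        COMPLETE_CACHE_FLUSH,
        ":".getBytes(Charsets.UTF_8),
        Bytes.toBytes(String.valueOf(seqIdOfFlush)));

    // Exactly the check the updated tests perform:
    System.out.println(Bytes.startsWith(data, COMPLETE_CACHE_FLUSH)); // true
  }
}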
@@ -1855,7 +1888,7 @@ public class HLog implements Syncable {
   /** Provide access to currently deferred sequence num for tests */
   boolean hasDeferredEntries() {
-    return lastDeferredTxid > syncedTillHere;
+    return lastDeferredTxid > syncInfo.getLastSyncedTxId();
   }

   /**

File: SequenceFileLogWriter.java

@@ -245,7 +245,7 @@ public class SequenceFileLogWriter implements HLog.Writer {
   @Override
   public void sync() throws IOException {
-    this.writer.syncFs();
+    if (this.writer != null) this.writer.syncFs();
   }

   @Override

File: TestHLog.java

@@ -548,8 +548,9 @@ public class TestHLog {
         KeyValue kv = val.getKeyValues().get(0);
         assertTrue(Bytes.equals(HLog.METAROW, kv.getRow()));
         assertTrue(Bytes.equals(HLog.METAFAMILY, kv.getFamily()));
-        assertEquals(0, Bytes.compareTo(HLog.COMPLETE_CACHE_FLUSH,
-          val.getKeyValues().get(0).getValue()));
+        assertTrue(Bytes.startsWith(
+          val.getKeyValues().get(0).getValue(),
+          HLog.COMPLETE_CACHE_FLUSH));
         System.out.println(key + " " + val);
       }
     } finally {
@@ -616,8 +617,9 @@ public class TestHLog {
         assertTrue(Bytes.equals(tableName, entry.getKey().getTablename()));
         assertTrue(Bytes.equals(HLog.METAROW, val.getRow()));
         assertTrue(Bytes.equals(HLog.METAFAMILY, val.getFamily()));
-        assertEquals(0, Bytes.compareTo(HLog.COMPLETE_CACHE_FLUSH,
-          val.getValue()));
+        assertTrue(Bytes.startsWith(
+          val.getValue(),
+          HLog.COMPLETE_CACHE_FLUSH));
         System.out.println(entry.getKey() + " " + val);
       }
     } finally {