HBASE-5826 Revert, Todd has review comments pending.

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1339673 13f79535-47bb-0310-9956-ffa450edef68
Author: Zhihong Yu
Date: 2012-05-17 15:58:09 +00:00
Parent: 022ef75950
Commit: aa5ce08ff1
3 changed files with 79 additions and 114 deletions
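For context, the revert drops the SyncInfo-based sync tracking that HBASE-5826 introduced and restores the earlier txid bookkeeping: every append bumps an unflushedEntries counter and gets that value back as its transaction id, while a volatile syncedTillHere watermark records how far the log is known to be durable. A minimal, self-contained sketch of that bookkeeping follows; the two field names are taken from the diff below, everything else is illustrative and not the actual HLog code.

    import java.util.concurrent.atomic.AtomicLong;

    public class TxidWatermarkSketch {
      // Counter restored by the revert: every append takes the next value as its txid.
      private final AtomicLong unflushedEntries = new AtomicLong(0);
      // Watermark restored by the revert: highest txid known to be durable on the filesystem.
      private volatile long syncedTillHere = 0;

      /** Buffers an edit and hands back its transaction id (illustrative only). */
      long append(String edit) {
        // ... the real code writes the edit into a pending-writes list here ...
        return unflushedEntries.incrementAndGet();
      }

      /** Makes everything up to txid durable; a no-op if already synced that far. */
      void sync(long txid) {
        if (txid <= syncedTillHere) {
          return; // already covered by an earlier sync
        }
        long doneUpto = unflushedEntries.get();
        // ... the real code flushes pending writes and calls the writer's sync() here ...
        syncedTillHere = Math.max(syncedTillHere, doneUpto);
      }

      public static void main(String[] args) {
        TxidWatermarkSketch log = new TxidWatermarkSketch();
        long txid = log.append("put row1");
        log.sync(txid);   // performs the (stubbed) flush and sync
        log.sync(txid);   // returns immediately: txid <= syncedTillHere
      }
    }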


@@ -48,7 +48,6 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-import com.google.common.base.Charsets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -140,10 +139,8 @@ public class HLog implements Syncable {
   private final long optionalFlushInterval;
   private final long blocksize;
   private final String prefix;
-  /** tracking information about what has been synced */
-  private SyncInfo syncInfo = new SyncInfo();
+  private final AtomicLong unflushedEntries = new AtomicLong(0);
+  private volatile long syncedTillHere = 0;
   private long lastDeferredTxid;
   private final Path oldLogDir;
   private volatile boolean logRollRunning;
@@ -233,6 +230,7 @@ public class HLog implements Syncable {
   // during an update
   // locked during appends
   private final Object updateLock = new Object();
+  private final Object flushLock = new Object();
   private final boolean enabled;
@@ -300,7 +298,6 @@ public class HLog implements Syncable {
   private static Metric writeSize = new Metric();
   // For measuring latency of syncs
   private static Metric syncTime = new Metric();
-  private static AtomicLong syncBatchSize = new AtomicLong();
   //For measuring slow HLog appends
   private static AtomicLong slowHLogAppendCount = new AtomicLong();
   private static Metric slowHLogAppendTime = new Metric();
@@ -317,10 +314,6 @@ public class HLog implements Syncable {
     return syncTime.get();
   }
-  public static long getSyncBatchSize() {
-    return syncBatchSize.getAndSet(0);
-  }
   public static long getSlowAppendCount() {
     return slowHLogAppendCount.get();
   }
@@ -840,11 +833,17 @@ public class HLog implements Syncable {
     try {
       // Wait till all current transactions are written to the hlog.
       // No new transactions can occur because we have the updatelock.
+      if (this.unflushedEntries.get() != this.syncedTillHere) {
+        LOG.debug("cleanupCurrentWriter " +
+            " waiting for transactions to get synced " +
+            " total " + this.unflushedEntries.get() +
+            " synced till here " + syncedTillHere);
         sync();
+      }
       this.writer.close();
       this.writer = null;
       closeErrorCount.set(0);
-    } catch (Exception e) {
+    } catch (IOException e) {
       LOG.error("Failed close of HLog writer", e);
       int errors = closeErrorCount.incrementAndGet();
       if (errors <= closeErrorsTolerated && !hasDeferredEntries()) {
@@ -1007,7 +1006,7 @@ public class HLog implements Syncable {
    * @param logEdit
    * @param logKey
    * @param doSync shall we sync after writing the transaction
-   * @return The seqnum of this transaction
+   * @return The txid of this transaction
    * @throws IOException
    */
   public long append(HRegionInfo regionInfo, HLogKey logKey, WALEdit logEdit,
@@ -1016,9 +1015,9 @@ public class HLog implements Syncable {
     if (this.closed) {
       throw new IOException("Cannot append; log is closed");
     }
-    long seqNum;
+    long txid = 0;
     synchronized (updateLock) {
-      seqNum = obtainSeqNum();
+      long seqNum = obtainSeqNum();
       logKey.setLogSeqNum(seqNum);
       // The 'lastSeqWritten' map holds the sequence number of the oldest
       // write for each region (i.e. the first edit added to the particular
@@ -1028,9 +1027,10 @@ public class HLog implements Syncable {
       this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(),
           Long.valueOf(seqNum));
       doWrite(regionInfo, logKey, logEdit, htd);
+      txid = this.unflushedEntries.incrementAndGet();
       this.numEntries.incrementAndGet();
       if (htd.isDeferredLogFlush()) {
-        lastDeferredTxid = seqNum;
+        lastDeferredTxid = txid;
       }
     }
@@ -1040,9 +1040,9 @@ public class HLog implements Syncable {
         (regionInfo.isMetaRegion() ||
         !htd.isDeferredLogFlush())) {
       // sync txn to file system
-      this.sync(seqNum);
+      this.sync(txid);
     }
-    return seqNum;
+    return txid;
   }
   /**
@@ -1090,13 +1090,13 @@ public class HLog implements Syncable {
   private long append(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId,
       final long now, HTableDescriptor htd, boolean doSync)
     throws IOException {
-    if (edits.isEmpty()) return this.logSeqNum.get();
+    if (edits.isEmpty()) return this.unflushedEntries.get();;
     if (this.closed) {
       throw new IOException("Cannot append; log is closed");
     }
-    long seqNum;
+    long txid = 0;
     synchronized (this.updateLock) {
-      seqNum = obtainSeqNum();
+      long seqNum = obtainSeqNum();
       // The 'lastSeqWritten' map holds the sequence number of the oldest
       // write for each region (i.e. the first edit added to the particular
       // memstore). . When the cache is flushed, the entry for the
@@ -1109,8 +1109,9 @@ public class HLog implements Syncable {
       HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId);
       doWrite(info, logKey, edits, htd);
       this.numEntries.incrementAndGet();
+      txid = this.unflushedEntries.incrementAndGet();
       if (htd.isDeferredLogFlush()) {
-        lastDeferredTxid = seqNum;
+        lastDeferredTxid = txid;
       }
     }
     // Sync if catalog region, and if not then check if that table supports
@@ -1119,9 +1120,9 @@ public class HLog implements Syncable {
         (info.isMetaRegion() ||
         !htd.isDeferredLogFlush())) {
       // sync txn to file system
-      this.sync(seqNum);
+      this.sync(txid);
     }
-    return seqNum;
+    return txid;
   }
   /**
@@ -1179,9 +1180,6 @@ public class HLog implements Syncable {
     // goal is to increase the batchsize for writing-to-hdfs as well as
     // sync-to-hdfs, so that we can get better system throughput.
     private List<Entry> pendingWrites = new LinkedList<Entry>();
-    long lastSeqAppended = -1;
-    long lastSeqFlushed = -1;
-    private Object flushLock = new Object();
     LogSyncer(long optionalFlushInterval) {
       this.optionalFlushInterval = optionalFlushInterval;
@@ -1195,7 +1193,7 @@ public class HLog implements Syncable {
       while(!this.isInterrupted() && !closeLogSyncer) {
         try {
-          if (syncInfo.getLastSyncedTxId() >= logSeqNum.get()) {
+          if (unflushedEntries.get() <= syncedTillHere) {
             Thread.sleep(this.optionalFlushInterval);
           }
           sync();
@@ -1215,45 +1213,24 @@ public class HLog implements Syncable {
     // our own queue rather than writing it to the HDFS output stream because
     // HDFSOutputStream.writeChunk is not lightweight at all.
     synchronized void append(Entry e) throws IOException {
-      long seq = e.getKey().getLogSeqNum();
-      assert seq > lastSeqAppended;
-      lastSeqAppended = seq;
       pendingWrites.add(e);
     }
     // Returns all currently pending writes. New writes
     // will accumulate in a new list.
-    long flushWritesTo(Writer writer) throws IOException {
-      synchronized (flushLock) {
-        List<Entry> pending;
-        synchronized (this) {
-          pending = pendingWrites;
-          pendingWrites = new LinkedList<Entry>();
-        }
-        boolean success = false;
-        try {
-          int numFlushed = 0;
-          // write out all accumulated Entries to hdfs.
-          for (Entry e : pending) {
-            writer.append(e);
-            long seq = e.getKey().getLogSeqNum();
-            assert seq > lastSeqFlushed;
-            lastSeqFlushed = seq;
-            numFlushed++;
-          }
-          syncBatchSize.addAndGet(numFlushed);
-          success = true;
-        } finally {
-          if (!success) {
-            // push back our batch into the pending list
-            synchronized (this) {
-              pending.addAll(pendingWrites);
-              pendingWrites = pending;
-            }
-          }
-        }
-        return lastSeqFlushed;
-      }
-    }
+    synchronized List<Entry> getPendingWrites() {
+      List<Entry> save = this.pendingWrites;
+      this.pendingWrites = new LinkedList<Entry>();
+      return save;
+    }
+    // writes out pending entries to the HLog
+    void hlogFlush(Writer writer, List<Entry> pending) throws IOException {
+      if (pending == null) return;
+      for (Entry e : pending) {
+        writer.append(e);
+      }
+    }
@@ -1262,31 +1239,9 @@ public class HLog implements Syncable {
     }
   }
-  private static class SyncInfo {
-    private long syncedTillHere = 0;
-    synchronized long getLastSyncedTxId() {
-      return syncedTillHere;
-    }
-    synchronized void notifySynced(long txid) {
-      if (txid > syncedTillHere) {
-        syncedTillHere = txid;
-      }
-      notifyAll();
-    }
-    synchronized void waitForSync(long txid) throws InterruptedException {
-      while (syncedTillHere < txid) {
-        wait();
-      }
-    }
-  }
   // sync all known transactions
   private void syncer() throws IOException {
-    syncer(logSeqNum.get()); // sync all pending
+    syncer(this.unflushedEntries.get()); // sync all pending items
   }
   // sync all transactions upto the specified txid
@@ -1298,26 +1253,47 @@ public class HLog implements Syncable {
     }
     // if the transaction that we are interested in is already
     // synced, then return immediately.
-    if (syncInfo.getLastSyncedTxId() >= txid) {
+    if (txid <= this.syncedTillHere) {
       return;
     }
     try {
+      long doneUpto;
       long now = System.currentTimeMillis();
-      long flushedSeqId;
+      // First flush all the pending writes to HDFS. Then
+      // issue the sync to HDFS. If sync is successful, then update
+      // syncedTillHere to indicate that transactions till this
+      // number has been successfully synced.
+      synchronized (flushLock) {
+        if (txid <= this.syncedTillHere) {
+          return;
+        }
+        doneUpto = this.unflushedEntries.get();
+        List<Entry> pending = logSyncerThread.getPendingWrites();
+        try {
+          logSyncerThread.hlogFlush(tempWriter, pending);
+        } catch(IOException io) {
+          synchronized (this.updateLock) {
+            // HBASE-4387, HBASE-5623, retry with updateLock held
+            tempWriter = this.writer;
+            logSyncerThread.hlogFlush(tempWriter, pending);
+          }
+        }
+      }
+      // another thread might have sync'ed avoid double-sync'ing
+      if (txid <= this.syncedTillHere) {
+        return;
+      }
       try {
-        flushedSeqId = logSyncerThread.flushWritesTo(tempWriter);
         tempWriter.sync();
       } catch(IOException io) {
         synchronized (this.updateLock) {
           // HBASE-4387, HBASE-5623, retry with updateLock held
           tempWriter = this.writer;
-          flushedSeqId = logSyncerThread.flushWritesTo(tempWriter);
           tempWriter.sync();
         }
       }
-      syncInfo.notifySynced(flushedSeqId);
+      this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);
+      // We try to not acquire the updateLock just to update statistics.
+      // Make these statistics as AtomicLong.
       syncTime.inc(System.currentTimeMillis() - now);
       if (!this.logRollRunning) {
         checkLowReplication();
@@ -1572,15 +1548,14 @@ public class HLog implements Syncable {
       if (this.closed) {
         return;
       }
-      long seqNumOfCompletionEdit;
+      long txid = 0;
       synchronized (updateLock) {
-        seqNumOfCompletionEdit = obtainSeqNum();
         long now = System.currentTimeMillis();
-        WALEdit edit = completeCacheFlushLogEdit(logSeqId);
-        HLogKey key = makeKey(encodedRegionName, tableName, seqNumOfCompletionEdit,
+        WALEdit edit = completeCacheFlushLogEdit();
+        HLogKey key = makeKey(encodedRegionName, tableName, logSeqId,
           System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
         logSyncerThread.append(new Entry(key, edit));
+        txid = this.unflushedEntries.incrementAndGet();
         writeTime.inc(System.currentTimeMillis() - now);
         long len = 0;
         for (KeyValue kv : edit.getKeyValues()) {
@@ -1590,7 +1565,7 @@ public class HLog implements Syncable {
         this.numEntries.incrementAndGet();
       }
       // sync txn to file system
-      this.sync(seqNumOfCompletionEdit);
+      this.sync(txid);
     } finally {
       // updateLock not needed for removing snapshot's entry
@@ -1601,17 +1576,9 @@ public class HLog implements Syncable {
     }
   }
-  private WALEdit completeCacheFlushLogEdit(long seqIdOfFlush) {
-    // The data is not actually used here - we just need to write
-    // something to the log to make sure we're still the owner of the
-    // pipeline.
-    byte[] data = Bytes.add(
-        COMPLETE_CACHE_FLUSH,
-        ":".getBytes(Charsets.UTF_8),
-        Bytes.toBytes(String.valueOf(seqIdOfFlush)));
+  private WALEdit completeCacheFlushLogEdit() {
     KeyValue kv = new KeyValue(METAROW, METAFAMILY, null,
-        System.currentTimeMillis(), data);
+        System.currentTimeMillis(), COMPLETE_CACHE_FLUSH);
     WALEdit e = new WALEdit();
     e.add(kv);
     return e;
@@ -1888,7 +1855,7 @@ public class HLog implements Syncable {
   /** Provide access to currently deferred sequence num for tests */
   boolean hasDeferredEntries() {
-    return lastDeferredTxid > syncInfo.getLastSyncedTxId();
+    return lastDeferredTxid > syncedTillHere;
   }
   /**
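The heart of the restored HLog.java code is the group-commit path in syncer(txid): one caller drains the shared pending-writes list under flushLock, writes the batch out with hlogFlush, then issues a single writer sync and advances syncedTillHere for every waiter. A rough, self-contained sketch of that pattern follows, simplified from the diff above; the Writer interface, String entries, and method bodies are stand-ins, not the real HBase types.

    import java.util.LinkedList;
    import java.util.List;

    public class GroupCommitSketch {
      /** Stand-in for the HLog.Writer used in the real code. */
      interface Writer {
        void append(String entry);
        void sync();
      }

      private final Object flushLock = new Object();
      private List<String> pendingWrites = new LinkedList<String>();
      private long unflushedEntries = 0;
      private volatile long syncedTillHere = 0;
      private final Writer writer;

      GroupCommitSketch(Writer writer) {
        this.writer = writer;
      }

      /** Buffers one entry and returns its txid (the real code holds updateLock here). */
      synchronized long append(String entry) {
        pendingWrites.add(entry);
        return ++unflushedEntries;
      }

      /** Group commit: flush everything pending, then one sync covers the whole batch. */
      void syncer(long txid) {
        if (txid <= syncedTillHere) {
          return;                              // someone already synced past us
        }
        long doneUpto;
        synchronized (flushLock) {
          if (txid <= syncedTillHere) {
            return;                            // re-check after winning the lock
          }
          List<String> pending;
          synchronized (this) {                // getPendingWrites: swap in a fresh list
            doneUpto = unflushedEntries;
            pending = pendingWrites;
            pendingWrites = new LinkedList<String>();
          }
          for (String e : pending) {           // hlogFlush: write out the batch
            writer.append(e);
          }
        }
        writer.sync();                         // one filesystem sync for the batch
        syncedTillHere = Math.max(syncedTillHere, doneUpto);
      }
    }

The design point is that the expensive filesystem sync is amortized: whichever thread performs it covers every entry appended before the drain, so later callers whose txid is already below the watermark return without touching the writer at all.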


@@ -245,7 +245,7 @@ public class SequenceFileLogWriter implements HLog.Writer {
   @Override
   public void sync() throws IOException {
-    if (this.writer != null) this.writer.syncFs();
+    this.writer.syncFs();
   }
   @Override


@@ -548,9 +548,8 @@ public class TestHLog {
       KeyValue kv = val.getKeyValues().get(0);
       assertTrue(Bytes.equals(HLog.METAROW, kv.getRow()));
       assertTrue(Bytes.equals(HLog.METAFAMILY, kv.getFamily()));
-      assertTrue(Bytes.startsWith(
-          val.getKeyValues().get(0).getValue(),
-          HLog.COMPLETE_CACHE_FLUSH));
+      assertEquals(0, Bytes.compareTo(HLog.COMPLETE_CACHE_FLUSH,
+          val.getKeyValues().get(0).getValue()));
       System.out.println(key + " " + val);
     }
   } finally {
@@ -617,9 +616,8 @@ public class TestHLog {
       assertTrue(Bytes.equals(tableName, entry.getKey().getTablename()));
       assertTrue(Bytes.equals(HLog.METAROW, val.getRow()));
       assertTrue(Bytes.equals(HLog.METAFAMILY, val.getFamily()));
-      assertTrue(Bytes.startsWith(
-          val.getValue(),
-          HLog.COMPLETE_CACHE_FLUSH));
+      assertEquals(0, Bytes.compareTo(HLog.COMPLETE_CACHE_FLUSH,
+          val.getValue()));
       System.out.println(entry.getKey() + " " + val);
     }
   } finally {
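The TestHLog changes follow directly from the completeCacheFlushLogEdit() revert above: before the revert the marker cell value was the COMPLETE_CACHE_FLUSH bytes with ":<flush seqid>" appended, so the test could only assert a prefix match; after the revert the value is exactly the marker bytes again, so an exact comparison is valid. A small plain-JDK illustration of that difference follows; the marker string "HBASE::CACHEFLUSH" and the startsWith helper are assumptions made for the example, not the HBase API.

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class FlushMarkerAssertionSketch {
      // Assumed value of HLog.COMPLETE_CACHE_FLUSH, for illustration only.
      static final byte[] COMPLETE_CACHE_FLUSH =
          "HBASE::CACHEFLUSH".getBytes(StandardCharsets.UTF_8);

      // Stand-in for a Bytes.startsWith(bytes, prefix) style check.
      static boolean startsWith(byte[] bytes, byte[] prefix) {
        return bytes.length >= prefix.length
            && Arrays.equals(Arrays.copyOf(bytes, prefix.length), prefix);
      }

      public static void main(String[] args) {
        byte[] preRevertValue = "HBASE::CACHEFLUSH:42".getBytes(StandardCharsets.UTF_8);
        byte[] postRevertValue = COMPLETE_CACHE_FLUSH.clone();

        System.out.println(startsWith(preRevertValue, COMPLETE_CACHE_FLUSH));     // true: only a prefix check works pre-revert
        System.out.println(Arrays.equals(preRevertValue, COMPLETE_CACHE_FLUSH));  // false: exact compare would fail pre-revert
        System.out.println(Arrays.equals(postRevertValue, COMPLETE_CACHE_FLUSH)); // true: exact compare works after the revert
      }
    }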