From aa5ce08ff1b4ef079956939c964ae34aae2c0f51 Mon Sep 17 00:00:00 2001
From: Zhihong Yu
Date: Thu, 17 May 2012 15:58:09 +0000
Subject: [PATCH] HBASE-5826 Revert, Todd has review comments pending.

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1339673 13f79535-47bb-0310-9956-ffa450edef68
---
 .../hadoop/hbase/regionserver/wal/HLog.java   | 181 +++++++-----------
 .../wal/SequenceFileLogWriter.java            |   2 +-
 .../hbase/regionserver/wal/TestHLog.java      |  10 +-
 3 files changed, 79 insertions(+), 114 deletions(-)

diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index 147867faeaa..65cb20975a1 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -48,7 +48,6 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import com.google.common.base.Charsets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -140,10 +139,8 @@ public class HLog implements Syncable {
   private final long optionalFlushInterval;
   private final long blocksize;
   private final String prefix;
-
-  /** tracking information about what has been synced */
-  private SyncInfo syncInfo = new SyncInfo();
-
+  private final AtomicLong unflushedEntries = new AtomicLong(0);
+  private volatile long syncedTillHere = 0;
   private long lastDeferredTxid;
   private final Path oldLogDir;
   private volatile boolean logRollRunning;
@@ -233,6 +230,7 @@ public class HLog implements Syncable {
   // during an update
   // locked during appends
   private final Object updateLock = new Object();
+  private final Object flushLock = new Object();
 
   private final boolean enabled;
 
@@ -300,7 +298,6 @@ public class HLog implements Syncable {
   private static Metric writeSize = new Metric();
   // For measuring latency of syncs
   private static Metric syncTime = new Metric();
-  private static AtomicLong syncBatchSize = new AtomicLong();
   //For measuring slow HLog appends
   private static AtomicLong slowHLogAppendCount = new AtomicLong();
   private static Metric slowHLogAppendTime = new Metric();
@@ -317,10 +314,6 @@ public class HLog implements Syncable {
     return syncTime.get();
   }
 
-  public static long getSyncBatchSize() {
-    return syncBatchSize.getAndSet(0);
-  }
-
   public static long getSlowAppendCount() {
     return slowHLogAppendCount.get();
   }
@@ -840,11 +833,17 @@ public class HLog implements Syncable {
     try {
       // Wait till all current transactions are written to the hlog.
       // No new transactions can occur because we have the updatelock.
-      sync();
+      if (this.unflushedEntries.get() != this.syncedTillHere) {
+        LOG.debug("cleanupCurrentWriter " +
+            " waiting for transactions to get synced " +
+            " total " + this.unflushedEntries.get() +
+            " synced till here " + syncedTillHere);
+        sync();
+      }
       this.writer.close();
       this.writer = null;
       closeErrorCount.set(0);
-    } catch (Exception e) {
+    } catch (IOException e) {
       LOG.error("Failed close of HLog writer", e);
       int errors = closeErrorCount.incrementAndGet();
       if (errors <= closeErrorsTolerated && !hasDeferredEntries()) {
@@ -1007,7 +1006,7 @@ public class HLog implements Syncable {
    * @param logEdit
    * @param logKey
    * @param doSync shall we sync after writing the transaction
-   * @return The seqnum of this transaction
+   * @return The txid of this transaction
    * @throws IOException
    */
   public long append(HRegionInfo regionInfo, HLogKey logKey, WALEdit logEdit,
@@ -1016,9 +1015,9 @@ public class HLog implements Syncable {
     if (this.closed) {
       throw new IOException("Cannot append; log is closed");
     }
-    long seqNum;
+    long txid = 0;
     synchronized (updateLock) {
-      seqNum = obtainSeqNum();
+      long seqNum = obtainSeqNum();
       logKey.setLogSeqNum(seqNum);
       // The 'lastSeqWritten' map holds the sequence number of the oldest
       // write for each region (i.e. the first edit added to the particular
@@ -1028,9 +1027,10 @@ public class HLog implements Syncable {
       this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(),
         Long.valueOf(seqNum));
       doWrite(regionInfo, logKey, logEdit, htd);
+      txid = this.unflushedEntries.incrementAndGet();
       this.numEntries.incrementAndGet();
       if (htd.isDeferredLogFlush()) {
-        lastDeferredTxid = seqNum;
+        lastDeferredTxid = txid;
       }
     }
 
@@ -1040,9 +1040,9 @@ public class HLog implements Syncable {
         (regionInfo.isMetaRegion() ||
         !htd.isDeferredLogFlush())) {
       // sync txn to file system
-      this.sync(seqNum);
+      this.sync(txid);
     }
-    return seqNum;
+    return txid;
   }
 
   /**
@@ -1090,13 +1090,13 @@ public class HLog implements Syncable {
   private long append(HRegionInfo info, byte [] tableName, WALEdit edits,
     UUID clusterId, final long now, HTableDescriptor htd, boolean doSync)
     throws IOException {
-    if (edits.isEmpty()) return this.logSeqNum.get();
+    if (edits.isEmpty()) return this.unflushedEntries.get();
     if (this.closed) {
       throw new IOException("Cannot append; log is closed");
     }
-    long seqNum;
+    long txid = 0;
     synchronized (this.updateLock) {
-      seqNum = obtainSeqNum();
+      long seqNum = obtainSeqNum();
       // The 'lastSeqWritten' map holds the sequence number of the oldest
       // write for each region (i.e. the first edit added to the particular
       // memstore). When the cache is flushed, the entry for the
@@ -1109,8 +1109,9 @@ public class HLog implements Syncable {
       HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId);
       doWrite(info, logKey, edits, htd);
       this.numEntries.incrementAndGet();
+      txid = this.unflushedEntries.incrementAndGet();
       if (htd.isDeferredLogFlush()) {
-        lastDeferredTxid = seqNum;
+        lastDeferredTxid = txid;
       }
     }
     // Sync if catalog region, and if not then check if that table supports
@@ -1119,9 +1120,9 @@ public class HLog implements Syncable {
         (info.isMetaRegion() ||
         !htd.isDeferredLogFlush())) {
       // sync txn to file system
-      this.sync(seqNum);
+      this.sync(txid);
     }
-    return seqNum;
+    return txid;
   }
 
   /**
@@ -1179,9 +1180,6 @@ public class HLog implements Syncable {
     // goal is to increase the batchsize for writing-to-hdfs as well as
     // sync-to-hdfs, so that we can get better system throughput.
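
Before the LogSyncer internals below, here is the heart of what this revert reinstates in the append path above: a ticket/high-water-mark handshake. Each append takes a ticket from unflushedEntries while holding the update lock, and sync(txid) is satisfied once the syncer pushes syncedTillHere past that ticket. A minimal self-contained sketch of just that handshake (illustrative names, single syncer thread assumed, not HBase API):

import java.util.concurrent.atomic.AtomicLong;

class TxidTicketing {
  private final AtomicLong unflushedEntries = new AtomicLong(0);
  private volatile long syncedTillHere = 0;  // published by the lone syncer
  private final Object updateLock = new Object();

  // Append path: buffer the edit, take a ticket.
  long append(Runnable bufferEdit) {
    synchronized (updateLock) {
      bufferEdit.run();                          // queue the edit in memory
      return unflushedEntries.incrementAndGet(); // ticket for this edit
    }
  }

  // A ticket is durable once the syncer's high-water mark passes it.
  boolean isSynced(long txid) {
    return txid <= syncedTillHere;
  }

  // Syncer: call after a successful writer sync covering doneUpto.
  void publishSynced(long doneUpto) {
    syncedTillHere = Math.max(syncedTillHere, doneUpto);
  }
}
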
     private List<Entry> pendingWrites = new LinkedList<Entry>();
-    long lastSeqAppended = -1;
-    long lastSeqFlushed = -1;
-    private Object flushLock = new Object();
 
     LogSyncer(long optionalFlushInterval) {
       this.optionalFlushInterval = optionalFlushInterval;
     }
 
@@ -1195,7 +1193,7 @@ public class HLog implements Syncable {
       while(!this.isInterrupted() && !closeLogSyncer) {
         try {
-          if (syncInfo.getLastSyncedTxId() >= logSeqNum.get()) {
+          if (unflushedEntries.get() <= syncedTillHere) {
             Thread.sleep(this.optionalFlushInterval);
           }
           sync();
@@ -1215,45 +1213,24 @@ public class HLog implements Syncable {
     // our own queue rather than writing it to the HDFS output stream because
     // HDFSOutputStream.writeChunk is not lightweight at all.
     synchronized void append(Entry e) throws IOException {
-      long seq = e.getKey().getLogSeqNum();
-      assert seq > lastSeqAppended;
-      lastSeqAppended = seq;
       pendingWrites.add(e);
     }
 
     // Returns all currently pending writes. New writes
     // will accumulate in a new list.
-    long flushWritesTo(Writer writer) throws IOException {
-      synchronized (flushLock) {
-        List<Entry> pending;
+    synchronized List<Entry> getPendingWrites() {
+      List<Entry> save = this.pendingWrites;
+      this.pendingWrites = new LinkedList<Entry>();
+      return save;
+    }
 
-        synchronized (this) {
-          pending = pendingWrites;
-          pendingWrites = new LinkedList<Entry>();
-        }
+    // writes out pending entries to the HLog
+    void hlogFlush(Writer writer, List<Entry> pending) throws IOException {
+      if (pending == null) return;
 
-        boolean success = false;
-        try {
-          int numFlushed = 0;
-          for (Entry e : pending) {
-            writer.append(e);
-            long seq = e.getKey().getLogSeqNum();
-            assert seq > lastSeqFlushed;
-            lastSeqFlushed = seq;
-            numFlushed++;
-          }
-          syncBatchSize.addAndGet(numFlushed);
-          success = true;
-        } finally {
-          if (!success) {
-            // push back our batch into the pending list
-            synchronized (this) {
-              pending.addAll(pendingWrites);
-              pendingWrites = pending;
-            }
-          }
-        }
-        return lastSeqFlushed;
+      // write out all accumulated Entries to hdfs.
+      for (Entry e : pending) {
+        writer.append(e);
       }
     }
 
@@ -1262,31 +1239,9 @@ public class HLog implements Syncable {
     }
   }
 
-  private static class SyncInfo {
-    private long syncedTillHere = 0;
-
-    synchronized long getLastSyncedTxId() {
-      return syncedTillHere;
-    }
-
-    synchronized void notifySynced(long txid) {
-      if (txid > syncedTillHere) {
-        syncedTillHere = txid;
-      }
-      notifyAll();
-    }
-
-    synchronized void waitForSync(long txid) throws InterruptedException {
-      while (syncedTillHere < txid) {
-        wait();
-      }
-    }
-  }
-
-  // sync all known transactions
   private void syncer() throws IOException {
-    syncer(logSeqNum.get()); // sync all pending
+    syncer(this.unflushedEntries.get()); // sync all pending items
  }
 
   // sync all transactions up to the specified txid
@@ -1298,26 +1253,47 @@ public class HLog implements Syncable {
     }
     // if the transaction that we are interested in is already
     // synced, then return immediately.
-    if (syncInfo.getLastSyncedTxId() >= txid) {
+    if (txid <= this.syncedTillHere) {
       return;
     }
     try {
+      long doneUpto;
       long now = System.currentTimeMillis();
-      long flushedSeqId;
+      // First flush all the pending writes to HDFS. Then
+      // issue the sync to HDFS. If sync is successful, then update
+      // syncedTillHere to indicate that transactions up to this
+      // number have been successfully synced.
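
The getPendingWrites()/hlogFlush() pair restored above is a swap-and-drain group commit: appenders touch the list only under the buffer's monitor, and the flusher exchanges it for a fresh one so the batch is written out without stalling new appends. The same idea in isolation (hypothetical generic class, simplified from the patch):

import java.util.LinkedList;
import java.util.List;

class GroupCommitBuffer<E> {
  private List<E> pendingWrites = new LinkedList<E>();

  // Append path stays cheap: no filesystem work, just an in-memory add.
  synchronized void append(E e) {
    pendingWrites.add(e);
  }

  // Flusher swaps the batch out; new appends land in the fresh list.
  synchronized List<E> drain() {
    List<E> batch = pendingWrites;
    pendingWrites = new LinkedList<E>();
    return batch;
  }
}

Note the trade-off visible in the removed flushWritesTo(): that variant pushed a failed batch back into pendingWrites in its finally block, whereas the restored hlogFlush() leaves redelivery to the caller's retry under updateLock.
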
+      synchronized (flushLock) {
+        if (txid <= this.syncedTillHere) {
+          return;
+        }
+        doneUpto = this.unflushedEntries.get();
+        List<Entry> pending = logSyncerThread.getPendingWrites();
+        try {
+          logSyncerThread.hlogFlush(tempWriter, pending);
+        } catch(IOException io) {
+          synchronized (this.updateLock) {
+            // HBASE-4387, HBASE-5623, retry with updateLock held
+            tempWriter = this.writer;
+            logSyncerThread.hlogFlush(tempWriter, pending);
+          }
+        }
+      }
+      // another thread might have sync'ed; avoid double-sync'ing
+      if (txid <= this.syncedTillHere) {
+        return;
+      }
       try {
-        flushedSeqId = logSyncerThread.flushWritesTo(tempWriter);
         tempWriter.sync();
       } catch(IOException io) {
         synchronized (this.updateLock) {
           // HBASE-4387, HBASE-5623, retry with updateLock held
           tempWriter = this.writer;
-          flushedSeqId = logSyncerThread.flushWritesTo(tempWriter);
           tempWriter.sync();
         }
       }
-      syncInfo.notifySynced(flushedSeqId);
-      // We try to not acquire the updateLock just to update statistics.
-      // Make these statistics as AtomicLong.
+      this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);
+
       syncTime.inc(System.currentTimeMillis() - now);
       if (!this.logRollRunning) {
         checkLowReplication();
@@ -1572,15 +1548,14 @@ public class HLog implements Syncable {
     if (this.closed) {
       return;
     }
-    long seqNumOfCompletionEdit;
+    long txid = 0;
     synchronized (updateLock) {
-      seqNumOfCompletionEdit = obtainSeqNum();
       long now = System.currentTimeMillis();
-
-      WALEdit edit = completeCacheFlushLogEdit(logSeqId);
-      HLogKey key = makeKey(encodedRegionName, tableName, seqNumOfCompletionEdit,
+      WALEdit edit = completeCacheFlushLogEdit();
+      HLogKey key = makeKey(encodedRegionName, tableName, logSeqId,
         System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
       logSyncerThread.append(new Entry(key, edit));
+      txid = this.unflushedEntries.incrementAndGet();
       writeTime.inc(System.currentTimeMillis() - now);
       long len = 0;
       for (KeyValue kv : edit.getKeyValues()) {
@@ -1590,7 +1565,7 @@ public class HLog implements Syncable {
       this.numEntries.incrementAndGet();
     }
     // sync txn to file system
-    this.sync(seqNumOfCompletionEdit);
+    this.sync(txid);
     } finally {
       // updateLock not needed for removing snapshot's entry
@@ -1601,17 +1576,9 @@ public class HLog implements Syncable {
     }
   }
 
-  private WALEdit completeCacheFlushLogEdit(long seqIdOfFlush) {
-    // The data is not actually used here - we just need to write
-    // something to the log to make sure we're still the owner of the
-    // pipeline.
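
Pulling the restored syncer() protocol above into one self-contained model: check syncedTillHere, drain and flush under flushLock, re-check to avoid double-syncing, then sync and publish the new high-water mark; the retry under updateLock re-reads the writer because a concurrent log roll may have replaced it (the HBASE-4387/HBASE-5623 pattern). All names below are stand-ins for the patch's fields, and error handling around the final sync is elided:

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

class SyncProtocolModel {
  interface Writer {
    void append(String entry) throws IOException;
    void sync() throws IOException;
  }

  private final AtomicLong unflushedEntries = new AtomicLong(0);
  private volatile long syncedTillHere = 0;
  private final Object flushLock = new Object();
  private final Object updateLock = new Object();
  private List<String> pendingWrites = new LinkedList<String>();
  private volatile Writer writer;  // a log roll may swap this

  SyncProtocolModel(Writer w) { this.writer = w; }

  long append(String entry) {
    synchronized (updateLock) {
      synchronized (this) { pendingWrites.add(entry); }
      return unflushedEntries.incrementAndGet();  // ticket for this edit
    }
  }

  private synchronized List<String> drainPending() {
    List<String> batch = pendingWrites;
    pendingWrites = new LinkedList<String>();
    return batch;
  }

  void syncUpTo(long txid) throws IOException {
    if (txid <= syncedTillHere) return;      // already durable
    long doneUpto;
    Writer w = writer;
    synchronized (flushLock) {               // one flusher at a time
      if (txid <= syncedTillHere) return;    // another caller covered us
      doneUpto = unflushedEntries.get();
      List<String> batch = drainPending();
      try {
        for (String e : batch) w.append(e);
      } catch (IOException io) {
        synchronized (updateLock) {          // writer may have rolled; retry
          w = writer;
          for (String e : batch) w.append(e);
        }
      }
    }
    if (txid <= syncedTillHere) return;      // raced with a faster sync
    w.sync();
    syncedTillHere = Math.max(syncedTillHere, doneUpto);
  }
}
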
-    byte[] data = Bytes.add(
-        COMPLETE_CACHE_FLUSH,
-        ":".getBytes(Charsets.UTF_8),
-        Bytes.toBytes(String.valueOf(seqIdOfFlush)));
-
+  private WALEdit completeCacheFlushLogEdit() {
     KeyValue kv = new KeyValue(METAROW, METAFAMILY, null,
-      System.currentTimeMillis(), data);
+      System.currentTimeMillis(), COMPLETE_CACHE_FLUSH);
     WALEdit e = new WALEdit();
     e.add(kv);
     return e;
@@ -1888,7 +1855,7 @@ public class HLog implements Syncable {
 
   /** Provide access to currently deferred sequence num for tests */
   boolean hasDeferredEntries() {
-    return lastDeferredTxid > syncInfo.getLastSyncedTxId();
+    return lastDeferredTxid > syncedTillHere;
   }
 
   /**
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
index 8abb7f1a3fd..5e810c0df80 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
@@ -245,7 +245,7 @@ public class SequenceFileLogWriter implements HLog.Writer {
 
   @Override
   public void sync() throws IOException {
-    if (this.writer != null) this.writer.syncFs();
+    this.writer.syncFs();
   }
 
   @Override
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
index 93ded443f49..7c3cb70e094 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
@@ -548,9 +548,8 @@ public class TestHLog {
         KeyValue kv = val.getKeyValues().get(0);
         assertTrue(Bytes.equals(HLog.METAROW, kv.getRow()));
         assertTrue(Bytes.equals(HLog.METAFAMILY, kv.getFamily()));
-        assertTrue(Bytes.startsWith(
-            val.getKeyValues().get(0).getValue(),
-            HLog.COMPLETE_CACHE_FLUSH));
+        assertEquals(0, Bytes.compareTo(HLog.COMPLETE_CACHE_FLUSH,
+          val.getKeyValues().get(0).getValue()));
         System.out.println(key + " " + val);
       }
     } finally {
@@ -617,9 +616,8 @@ public class TestHLog {
         assertTrue(Bytes.equals(tableName, entry.getKey().getTablename()));
         assertTrue(Bytes.equals(HLog.METAROW, val.getRow()));
         assertTrue(Bytes.equals(HLog.METAFAMILY, val.getFamily()));
-        assertTrue(Bytes.startsWith(
-            val.getValue(),
-            HLog.COMPLETE_CACHE_FLUSH));
+        assertEquals(0, Bytes.compareTo(HLog.COMPLETE_CACHE_FLUSH,
+          val.getValue()));
         System.out.println(entry.getKey() + " " + val);
       }
     } finally {