From 644cf0f636b3d8df28f79689e0e30731ee649825 Mon Sep 17 00:00:00 2001 From: Zhihong Yu Date: Mon, 21 Jan 2013 21:33:37 +0000 Subject: [PATCH] HBASE-7329 remove flush-related records from WAL and make locking more granular (Sergey) git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1436632 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop/hbase/util/DrainBarrier.java | 131 +++++ .../hadoop/hbase/util/TestDrainBarrier.java | 119 +++++ .../hadoop/hbase/mapreduce/WALPlayer.java | 2 +- .../hadoop/hbase/regionserver/HRegion.java | 45 +- .../hadoop/hbase/regionserver/wal/FSHLog.java | 452 +++++++++--------- .../hadoop/hbase/regionserver/wal/HLog.java | 88 +--- .../hbase/regionserver/wal/HLogUtil.java | 28 -- .../wal/SequenceFileLogWriter.java | 7 +- .../hbase/regionserver/wal/TestHLog.java | 53 +- .../wal/TestWALActionsListener.java | 2 +- .../hbase/regionserver/wal/TestWALReplay.java | 11 +- 11 files changed, 537 insertions(+), 401 deletions(-) create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/util/DrainBarrier.java create mode 100644 hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestDrainBarrier.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DrainBarrier.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DrainBarrier.java new file mode 100644 index 00000000000..218ebe42de6 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DrainBarrier.java @@ -0,0 +1,131 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * A simple barrier that can be used by classes that need to wait for some operations to + * finish before stopping/closing/etc. forever. + */ +public class DrainBarrier { + /** + * Contains the number of outstanding operations, as well as flags. + * Initially, the number of operations is 1. Each beginOp increments, and endOp decrements it. + * beginOp does not proceed when it sees the draining flag. When stop is called, it atomically + * decrements the number of operations (the initial 1) and sets the draining flag. If stop did + * the decrement to zero, that means there are no more operations outstanding, so stop is done. + * Otherwise, stop blocks, and the endOp that decrements the count to 0 unblocks it. + */ + private final AtomicLong valueAndFlags = new AtomicLong(inc(0)); + private final static long DRAINING_FLAG = 0x1; + private final static int FLAG_BIT_COUNT = 1; + + /** + * Tries to start an operation. + * @return false iff the stop is in progress, and the operation cannot be started. 
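+ * If this returns true, the caller must eventually make a matching {@link #endOp()} call.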
+ */
+ public boolean beginOp() {
+ long oldValAndFlags;
+ do {
+ oldValAndFlags = valueAndFlags.get();
+ if (isDraining(oldValAndFlags)) return false;
+ } while (!valueAndFlags.compareAndSet(oldValAndFlags, inc(oldValAndFlags)));
+ return true;
+ }
+
+ /**
+ * Ends the operation. Unblocks the blocked caller of stop, if necessary.
+ */
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
+ justification="First, we do change the state before notify; second, it doesn't even matter")
+ public void endOp() {
+ long oldValAndFlags;
+ do {
+ oldValAndFlags = valueAndFlags.get();
+ long unacceptableCount = isDraining(oldValAndFlags) ? 0 : 1;
+ if (getValue(oldValAndFlags) == unacceptableCount) {
+ throw new AssertionError("endOp called without corresponding beginOp call ("
+ + "the current count is " + unacceptableCount + ")");
+ }
+ } while (!valueAndFlags.compareAndSet(oldValAndFlags, dec(oldValAndFlags)));
+ if (getValue(oldValAndFlags) == 1) {
+ synchronized (this) { this.notifyAll(); }
+ }
+ }
+
+ /**
+ * Blocks new operations from starting, and waits for the current ones to drain.
+ * If stop has already been called, returns immediately; this leniency is currently
+ * unavoidable, as most users stop and close things right and left and hope for the best.
+ * stopAndDrainOpsOnce asserts instead.
+ * @throws InterruptedException the wait for operations has been interrupted.
+ */
+ public void stopAndDrainOps() throws InterruptedException {
+ stopAndDrainOps(true);
+ }
+
+ /**
+ * Blocks new operations from starting, and waits for the current ones to drain.
+ * Can only be called once.
+ * @throws InterruptedException the wait for operations has been interrupted.
+ */
+ public void stopAndDrainOpsOnce() throws InterruptedException {
+ stopAndDrainOps(false);
+ }
+
+ /**
+ * @param ignoreRepeatedCalls If this is true and stop has already been called, this
+ * method returns immediately; if this is false and stop has
+ * already been called, it asserts.
+ */
+ // Justification for warnings - wait is not unconditional, and contrary to what the
+ // WA_NOT_IN_LOOP description says, we are not waiting on multiple conditions.
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings({"UW_UNCOND_WAIT", "WA_NOT_IN_LOOP"})
+ private void stopAndDrainOps(boolean ignoreRepeatedCalls) throws InterruptedException {
+ long oldValAndFlags;
+ do {
+ oldValAndFlags = valueAndFlags.get();
+ if (isDraining(oldValAndFlags)) {
+ if (ignoreRepeatedCalls) return;
+ throw new AssertionError("stopAndDrainOpsOnce called more than once");
+ }
+ } while (!valueAndFlags.compareAndSet(oldValAndFlags, dec(oldValAndFlags) | DRAINING_FLAG));
+ if (getValue(oldValAndFlags) == 1) return; // There were no operations outstanding.
+ synchronized (this) { this.wait(); }
+ }
+
+ // Helper methods.
+ private static final boolean isDraining(long valueAndFlags) {
+ return (valueAndFlags & DRAINING_FLAG) == DRAINING_FLAG;
+ }
+
+ private static final long getValue(long valueAndFlags) {
+ return valueAndFlags >> FLAG_BIT_COUNT;
+ }
+
+ private static final long inc(long valueAndFlags) {
+ return valueAndFlags + (1 << FLAG_BIT_COUNT); // Not checking for overflow.
+ }
+
+ private static final long dec(long valueAndFlags) {
+ return valueAndFlags - (1 << FLAG_BIT_COUNT); // Negative overflow checked outside.
+ } +} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestDrainBarrier.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestDrainBarrier.java new file mode 100644 index 00000000000..065f551f5f2 --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestDrainBarrier.java @@ -0,0 +1,119 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import org.apache.hadoop.hbase.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.*; + +@Category(SmallTests.class) +public class TestDrainBarrier { + + @Test + public void testBeginEndStopWork() throws Exception { + DrainBarrier barrier = new DrainBarrier(); + assertTrue(barrier.beginOp()); + assertTrue(barrier.beginOp()); + barrier.endOp(); + barrier.endOp(); + barrier.stopAndDrainOps(); + assertFalse(barrier.beginOp()); + } + + @Test + public void testUnmatchedEndAssert() throws Exception { + DrainBarrier barrier = new DrainBarrier(); + try { + barrier.endOp(); + fail("Should have asserted"); + } catch (AssertionError e) { + } + + barrier.beginOp(); + barrier.beginOp(); + barrier.endOp(); + barrier.endOp(); + try { + barrier.endOp(); + fail("Should have asserted"); + } catch (AssertionError e) { + } + } + + @Test + public void testStopWithoutOpsDoesntBlock() throws Exception { + DrainBarrier barrier = new DrainBarrier(); + barrier.stopAndDrainOpsOnce(); + + barrier = new DrainBarrier(); + barrier.beginOp(); + barrier.endOp(); + barrier.stopAndDrainOpsOnce(); + } + + @Test + /** This test tests blocking and can have false positives in very bad timing cases. */ + public void testStopIsBlockedByOps() throws Exception { + final DrainBarrier barrier = new DrainBarrier(); + barrier.beginOp(); + barrier.beginOp(); + barrier.beginOp(); + barrier.endOp(); + + Thread stoppingThread = new Thread(new Runnable() { + @Override + public void run() { + try { + barrier.stopAndDrainOpsOnce(); + } catch (InterruptedException e) { + fail("Should not have happened"); + } + } + }); + stoppingThread.start(); + + // First "end" should not unblock the thread, but the second should. + barrier.endOp(); + stoppingThread.join(1000); + assertTrue(stoppingThread.isAlive()); + barrier.endOp(); + stoppingThread.join(30000); // When not broken, will be a very fast wait; set safe value. 
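+ // The final endOp() dropped the outstanding count to zero and unblocked the stop.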
+ assertFalse(stoppingThread.isAlive()); + } + + @Test + public void testMultipleStopOnceAssert() throws Exception { + DrainBarrier barrier = new DrainBarrier(); + barrier.stopAndDrainOpsOnce(); + try { + barrier.stopAndDrainOpsOnce(); + fail("Should have asserted"); + } catch (AssertionError e) { + } + } + + @Test + public void testMultipleSloppyStopsHaveNoEffect() throws Exception { + DrainBarrier barrier = new DrainBarrier(); + barrier.stopAndDrainOps(); + barrier.stopAndDrainOps(); + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java index 976c098896d..8dd8e9dc5cf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java @@ -126,7 +126,7 @@ public class WALPlayer extends Configured implements Tool { Delete del = null; KeyValue lastKV = null; for (KeyValue kv : value.getKeyValues()) { - // filtering HLog meta entries, see HLog.completeCacheFlushLogEdit + // filtering HLog meta entries if (HLogUtil.isMetaFamily(kv.getFamily())) continue; // A WALEdit may contain multiple operations (HBASE-3584) and/or diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 7aa71e88a6a..6c5e4b97efd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1553,17 +1553,26 @@ public class HRegion implements HeapSize { // , Writable{ long flushsize = this.memstoreSize.get(); status.setStatus("Preparing to flush by snapshotting stores"); List storeFlushers = new ArrayList(stores.size()); - long completeSeqId = -1L; + long flushSeqId = -1L; try { // Record the mvcc for all transactions in progress. w = mvcc.beginMemstoreInsert(); mvcc.advanceMemstore(w); - sequenceId = (wal == null)? myseqid: - wal.startCacheFlush(this.regionInfo.getEncodedNameAsBytes()); - completeSeqId = this.getCompleteCacheFlushSequenceId(sequenceId); + if (wal != null) { + Long startSeqId = wal.startCacheFlush(this.regionInfo.getEncodedNameAsBytes()); + if (startSeqId == null) { + status.setStatus("Flush will not be started for [" + this.regionInfo.getEncodedName() + + "] - WAL is going away"); + return false; + } + flushSeqId = startSeqId.longValue(); + } else { + flushSeqId = myseqid; + } + for (Store s : stores.values()) { - storeFlushers.add(s.getStoreFlusher(completeSeqId)); + storeFlushers.add(s.getStoreFlusher(flushSeqId)); } // prepare flush (take a snapshot) @@ -1632,22 +1641,14 @@ public class HRegion implements HeapSize { // , Writable{ throw dse; } - // If we get to here, the HStores have been written. If we get an - // error in completeCacheFlush it will release the lock it is holding - - // B. Write a FLUSHCACHE-COMPLETE message to the log. - // This tells future readers that the HStores were emitted correctly, - // and that all updates to the log for this regionName that have lower - // log-sequence-ids can be safely ignored. + // If we get to here, the HStores have been written. 
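+ // There is no FLUSHCACHE-COMPLETE marker written to the WAL anymore;
+ // completeCacheFlush now only updates the WAL's in-memory flush accounting.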
if (wal != null) { - wal.completeCacheFlush(this.regionInfo.getEncodedNameAsBytes(), - regionInfo.getTableName(), completeSeqId, - this.getRegionInfo().isMetaRegion()); + wal.completeCacheFlush(this.regionInfo.getEncodedNameAsBytes()); } // Update the last flushed sequence id for region if (this.rsServices != null) { - completeSequenceId = completeSeqId; + completeSequenceId = flushSeqId; } // C. Finally notify anyone waiting on memstore to clear: @@ -1672,18 +1673,6 @@ public class HRegion implements HeapSize { // , Writable{ return compactionRequested; } - /** - * Get the sequence number to be associated with this cache flush. Used by - * TransactionalRegion to not complete pending transactions. - * - * - * @param currentSequenceId - * @return sequence id to complete the cache flush with - */ - protected long getCompleteCacheFlushSequenceId(long currentSequenceId) { - return currentSequenceId; - } - ////////////////////////////////////////////////////////////////////////////// // get() methods for client use. ////////////////////////////////////////////////////////////////////////////// diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 6d4e525847a..e38ef41721c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -24,8 +24,10 @@ import java.io.OutputStream; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.URLEncoder; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -57,6 +59,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.hadoop.hbase.util.DrainBarrier; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HasThread; @@ -132,23 +135,45 @@ class FSHLog implements HLog, Syncable { private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas final static Object [] NO_ARGS = new Object []{}; - /* + /** The barrier used to ensure that close() waits for all log rolls and flushes to finish. */ + private DrainBarrier closeBarrier = new DrainBarrier(); + + /** * Current log file. */ Writer writer; - /* + /** * Map of all log files but the current one. */ final SortedMap outputfiles = Collections.synchronizedSortedMap(new TreeMap()); - /* - * Map of encoded region names to their most recent sequence/edit id in their - * memstore. + + /** + * This lock synchronizes all operations on oldestUnflushedSeqNums and oldestFlushingSeqNums, + * with the exception of append's putIfAbsent into oldestUnflushedSeqNums. + * We only use these to find out the low bound seqNum, or to find regions with old seqNums to + * force flush them, so we don't care about these numbers messing with anything. */ + private final Object oldestSeqNumsLock = new Object(); + + /** + * This lock makes sure only one log roll runs at the same time. Should not be taken while + * any other lock is held. 
We don't just use synchronized because that results in bogus and
+ * tedious findbugs warnings when it decides that synchronized controls the writer's
+ * thread safety. */
+ private final Object rollWriterLock = new Object();
+
+ /**
+ * Map of encoded region names to their most recent sequence/edit id in their memstore.
+ */
- private final ConcurrentSkipListMap<byte [], Long> lastSeqWritten =
+ private final ConcurrentSkipListMap<byte [], Long> oldestUnflushedSeqNums =
 new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
+ /**
+ * Map of encoded region names to their most recent sequence/edit id in their memstore;
+ * contains the regions that are currently flushing. That way we can store two numbers
+ * for the same region: one for the flushing memstore and one for the non-flushing one
+ * (oldestUnflushedSeqNums).
+ */
+ private final Map<byte[], Long> oldestFlushingSeqNums = new HashMap<byte[], Long>();

 private volatile boolean closed = false;

@@ -178,10 +203,6 @@ class FSHLog implements HLog, Syncable {
 // of the default Hdfs block size.
 private final long logrollsize;

- // This lock prevents starting a log roll during a cache flush.
- // synchronized is insufficient because a cache flush spans two method calls.
- private final Lock cacheFlushLock = new ReentrantLock();
-
 // We synchronize on updateLock to prevent updates and to prevent a log roll
 // during an update
 // locked during appends
@@ -472,88 +493,77 @@
 @Override
 public byte [][] rollWriter(boolean force)
 throws FailedLogCloseException, IOException {
- // Return if nothing to flush.
- if (!force && this.writer != null && this.numEntries.get() <= 0) {
- return null;
- }
- byte [][] regionsToFlush = null;
- this.cacheFlushLock.lock();
- try {
- this.logRollRunning = true;
- if (closed) {
- LOG.debug("HLog closed. Skipping rolling of writer");
- return regionsToFlush;
+ synchronized (rollWriterLock) {
+ // Return if nothing to flush.
+ if (!force && this.writer != null && this.numEntries.get() <= 0) {
+ return null;
 }
- // Do all the preparation outside of the updateLock to block
- // as less as possible the incoming writes
- long currentFilenum = this.filenum;
- Path oldPath = null;
- if (currentFilenum > 0) {
- //computeFilename will take care of meta hlog filename
- oldPath = computeFilename(currentFilenum);
- }
- this.filenum = System.currentTimeMillis();
- Path newPath = computeFilename();
-
- // Tell our listeners that a new log is about to be created
- if (!this.listeners.isEmpty()) {
- for (WALActionsListener i : this.listeners) {
- i.preLogRoll(oldPath, newPath);
- }
- }
- FSHLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
- // Can we get at the dfsclient outputstream? If an instance of
- // SFLW, it'll have done the necessary reflection to get at the
- // protected field name.
- FSDataOutputStream nextHdfsOut = null;
- if (nextWriter instanceof SequenceFileLogWriter) {
- nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getWriterFSDataOutputStream();
- }
-
- synchronized (updateLock) {
- // Clean up current writer.
- Path oldFile = cleanupCurrentWriter(currentFilenum);
- this.writer = nextWriter;
- this.hdfs_out = nextHdfsOut;
-
- LOG.info((oldFile != null?
- "Roll " + FSUtils.getPath(oldFile) + ", entries=" +
- this.numEntries.get() +
- ", filesize=" +
- this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
- " for " + FSUtils.getPath(newPath));
- this.numEntries.set(0);
- }
- // Tell our listeners that a new log was created
- if (!this.listeners.isEmpty()) {
- for (WALActionsListener i : this.listeners) {
- i.postLogRoll(oldPath, newPath);
- }
- }
-
- // Can we delete any of the old log files?
- if (this.outputfiles.size() > 0) {
- if (this.lastSeqWritten.isEmpty()) {
- LOG.debug("Last sequenceid written is empty. Deleting all old hlogs");
- // If so, then no new writes have come in since all regions were
- // flushed (and removed from the lastSeqWritten map). Means can
- // remove all but currently open log file.
- for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
- archiveLogFile(e.getValue(), e.getKey());
- }
- this.outputfiles.clear();
- } else {
- regionsToFlush = cleanOldLogs();
- }
- }
- } finally {
+ byte [][] regionsToFlush = null;
 try {
- this.logRollRunning = false;
+ this.logRollRunning = true;
+ boolean isClosed = closed;
+ if (isClosed || !closeBarrier.beginOp()) {
+ LOG.debug("HLog " + (isClosed ? "closed" : "closing") + ". Skipping rolling of writer");
+ return regionsToFlush;
+ }
+ // Do all the preparation outside of the updateLock to block
+ // the incoming writes as little as possible
+ long currentFilenum = this.filenum;
+ Path oldPath = null;
+ if (currentFilenum > 0) {
+ //computeFilename will take care of meta hlog filename
+ oldPath = computeFilename(currentFilenum);
+ }
+ this.filenum = System.currentTimeMillis();
+ Path newPath = computeFilename();
+
+ // Tell our listeners that a new log is about to be created
+ if (!this.listeners.isEmpty()) {
+ for (WALActionsListener i : this.listeners) {
+ i.preLogRoll(oldPath, newPath);
+ }
+ }
+ FSHLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
+ // Can we get at the dfsclient outputstream? If an instance of
+ // SFLW, it'll have done the necessary reflection to get at the
+ // protected field name.
+ FSDataOutputStream nextHdfsOut = null;
+ if (nextWriter instanceof SequenceFileLogWriter) {
+ nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getWriterFSDataOutputStream();
+ }
+
+ Path oldFile = null;
+ int oldNumEntries = 0;
+ synchronized (updateLock) {
+ // Clean up current writer.
+ oldNumEntries = this.numEntries.get();
+ oldFile = cleanupCurrentWriter(currentFilenum);
+ this.writer = nextWriter;
+ this.hdfs_out = nextHdfsOut;
+ this.numEntries.set(0);
+ }
+ LOG.info("Rolled log" + (oldFile != null ? " for file=" + FSUtils.getPath(oldFile)
+ + ", entries=" + oldNumEntries + ", filesize=" + this.fs.getFileStatus(oldFile).getLen()
+ : "") + "; new path=" + FSUtils.getPath(newPath));
+
+ // Tell our listeners that a new log was created
+ if (!this.listeners.isEmpty()) {
+ for (WALActionsListener i : this.listeners) {
+ i.postLogRoll(oldPath, newPath);
+ }
+ }
+
+ // Can we delete any of the old log files?
+ if (getNumLogFiles() > 0) {
+ cleanOldLogs();
+ regionsToFlush = getRegionsToForceFlush();
+ }
 } finally {
- this.cacheFlushLock.unlock();
+ this.logRollRunning = false;
+ closeBarrier.endOp();
 }
+ return regionsToFlush;
 }
- return regionsToFlush;
 }

 /**
@@ -581,36 +591,64 @@
 * encoded region names to flush.
 * @throws IOException
 */
- private byte [][] cleanOldLogs() throws IOException {
- Long oldestOutstandingSeqNum = getOldestOutstandingSeqNum();
+ private void cleanOldLogs() throws IOException {
+ long oldestOutstandingSeqNum = Long.MAX_VALUE;
+ synchronized (oldestSeqNumsLock) {
+ Long oldestFlushing = (oldestFlushingSeqNums.size() > 0)
+ ?
Collections.min(oldestFlushingSeqNums.values()) : Long.MAX_VALUE; + Long oldestUnflushed = (oldestUnflushedSeqNums.size() > 0) + ? Collections.min(oldestUnflushedSeqNums.values()) : Long.MAX_VALUE; + oldestOutstandingSeqNum = Math.min(oldestFlushing, oldestUnflushed); + } + // Get the set of all log files whose last sequence number is smaller than // the oldest edit's sequence number. TreeSet sequenceNumbers = new TreeSet(this.outputfiles.headMap( oldestOutstandingSeqNum).keySet()); // Now remove old log files (if any) - int logsToRemove = sequenceNumbers.size(); - if (logsToRemove > 0) { - if (LOG.isDebugEnabled()) { - // Find associated region; helps debugging. - byte [] oldestRegion = getOldestRegion(oldestOutstandingSeqNum); - LOG.debug("Found " + logsToRemove + " hlogs to remove" + + if (LOG.isDebugEnabled()) { + if (sequenceNumbers.size() > 0) { + LOG.debug("Found " + sequenceNumbers.size() + " hlogs to remove" + " out of total " + this.outputfiles.size() + ";" + - " oldest outstanding sequenceid is " + oldestOutstandingSeqNum + - " from region " + Bytes.toStringBinary(oldestRegion)); - } - for (Long seq : sequenceNumbers) { - archiveLogFile(this.outputfiles.remove(seq), seq); + " oldest outstanding sequenceid is " + oldestOutstandingSeqNum); } } + for (Long seq : sequenceNumbers) { + archiveLogFile(this.outputfiles.remove(seq), seq); + } + } + /** + * Return regions that have edits that are equal or less than a certain sequence number. + * Static due to some old unit test. + * @param walSeqNum The sequence number to compare with. + * @param regionsToSeqNums Encoded region names to sequence ids + * @return All regions whose seqNum <= walSeqNum. Null if no regions found. + */ + static byte[][] findMemstoresWithEditsEqualOrOlderThan( + final long walSeqNum, final Map regionsToSeqNums) { + List regions = null; + for (Map.Entry e : regionsToSeqNums.entrySet()) { + if (e.getValue().longValue() <= walSeqNum) { + if (regions == null) regions = new ArrayList(); + regions.add(e.getKey()); + } + } + return regions == null ? null : regions + .toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY }); + } + + private byte[][] getRegionsToForceFlush() throws IOException { // If too many log files, figure which regions we need to flush. // Array is an array of encoded region names. byte [][] regions = null; - int logCount = this.outputfiles.size(); + int logCount = getNumLogFiles(); if (logCount > this.maxLogs && logCount > 0) { // This is an array of encoded region names. - regions = HLogUtil.findMemstoresWithEditsEqualOrOlderThan(this.outputfiles.firstKey(), - this.lastSeqWritten); + synchronized (oldestSeqNumsLock) { + regions = findMemstoresWithEditsEqualOrOlderThan(this.outputfiles.firstKey(), + this.oldestUnflushedSeqNums); + } if (regions != null) { StringBuilder sb = new StringBuilder(); for (int i = 0; i < regions.length; i++) { @@ -625,29 +663,6 @@ class FSHLog implements HLog, Syncable { return regions; } - /* - * @return Logs older than this id are safe to remove. - */ - private Long getOldestOutstandingSeqNum() { - return Collections.min(this.lastSeqWritten.values()); - } - - /** - * @param oldestOutstandingSeqNum - * @return (Encoded) name of oldest outstanding region. - */ - private byte [] getOldestRegion(final Long oldestOutstandingSeqNum) { - byte [] oldestRegion = null; - for (Map.Entry e: this.lastSeqWritten.entrySet()) { - if (e.getValue().longValue() == oldestOutstandingSeqNum.longValue()) { - // Key is encoded region name. 
- oldestRegion = e.getKey(); - break; - } - } - return oldestRegion; - } - /* * Cleans up current writer closing and adding to outputfiles. * Presumes we're operating inside an updateLock scope. @@ -780,33 +795,39 @@ class FSHLog implements HLog, Syncable { @Override public void close() throws IOException { + if (this.closed) { + return; + } try { logSyncerThread.close(); // Make sure we synced everything logSyncerThread.join(this.optionalFlushInterval*2); } catch (InterruptedException e) { LOG.error("Exception while waiting for syncer thread to die", e); + Thread.currentThread().interrupt(); + } + try { + // Prevent all further flushing and rolling. + closeBarrier.stopAndDrainOps(); + } catch (InterruptedException e) { + LOG.error("Exception while waiting for cache flushes and log rolls", e); + Thread.currentThread().interrupt(); } - cacheFlushLock.lock(); - try { - // Tell our listeners that the log is closing - if (!this.listeners.isEmpty()) { - for (WALActionsListener i : this.listeners) { - i.logCloseRequested(); - } + // Tell our listeners that the log is closing + if (!this.listeners.isEmpty()) { + for (WALActionsListener i : this.listeners) { + i.logCloseRequested(); } - synchronized (updateLock) { - this.closed = true; - if (LOG.isDebugEnabled()) { - LOG.debug("closing hlog writer in " + this.dir.toString()); - } - if (this.writer != null) { - this.writer.close(); - } + } + synchronized (updateLock) { + this.closed = true; + if (LOG.isDebugEnabled()) { + LOG.debug("closing hlog writer in " + this.dir.toString()); + } + if (this.writer != null) { + this.writer.close(); } - } finally { - cacheFlushLock.unlock(); } } @@ -838,7 +859,7 @@ class FSHLog implements HLog, Syncable { // memstore). When the cache is flushed, the entry for the // region being flushed is removed if the sequence number of the flush // is greater than or equal to the value in lastSeqWritten. - this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(), + this.oldestUnflushedSeqNums.putIfAbsent(regionInfo.getEncodedNameAsBytes(), Long.valueOf(seqNum)); doWrite(regionInfo, logKey, logEdit, htd); txid = this.unflushedEntries.incrementAndGet(); @@ -910,7 +931,7 @@ class FSHLog implements HLog, Syncable { // Use encoded name. Its shorter, guaranteed unique and a subset of // actual name. byte [] encodedRegionName = info.getEncodedNameAsBytes(); - this.lastSeqWritten.putIfAbsent(encodedRegionName, seqNum); + this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum); HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId); doWrite(info, logKey, edits, htd); this.numEntries.incrementAndGet(); @@ -1042,7 +1063,11 @@ class FSHLog implements HLog, Syncable { Writer tempWriter; synchronized (this.updateLock) { if (this.closed) return; - tempWriter = this.writer; // guaranteed non-null + // Guaranteed non-null. + // Note that parallel sync can close tempWriter. + // The current method of dealing with this is to catch exceptions. + // See HBASE-4387, HBASE-5623, HBASE-7329. + tempWriter = this.writer; } // if the transaction that we are interested in is already // synced, then return immediately. @@ -1078,9 +1103,11 @@ class FSHLog implements HLog, Syncable { } try { tempWriter.sync(); - } catch(IOException io) { + } catch(IOException ex) { synchronized (this.updateLock) { // HBASE-4387, HBASE-5623, retry with updateLock held + // TODO: we don't actually need to do it for concurrent close - what is the point + // of syncing new unrelated writer? Keep behavior for now. 
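+ // While updateLock is held, rollWriter cannot swap the writer out from under
+ // us, so this retry can only fail for genuine I/O reasons.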
tempWriter = this.writer; tempWriter.sync(); } @@ -1088,6 +1115,9 @@ class FSHLog implements HLog, Syncable { this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto); this.metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now); + // TODO: preserving the old behavior for now, but this check is strange. It's not + // protected by any locks here, so for all we know rolling locks might start + // as soon as we enter the "if". Is this best-effort optimization check? if (!this.logRollRunning) { checkLowReplication(); try { @@ -1250,107 +1280,61 @@ class FSHLog implements HLog, Syncable { return outputfiles.size(); } - private byte[] getSnapshotName(byte[] encodedRegionName) { - byte snp[] = new byte[encodedRegionName.length + 3]; - // an encoded region name has only hex digits. s, n or p are not hex - // and therefore snapshot-names will never collide with - // encoded-region-names - snp[0] = 's'; snp[1] = 'n'; snp[2] = 'p'; - for (int i = 0; i < encodedRegionName.length; i++) { - snp[i+3] = encodedRegionName[i]; - } - return snp; - } - @Override - public long startCacheFlush(final byte[] encodedRegionName) { - this.cacheFlushLock.lock(); - Long seq = this.lastSeqWritten.remove(encodedRegionName); - // seq is the lsn of the oldest edit associated with this region. If a - // snapshot already exists - because the last flush failed - then seq will - // be the lsn of the oldest edit in the snapshot - if (seq != null) { - // keeping the earliest sequence number of the snapshot in - // lastSeqWritten maintains the correctness of - // getOldestOutstandingSeqNum(). But it doesn't matter really because - // everything is being done inside of cacheFlush lock. - Long oldseq = - lastSeqWritten.put(getSnapshotName(encodedRegionName), seq); - if (oldseq != null) { - LOG.error("Logic Error Snapshot seq id from earlier flush still" + - " present! for region " + Bytes.toString(encodedRegionName) + - " overwritten oldseq=" + oldseq + "with new seq=" + seq); - Runtime.getRuntime().halt(1); + public Long startCacheFlush(final byte[] encodedRegionName) { + Long oldRegionSeqNum = null; + if (!closeBarrier.beginOp()) { + return null; + } + synchronized (oldestSeqNumsLock) { + oldRegionSeqNum = this.oldestUnflushedSeqNums.remove(encodedRegionName); + if (oldRegionSeqNum != null) { + Long oldValue = this.oldestFlushingSeqNums.put(encodedRegionName, oldRegionSeqNum); + assert oldValue == null : "Flushing map not cleaned up for " + + Bytes.toString(encodedRegionName); } } + if (oldRegionSeqNum == null) { + // TODO: if we have no oldRegionSeqNum, and WAL is not disabled, presumably either + // the region is already flushing (which would make this call invalid), or there + // were no appends after last flush, so why are we starting flush? Maybe we should + // assert not null, and switch to "long" everywhere. Less rigorous, but safer, + // alternative is telling the caller to stop. For now preserve old logic. 
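+ // (HRegion.internalFlushcache only cancels the flush when this method returns
+ // null, i.e. when the WAL is being closed; see the HRegion change above.)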
+ LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: [" + + Bytes.toString(encodedRegionName) + "]"); + } return obtainSeqNum(); } @Override - public void completeCacheFlush(final byte [] encodedRegionName, - final byte [] tableName, final long logSeqId, final boolean isMetaRegion) - throws IOException { - try { - if (this.closed) { - return; - } - long txid = 0; - synchronized (updateLock) { - long now = EnvironmentEdgeManager.currentTimeMillis(); - WALEdit edit = completeCacheFlushLogEdit(); - HLogKey key = makeKey(encodedRegionName, tableName, logSeqId, - System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID); - logSyncerThread.append(new Entry(key, edit)); - txid = this.unflushedEntries.incrementAndGet(); - long took = EnvironmentEdgeManager.currentTimeMillis() - now; - long len = 0; - for (KeyValue kv : edit.getKeyValues()) { - len += kv.getLength(); - } - this.metrics.finishAppend(took, len); - this.numEntries.incrementAndGet(); - } - // sync txn to file system - this.sync(txid); - - } finally { - // updateLock not needed for removing snapshot's entry - // Cleaning up of lastSeqWritten is in the finally clause because we - // don't want to confuse getOldestOutstandingSeqNum() - this.lastSeqWritten.remove(getSnapshotName(encodedRegionName)); - this.cacheFlushLock.unlock(); + public void completeCacheFlush(final byte [] encodedRegionName) + { + synchronized (oldestSeqNumsLock) { + this.oldestFlushingSeqNums.remove(encodedRegionName); } - } - - private WALEdit completeCacheFlushLogEdit() { - KeyValue kv = new KeyValue(HLog.METAROW, HLog.METAFAMILY, null, - System.currentTimeMillis(), HLogUtil.COMPLETE_CACHE_FLUSH); - WALEdit e = new WALEdit(); - e.add(kv); - return e; + closeBarrier.endOp(); } @Override public void abortCacheFlush(byte[] encodedRegionName) { - Long snapshot_seq = - this.lastSeqWritten.remove(getSnapshotName(encodedRegionName)); - if (snapshot_seq != null) { - // updateLock not necessary because we are racing against - // lastSeqWritten.putIfAbsent() in append() and we will always win - // before releasing cacheFlushLock make sure that the region's entry in - // lastSeqWritten points to the earliest edit in the region - Long current_memstore_earliest_seq = - this.lastSeqWritten.put(encodedRegionName, snapshot_seq); - if (current_memstore_earliest_seq != null && - (current_memstore_earliest_seq.longValue() <= - snapshot_seq.longValue())) { - LOG.error("Logic Error region " + Bytes.toString(encodedRegionName) + - "acquired edits out of order current memstore seq=" + - current_memstore_earliest_seq + " snapshot seq=" + snapshot_seq); - Runtime.getRuntime().halt(1); + Long currentSeqNum = null, seqNumBeforeFlushStarts = null; + synchronized (oldestSeqNumsLock) { + seqNumBeforeFlushStarts = this.oldestFlushingSeqNums.remove(encodedRegionName); + if (seqNumBeforeFlushStarts != null) { + currentSeqNum = + this.oldestUnflushedSeqNums.put(encodedRegionName, seqNumBeforeFlushStarts); } } - this.cacheFlushLock.unlock(); + closeBarrier.endOp(); + if ((currentSeqNum != null) + && (currentSeqNum.longValue() <= seqNumBeforeFlushStarts.longValue())) { + String errorStr = "Region " + Bytes.toString(encodedRegionName) + + "acquired edits out of order current memstore seq=" + currentSeqNum + + ", previous oldest unflushed id=" + seqNumBeforeFlushStarts; + LOG.error(errorStr); + assert false : errorStr; + Runtime.getRuntime().halt(1); + } } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index eed3952117e..5fc031f1b34 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -162,14 +162,14 @@ public interface HLog {
 }
 }

- /*
+ /**
 * registers WALActionsListener
 *
 * @param listener
 */
 public void registerWALActionsListener(final WALActionsListener listener);

- /*
+ /**
 * unregisters WALActionsListener
 *
 * @param listener
@@ -200,18 +200,10 @@ public interface HLog {
 /**
 * Roll the log writer. That is, start writing log messages to a new file.
 *
- * Because a log cannot be rolled during a cache flush, and a cache flush
- * spans two method calls, a special lock needs to be obtained so that a cache
- * flush cannot start when the log is being rolled and the log cannot be
- * rolled during a cache flush.
- *
 * <p>
- * Note that this method cannot be synchronized because it is possible that
- * startCacheFlush runs, obtaining the cacheFlushLock, then this method could
- * start which would obtain the lock on this but block on obtaining the
- * cacheFlushLock and then completeCacheFlush could be called which would wait
- * for the lock on this and consequently never release the cacheFlushLock
- *
+ * The implementation is synchronized in order to make sure there is only one
+ * rollWriter running at any given time.
+ *
 * @return If lots of logs, flush the returned regions so next time through we
 * can clean logs. Returns null if nothing to flush. Names are actual
 * region names as returned by {@link HRegionInfo#getEncodedName()}
@@ -223,17 +215,9 @@ public interface HLog {
 /**
 * Roll the log writer. That is, start writing log messages to a new file.
 *
- * Because a log cannot be rolled during a cache flush, and a cache flush
- * spans two method calls, a special lock needs to be obtained so that a cache
- * flush cannot start when the log is being rolled and the log cannot be
- * rolled during a cache flush.
- *
 * <p>
- * Note that this method cannot be synchronized because it is possible that
- * startCacheFlush runs, obtaining the cacheFlushLock, then this method could
- * start which would obtain the lock on this but block on obtaining the
- * cacheFlushLock and then completeCacheFlush could be called which would wait
- * for the lock on this and consequently never release the cacheFlushLock
+ * The implementation is synchronized in order to make sure there is only one
+ * rollWriter running at any given time.
 *
 * @param force
 * If true, force creation of a new writer even if no entries have
@@ -337,53 +321,33 @@ public interface HLog {
 public long obtainSeqNum();

 /**
- * By acquiring a log sequence ID, we can allow log messages to continue while
- * we flush the cache.
- *
- * Acquire a lock so that we do not roll the log between the start and
- * completion of a cache-flush. Otherwise the log-seq-id for the flush will
- * not appear in the correct logfile.
- *
- * Ensuring that flushes and log-rolls don't happen concurrently also allows
- * us to temporarily put a log-seq-number in lastSeqWritten against the region
- * being flushed that might not be the earliest in-memory log-seq-number for
- * that region. By the time the flush is completed or aborted and before the
- * cacheFlushLock is released it is ensured that lastSeqWritten again has the
- * oldest in-memory edit's lsn for the region that was being flushed.
- *
- * In this method, by removing the entry in lastSeqWritten for the region
- * being flushed we ensure that the next edit inserted in this region will be
- * correctly recorded in
- * {@link #append(HRegionInfo, byte[], WALEdit, long, HTableDescriptor)} The
- * lsn of the earliest in-memory lsn - which is now in the memstore snapshot -
- * is saved temporarily in the lastSeqWritten map while the flush is active.
- *
- * @return sequence ID to pass
- * {@link #completeCacheFlush(byte[], byte[], long, boolean)} (byte[],
- * byte[], long)}
- * @see #completeCacheFlush(byte[], byte[], long, boolean)
- * @see #abortCacheFlush(byte[])
+ * The WAL keeps track of the sequence numbers that have not yet been flushed
+ * from memstores in order to be able to clean up old log files. This method
+ * tells the WAL that a region is about to flush its memstore.
+ *
+ * We stash the oldest seqNum for the region, and let the next edit inserted in this
+ * region be recorded in {@link #append(HRegionInfo, byte[], WALEdit, long, HTableDescriptor)}
+ * as the new oldest seqNum. If the flush is aborted, we put the stashed value back;
+ * if the flush succeeds, the seqNum of the first edit after the flush started becomes
+ * the valid oldest seqNum for this region.
+ *
+ * @return current seqNum, to pass on to flushers (who will put it into the metadata of
+ * the resulting file as an upper-bound seqNum for that file), or null if the flush
+ * should not be started.
 */
- public long startCacheFlush(final byte[] encodedRegionName);
+ public Long startCacheFlush(final byte[] encodedRegionName);

 /**
- * Complete the cache flush
- *
- * Protected by cacheFlushLock
- *
- * @param encodedRegionName
- * @param tableName
- * @param logSeqId
- * @throws IOException
+ * Complete the cache flush.
+ * @param encodedRegionName Encoded region name.
 */
- public void completeCacheFlush(final byte[] encodedRegionName,
- final byte[] tableName, final long logSeqId, final boolean isMetaRegion)
- throws IOException;
+ public void completeCacheFlush(final byte[] encodedRegionName);

 /**
 * Abort a cache flush. Call if the flush fails. Note that the only recovery
 * for an aborted flush currently is a restart of the regionserver so the
 * snapshot content dropped by the failure gets restored to the memstore.
+ * @param encodedRegionName Encoded region name.
 */
 public void abortCacheFlush(byte[] encodedRegionName);

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
index e5eb0d91120..4bb546f3675 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
@@ -46,8 +46,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 public class HLogUtil {
 static final Log LOG = LogFactory.getLog(HLogUtil.class);

- static final byte[] COMPLETE_CACHE_FLUSH = Bytes.toBytes("HBASE::CACHEFLUSH");
-
 /**
 * @param family
 * @return true if the column is a meta column
@@ -243,32 +241,6 @@ public class HLogUtil {
 return ServerName.parseServerName(serverName);
 }

- /**
- * Return regions (memstores) that have edits that are equal or less than the
- * passed oldestWALseqid.
- *
- * @param oldestWALseqid
- * @param regionsToSeqids
- * Encoded region names to sequence ids
- * @return All regions whose seqid is < than oldestWALseqid (Not
- * necessarily in order). Null if no regions found.
- */
- static byte[][] findMemstoresWithEditsEqualOrOlderThan(
- final long oldestWALseqid, final Map<byte[], Long> regionsToSeqids) {
- // This method is static so it can be unit tested the easier.
- List<byte[]> regions = null;
- for (Map.Entry<byte[], Long> e : regionsToSeqids.entrySet()) {
- if (e.getValue().longValue() <= oldestWALseqid) {
- if (regions == null)
- regions = new ArrayList<byte[]>();
- // Key is encoded region name.
- regions.add(e.getKey());
- }
- }
- return regions == null ? null : regions
- .toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
- }
-
 /**
 * Returns sorted set of edit files made by wal-log splitter, excluding files
 * with '.temp' suffix.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
index c8ece12ada0..88f0ebf393f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
@@ -244,7 +244,12 @@ public class SequenceFileLogWriter implements HLog.Writer {

 @Override
 public void sync() throws IOException {
- this.writer.syncFs();
+ try {
+ this.writer.syncFs();
+ } catch (NullPointerException npe) {
+ // Concurrent close...
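+ // (The underlying SequenceFile.Writer clears its output stream on close, so a
+ // sync racing with a close can surface as an NPE; rethrow it as the IOException
+ // callers expect.)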
+ throw new IOException(npe); + } } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java index 7006395c898..d85c1d85a65 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java @@ -323,11 +323,11 @@ public class TestHLog { regionsToSeqids.put(l.toString().getBytes(), l); } byte [][] regions = - HLogUtil.findMemstoresWithEditsEqualOrOlderThan(1, regionsToSeqids); + FSHLog.findMemstoresWithEditsEqualOrOlderThan(1, regionsToSeqids); assertEquals(2, regions.length); assertTrue(Bytes.equals(regions[0], "0".getBytes()) || Bytes.equals(regions[0], "1".getBytes())); - regions = HLogUtil.findMemstoresWithEditsEqualOrOlderThan(3, regionsToSeqids); + regions = FSHLog.findMemstoresWithEditsEqualOrOlderThan(3, regionsToSeqids); int count = 4; assertEquals(count, regions.length); // Regions returned are not ordered. @@ -518,9 +518,8 @@ public class TestHLog { htd.addFamily(new HColumnDescriptor("column")); log.append(info, tableName, cols, System.currentTimeMillis(), htd); - long logSeqId = log.startCacheFlush(info.getEncodedNameAsBytes()); - log.completeCacheFlush(info.getEncodedNameAsBytes(), tableName, logSeqId, - info.isMetaRegion()); + log.startCacheFlush(info.getEncodedNameAsBytes()); + log.completeCacheFlush(info.getEncodedNameAsBytes()); log.close(); Path filename = ((FSHLog) log).computeFilename(); log = null; @@ -540,20 +539,6 @@ public class TestHLog { assertEquals((byte)(i + '0'), kv.getValue()[0]); System.out.println(key + " " + val); } - HLog.Entry entry = null; - while ((entry = reader.next(null)) != null) { - HLogKey key = entry.getKey(); - WALEdit val = entry.getEdit(); - // Assert only one more row... the meta flushed row. - assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName())); - assertTrue(Bytes.equals(tableName, key.getTablename())); - KeyValue kv = val.getKeyValues().get(0); - assertTrue(Bytes.equals(HLog.METAROW, kv.getRow())); - assertTrue(Bytes.equals(HLog.METAFAMILY, kv.getFamily())); - assertEquals(0, Bytes.compareTo(HLogUtil.COMPLETE_CACHE_FLUSH, - val.getKeyValues().get(0).getValue())); - System.out.println(key + " " + val); - } } finally { if (log != null) { log.closeAndDelete(); @@ -589,8 +574,8 @@ public class TestHLog { HTableDescriptor htd = new HTableDescriptor(); htd.addFamily(new HColumnDescriptor("column")); log.append(hri, tableName, cols, System.currentTimeMillis(), htd); - long logSeqId = log.startCacheFlush(hri.getEncodedNameAsBytes()); - log.completeCacheFlush(hri.getEncodedNameAsBytes(), tableName, logSeqId, false); + log.startCacheFlush(hri.getEncodedNameAsBytes()); + log.completeCacheFlush(hri.getEncodedNameAsBytes()); log.close(); Path filename = ((FSHLog) log).computeFilename(); log = null; @@ -608,20 +593,6 @@ public class TestHLog { System.out.println(entry.getKey() + " " + val); idx++; } - - // Get next row... the meta flushed row. 
- entry = reader.next(); - assertEquals(1, entry.getEdit().size()); - for (KeyValue val : entry.getEdit().getKeyValues()) { - assertTrue(Bytes.equals(hri.getEncodedNameAsBytes(), - entry.getKey().getEncodedRegionName())); - assertTrue(Bytes.equals(tableName, entry.getKey().getTablename())); - assertTrue(Bytes.equals(HLog.METAROW, val.getRow())); - assertTrue(Bytes.equals(HLog.METAFAMILY, val.getFamily())); - assertEquals(0, Bytes.compareTo(HLogUtil.COMPLETE_CACHE_FLUSH, - val.getValue())); - System.out.println(entry.getKey() + " " + val); - } } finally { if (log != null) { log.closeAndDelete(); @@ -705,17 +676,19 @@ public class TestHLog { assertEquals(3, ((FSHLog) log).getNumLogFiles()); // Flush the first region, we expect to see the first two files getting - // archived - long seqId = log.startCacheFlush(hri.getEncodedNameAsBytes()); - log.completeCacheFlush(hri.getEncodedNameAsBytes(), tableName, seqId, false); + // archived. We need to append something or writer won't be rolled. + addEdits(log, hri2, tableName2, 1); + log.startCacheFlush(hri.getEncodedNameAsBytes()); + log.completeCacheFlush(hri.getEncodedNameAsBytes()); log.rollWriter(); assertEquals(2, ((FSHLog) log).getNumLogFiles()); // Flush the second region, which removes all the remaining output files // since the oldest was completely flushed and the two others only contain // flush information - seqId = log.startCacheFlush(hri2.getEncodedNameAsBytes()); - log.completeCacheFlush(hri2.getEncodedNameAsBytes(), tableName2, seqId, false); + addEdits(log, hri2, tableName2, 1); + log.startCacheFlush(hri2.getEncodedNameAsBytes()); + log.completeCacheFlush(hri2.getEncodedNameAsBytes()); log.rollWriter(); assertEquals(0, ((FSHLog) log).getNumLogFiles()); } finally { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java index fb72e0f2307..819721c83be 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java @@ -117,7 +117,7 @@ public class TestWALActionsListener { assertEquals(11, observer.postLogRollCounter); assertEquals(5, laterobserver.preLogRollCounter); assertEquals(5, laterobserver.postLogRollCounter); - assertEquals(2, observer.closedCount); + assertEquals(1, observer.closedCount); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index 8ff9e0d1349..d285b6502f9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -557,8 +557,8 @@ public class TestWALReplay { } // Add a cache flush, shouldn't have any effect - long logSeqId = wal.startCacheFlush(regionName); - wal.completeCacheFlush(regionName, tableName, logSeqId, hri.isMetaRegion()); + wal.startCacheFlush(regionName); + wal.completeCacheFlush(regionName); // Add an edit to another family, should be skipped. WALEdit edit = new WALEdit(); @@ -661,7 +661,7 @@ public class TestWALReplay { wal.doCompleteCacheFlush = true; // allow complete cache flush with the previous seq number got after first // set of edits. 
- wal.completeCacheFlush(hri.getEncodedNameAsBytes(), hri.getTableName(), sequenceNumber, false); + wal.completeCacheFlush(hri.getEncodedNameAsBytes()); wal.close(); FileStatus[] listStatus = this.fs.listStatus(wal.getDir()); HLogSplitter.splitLogFile(hbaseRootDir, listStatus[0], this.fs, this.conf, @@ -686,12 +686,11 @@ public class TestWALReplay { } @Override - public void completeCacheFlush(byte[] encodedRegionName, byte[] tableName, long logSeqId, - boolean isMetaRegion) throws IOException { + public void completeCacheFlush(byte[] encodedRegionName) { if (!doCompleteCacheFlush) { return; } - super.completeCacheFlush(encodedRegionName, tableName, logSeqId, isMetaRegion); + super.completeCacheFlush(encodedRegionName); } }
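
For readers who want to see the intended begin/end/stop protocol in one place, here is a minimal usage sketch written against the DrainBarrier API added by this patch. The class and method names below (WalLikeResource, tryOperation, shutDown) are hypothetical illustrations and not part of the patch; only the DrainBarrier calls are real.

import org.apache.hadoop.hbase.util.DrainBarrier;

// Hypothetical sketch: how FSHLog-style code is expected to use DrainBarrier.
public class WalLikeResource {
  private final DrainBarrier closeBarrier = new DrainBarrier();

  // Models startCacheFlush()/rollWriter(): refuse to start once close has begun.
  public boolean tryOperation() {
    if (!closeBarrier.beginOp()) {
      return false; // Close is in progress; the caller must not proceed.
    }
    try {
      // ... do the flush or log-roll work here ...
      return true;
    } finally {
      // Models completeCacheFlush()/abortCacheFlush()/the roll's finally block.
      closeBarrier.endOp();
    }
  }

  // Models close(): block new operations, then wait for in-flight ones to drain.
  public void shutDown() throws InterruptedException {
    closeBarrier.stopAndDrainOps(); // Lenient variant: repeated calls return immediately.
  }
}

Note that in FSHLog itself the beginOp/endOp pair spans two methods (startCacheFlush and completeCacheFlush/abortCacheFlush) rather than a single try/finally, which is why endOp asserts on unmatched calls instead of relying on scoping.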