HBASE-23157 WAL unflushed seqId tracking may wrong when Durability.ASYNC_WAL is used (#762)
Signed-off-by: stack <stack@apache.org>
This commit is contained in:
@ -48,6 +48,7 @@ public final class ImmutableByteArray {
return new ImmutableByteArray(b);
public String toString() {
return Bytes.toStringBinary(b);
@ -2986,7 +2986,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// If we get to here, the HStores have been written.
if (wal != null) {
wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes(), flushedSeqId);
// Record latest flush time
@ -517,8 +517,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
public void completeCacheFlush(byte[] encodedRegionName) {
public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
this.sequenceIdAccounting.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
@ -352,9 +352,36 @@ class SequenceIdAccounting {
return lowestUnflushedInRegion;
void completeCacheFlush(final byte[] encodedRegionName) {
void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
// This is a simple hack to avoid maxFlushedSeqId go backwards.
// The system works fine normally, but if we make use of Durability.ASYNC_WAL and we are going
// to flush all the stores, the maxFlushedSeqId will be next seq id of the region, but we may
// still have some unsynced WAL entries in the ringbuffer after we call startCacheFlush, and
// then it will be recorded as the lowestUnflushedSeqId by the above update method, which is
// less than the current maxFlushedSeqId. And if next time we only flush the family with this
// unusual lowestUnflushedSeqId, the maxFlushedSeqId will go backwards.
// This is an unexpected behavior so we should fix it, otherwise it may cause unexpected
// behavior in other area.
// The solution here is a bit hack but fine. Just replace the lowestUnflushedSeqId with
// maxFlushedSeqId + 1 if it is lesser. The meaning of maxFlushedSeqId is that, all edits less
// than or equal to it have been flushed, i.e, persistent to HFile, so set
// lowestUnflushedSequenceId to maxFlushedSeqId + 1 will not cause data loss.
// And technically, using +1 is fine here. If the maxFlushesSeqId is just the flushOpSeqId, it
// means we have flushed all the stores so the seq id for actual data should be at least plus 1.
// And if we do not flush all the stores, then the maxFlushedSeqId is calculated by
// lowestUnflushedSeqId - 1, so here let's plus the 1 back.
Long wrappedSeqId = Long.valueOf(maxFlushedSeqId + 1);
synchronized (tieLock) {
Map<ImmutableByteArray, Long> unflushed = lowestUnflushedSequenceIds.get(encodedRegionName);
if (unflushed == null) {
for (Map.Entry<ImmutableByteArray, Long> e : unflushed.entrySet()) {
if (e.getValue().longValue() <= maxFlushedSeqId) {
@ -224,7 +224,7 @@ class DisabledWALProvider implements WALProvider {
public void completeCacheFlush(final byte[] encodedRegionName) {
public void completeCacheFlush(final byte[] encodedRegionName, long maxFlushedSeqId) {
@ -184,7 +184,7 @@ public interface WAL extends Closeable, WALFileLengthProvider {
* being flushed; in other words, this is effectively same as a flush of all of the region
* though we were passed a subset of regions. Otherwise, it returns the sequence id of the
* oldest/lowest outstanding edit.
* @see #completeCacheFlush(byte[])
* @see #completeCacheFlush(byte[], long)
* @see #abortCacheFlush(byte[])
Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> families);
@ -194,10 +194,12 @@ public interface WAL extends Closeable, WALFileLengthProvider {
* Complete the cache flush.
* @param encodedRegionName Encoded region name.
* @param maxFlushedSeqId The maxFlushedSeqId for this flush. There is no edit in memory that is
* less that this sequence id.
* @see #startCacheFlush(byte[], Set)
* @see #abortCacheFlush(byte[])
void completeCacheFlush(final byte[] encodedRegionName);
void completeCacheFlush(final byte[] encodedRegionName, long maxFlushedSeqId);
* Abort a cache flush. Call if the flush fails. Note that the only recovery
@ -30,6 +30,8 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
@ -38,10 +40,12 @@ import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@ -67,7 +71,10 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.FlushPolicy;
import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
@ -187,12 +194,10 @@ public abstract class AbstractTestFSWAL {
* helper method to simulate region flush for a WAL.
* @param wal
* @param regionEncodedName
protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
wal.completeCacheFlush(regionEncodedName, HConstants.NO_SEQNUM);
@ -349,7 +354,7 @@ public abstract class AbstractTestFSWAL {
// tests partial flush: roll on a partial flush, and ensure that wal is not archived.
wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
wal.completeCacheFlush(hri1.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
assertEquals(1, wal.getNumRolledLogFiles());
// clear test data
@ -533,18 +538,10 @@ public abstract class AbstractTestFSWAL {
public void testUnflushedSeqIdTrackingWithAsyncWal() throws IOException, InterruptedException {
final String testName = currentTest.getMethodName();
final byte[] b = Bytes.toBytes("b");
final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
final CountDownLatch holdAppend = new CountDownLatch(1);
final CountDownLatch closeFinished = new CountDownLatch(1);
final CountDownLatch putFinished = new CountDownLatch(1);
try (AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getRootDir(CONF), testName,
HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) {
private AbstractFSWAL<?> createHoldingWAL(String testName, AtomicBoolean startHoldingForAppend,
CountDownLatch holdAppend) throws IOException {
AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getRootDir(CONF), testName,
HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
wal.registerWALActionsListener(new WALActionsListener() {
@ -558,52 +555,34 @@ public abstract class AbstractTestFSWAL {
return wal;
// open a new region which uses this WAL
TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
private HRegion createHoldingHRegion(Configuration conf, TableDescriptor htd, WAL wal)
throws IOException {
RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
TEST_UTIL.createLocalHRegion(hri, htd, wal).close();
RegionServerServices rsServices = mock(RegionServerServices.class);
when(rsServices.getServerName()).thenReturn(ServerName.valueOf("localhost:12345", 123456));
final HRegion region = HRegion.openHRegion(TEST_UTIL.getDataTestDir(), hri, htd, wal,
TEST_UTIL.getConfiguration(), rsServices, null);
ExecutorService exec = Executors.newFixedThreadPool(2);
return HRegion.openHRegion(TEST_UTIL.getDataTestDir(), hri, htd, wal, conf, rsServices, null);
private void doPutWithAsyncWAL(ExecutorService exec, HRegion region, Put put,
Runnable flushOrCloseRegion, AtomicBoolean startHoldingForAppend,
CountDownLatch flushOrCloseFinished, CountDownLatch holdAppend)
throws InterruptedException, IOException {
// do a regular write first because of memstore size calculation.
region.put(new Put(b).addColumn(b, b, b));
exec.submit(new Runnable() {
public void run() {
try {
region.put(new Put(b).addColumn(b, b, b).setDurability(Durability.ASYNC_WAL));
} catch (IOException e) {
LOG.error(e.toString(), e);
region.put(new Put(put).setDurability(Durability.ASYNC_WAL));
// give the put a chance to start
exec.submit(new Runnable() {
public void run() {
try {
Map<?, ?> closeResult = region.close();
LOG.info("Close result:" + closeResult);
} catch (IOException e) {
LOG.error(e.toString(), e);
// give the flush a chance to start. Flush should have got the region lock, and
// should have been waiting on the mvcc complete after this.
@ -611,15 +590,113 @@ public abstract class AbstractTestFSWAL {
// let the append to WAL go through now that the flush already started
// Testcase for HBASE-23181
public void testUnflushedSeqIdTrackingWithAsyncWal() throws IOException, InterruptedException {
String testName = currentTest.getMethodName();
byte[] b = Bytes.toBytes("b");
TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
CountDownLatch holdAppend = new CountDownLatch(1);
CountDownLatch closeFinished = new CountDownLatch(1);
ExecutorService exec = Executors.newFixedThreadPool(1);
AbstractFSWAL<?> wal = createHoldingWAL(testName, startHoldingForAppend, holdAppend);
// open a new region which uses this WAL
HRegion region = createHoldingHRegion(TEST_UTIL.getConfiguration(), htd, wal);
try {
doPutWithAsyncWAL(exec, region, new Put(b).addColumn(b, b, b), () -> {
try {
Map<?, ?> closeResult = region.close();
LOG.info("Close result:" + closeResult);
} catch (IOException e) {
LOG.error(e.toString(), e);
}, startHoldingForAppend, closeFinished, holdAppend);
// now check the region's unflushed seqIds.
long seqId = wal.getEarliestMemStoreSeqNum(hri.getEncodedNameAsBytes());
long seqId = wal.getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
assertEquals("Found seqId for the region which is already closed", HConstants.NO_SEQNUM,
} finally {
private static final Set<byte[]> STORES_TO_FLUSH =
Collections.newSetFromMap(new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR));
// Testcase for HBASE-23157
public void testMaxFlushedSequenceIdGoBackwards() throws IOException, InterruptedException {
String testName = currentTest.getMethodName();
byte[] a = Bytes.toBytes("a");
byte[] b = Bytes.toBytes("b");
TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
CountDownLatch holdAppend = new CountDownLatch(1);
CountDownLatch flushFinished = new CountDownLatch(1);
ExecutorService exec = Executors.newFixedThreadPool(2);
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.setClass(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushSpecificStoresPolicy.class,
AbstractFSWAL<?> wal = createHoldingWAL(testName, startHoldingForAppend, holdAppend);
// open a new region which uses this WAL
HRegion region = createHoldingHRegion(conf, htd, wal);
try {
Put put = new Put(a).addColumn(a, a, a).addColumn(b, b, b);
doPutWithAsyncWAL(exec, region, put, () -> {
try {
HRegion.FlushResult flushResult = region.flush(true);
LOG.info("Flush result:" + flushResult.getResult());
LOG.info("Flush succeeded:" + flushResult.isFlushSucceeded());
} catch (IOException e) {
LOG.error(e.toString(), e);
}, startHoldingForAppend, flushFinished, holdAppend);
// get the max flushed sequence id after the first flush
long maxFlushedSeqId1 = region.getMaxFlushedSeqId();
// this time we only flush family a
// get the max flushed sequence id after the second flush
long maxFlushedSeqId2 = region.getMaxFlushedSeqId();
// make sure that the maxFlushedSequenceId does not go backwards
"maxFlushedSeqId1(" + maxFlushedSeqId1 +
") is not greater than or equal to maxFlushedSeqId2(" + maxFlushedSeqId2 + ")",
maxFlushedSeqId1 <= maxFlushedSeqId2);
} finally {
public static final class FlushSpecificStoresPolicy extends FlushPolicy {
public Collection<HStore> selectStoresToFlush() {
if (STORES_TO_FLUSH.isEmpty()) {
return region.getStores();
} else {
return STORES_TO_FLUSH.stream().map(region::getStore).collect(Collectors.toList());
@ -749,7 +749,7 @@ public abstract class AbstractTestWALReplay {
// Add a cache flush, shouldn't have any effect
wal.startCacheFlush(regionName, familyNames);
wal.completeCacheFlush(regionName, HConstants.NO_SEQNUM);
// Add an edit to another family, should be skipped.
WALEdit edit = new WALEdit();
@ -847,7 +847,7 @@ public abstract class AbstractTestWALReplay {
wal.doCompleteCacheFlush = true;
// allow complete cache flush with the previous seq number got after first
// set of edits.
wal.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
FileStatus[] listStatus = wal.getFiles();
@ -1030,11 +1030,11 @@ public abstract class AbstractTestWALReplay {
public void completeCacheFlush(byte[] encodedRegionName) {
public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
if (!doCompleteCacheFlush) {
super.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
@ -57,12 +57,12 @@ public class TestSequenceIdAccounting {
Map<byte[], Long> m = new HashMap<>();
assertEquals(HConstants.NO_SEQNUM, (long)sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES));
sida.completeCacheFlush(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
long sequenceid = 1;
sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true);
// Only one family so should return NO_SEQNUM still.
assertEquals(HConstants.NO_SEQNUM, (long)sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES));
sida.completeCacheFlush(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
long currentSequenceId = sequenceid;
sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true);
final Set<byte[]> otherFamily = new HashSet<>(1);
@ -70,7 +70,7 @@ public class TestSequenceIdAccounting {
sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true);
// Should return oldest sequence id in the region.
assertEquals(currentSequenceId, (long)sida.startCacheFlush(ENCODED_REGION_NAME, otherFamily));
sida.completeCacheFlush(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
@ -101,7 +101,7 @@ public class TestSequenceIdAccounting {
assertTrue(sida.areAllLower(m, null));
// Let the flush complete and if we ask if the sequenceid is lower, should be yes since no edits
sida.completeCacheFlush(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
m.put(ENCODED_REGION_NAME, sequenceid);
assertTrue(sida.areAllLower(m, null));
// Flush again but add sequenceids while we are flushing.
@ -114,7 +114,7 @@ public class TestSequenceIdAccounting {
// The cache flush will clear out all sequenceid accounting by region.
assertEquals(HConstants.NO_SEQNUM, sida.getLowestSequenceId(ENCODED_REGION_NAME));
sida.completeCacheFlush(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
// No new edits have gone in so no sequenceid to work with.
assertEquals(HConstants.NO_SEQNUM, sida.getLowestSequenceId(ENCODED_REGION_NAME));
// Make an edit behind all we'll put now into sida.
@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@ -178,7 +179,7 @@ public class TestFSHLogProvider {
protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
wal.completeCacheFlush(regionEncodedName, HConstants.NO_SEQNUM);
@ -230,7 +231,7 @@ public class TestFSHLogProvider {
// archived. We need to append something or writer won't be rolled.
addEdits(log, hri2, htd2, 1, scopes2);
log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
int count = AbstractFSWALProvider.getNumRolledLogFiles(log);
assertEquals(2, count);
@ -240,7 +241,7 @@ public class TestFSHLogProvider {
// flush information
addEdits(log, hri2, htd2, 1, scopes2);
log.startCacheFlush(hri2.getEncodedNameAsBytes(), htd2.getColumnFamilyNames());
log.completeCacheFlush(hri2.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(log));
} finally {
@ -528,7 +528,7 @@ public class TestWALFactory {
htd.getTableName(), System.currentTimeMillis(), mvcc, scopes), cols);
log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
log.completeCacheFlush(info.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
Path filename = AbstractFSWALProvider.getCurrentFileName(log);
// Now open a reader on the log and assert append worked.
@ -584,7 +584,7 @@ public class TestWALFactory {
htd.getTableName(), System.currentTimeMillis(), mvcc, scopes), cols);
log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
Path filename = AbstractFSWALProvider.getCurrentFileName(log);
// Now open a reader on the log and assert append worked.
Reference in New Issue
Block a user