HBASE-23157 WAL unflushed seqId tracking may wrong when Durability.ASYNC_WAL is used (#762)

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
Duo Zhang 2020-08-10 09:11:31 +08:00
parent 457234c695
commit ef7b9eb36e
11 changed files with 211 additions and 104 deletions

View File

@ -48,6 +48,7 @@ public final class ImmutableByteArray {
return new ImmutableByteArray(b);
}
@Override
public String toString() {
return Bytes.toStringBinary(b);
}

View File

@ -2953,7 +2953,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// If we get to here, the HStores have been written.
if (wal != null) {
wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes(), flushedSeqId);
}
// Record latest flush time

View File

@ -499,8 +499,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
}
@Override
public void completeCacheFlush(byte[] encodedRegionName) {
this.sequenceIdAccounting.completeCacheFlush(encodedRegionName);
public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
this.sequenceIdAccounting.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
}
@Override

View File

@ -351,9 +351,36 @@ class SequenceIdAccounting {
return lowestUnflushedInRegion;
}
void completeCacheFlush(final byte[] encodedRegionName) {
void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
// This is a simple hack to avoid maxFlushedSeqId go backwards.
// The system works fine normally, but if we make use of Durability.ASYNC_WAL and we are going
// to flush all the stores, the maxFlushedSeqId will be next seq id of the region, but we may
// still have some unsynced WAL entries in the ringbuffer after we call startCacheFlush, and
// then it will be recorded as the lowestUnflushedSeqId by the above update method, which is
// less than the current maxFlushedSeqId. And if next time we only flush the family with this
// unusual lowestUnflushedSeqId, the maxFlushedSeqId will go backwards.
// This is an unexpected behavior so we should fix it, otherwise it may cause unexpected
// behavior in other area.
// The solution here is a bit hack but fine. Just replace the lowestUnflushedSeqId with
// maxFlushedSeqId + 1 if it is lesser. The meaning of maxFlushedSeqId is that, all edits less
// than or equal to it have been flushed, i.e, persistent to HFile, so set
// lowestUnflushedSequenceId to maxFlushedSeqId + 1 will not cause data loss.
// And technically, using +1 is fine here. If the maxFlushesSeqId is just the flushOpSeqId, it
// means we have flushed all the stores so the seq id for actual data should be at least plus 1.
// And if we do not flush all the stores, then the maxFlushedSeqId is calculated by
// lowestUnflushedSeqId - 1, so here let's plus the 1 back.
Long wrappedSeqId = Long.valueOf(maxFlushedSeqId + 1);
synchronized (tieLock) {
this.flushingSequenceIds.remove(encodedRegionName);
Map<ImmutableByteArray, Long> unflushed = lowestUnflushedSequenceIds.get(encodedRegionName);
if (unflushed == null) {
return;
}
for (Map.Entry<ImmutableByteArray, Long> e : unflushed.entrySet()) {
if (e.getValue().longValue() <= maxFlushedSeqId) {
e.setValue(wrappedSeqId);
}
}
}
}

View File

@ -224,7 +224,7 @@ class DisabledWALProvider implements WALProvider {
}
@Override
public void completeCacheFlush(final byte[] encodedRegionName) {
public void completeCacheFlush(final byte[] encodedRegionName, long maxFlushedSeqId) {
}
@Override

View File

@ -185,7 +185,7 @@ public interface WAL extends Closeable, WALFileLengthProvider {
* being flushed; in other words, this is effectively same as a flush of all of the region
* though we were passed a subset of regions. Otherwise, it returns the sequence id of the
* oldest/lowest outstanding edit.
* @see #completeCacheFlush(byte[])
* @see #completeCacheFlush(byte[], long)
* @see #abortCacheFlush(byte[])
*/
Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> families);
@ -195,10 +195,12 @@ public interface WAL extends Closeable, WALFileLengthProvider {
/**
* Complete the cache flush.
* @param encodedRegionName Encoded region name.
* @param maxFlushedSeqId The maxFlushedSeqId for this flush. There is no edit in memory that is
* less that this sequence id.
* @see #startCacheFlush(byte[], Set)
* @see #abortCacheFlush(byte[])
*/
void completeCacheFlush(final byte[] encodedRegionName);
void completeCacheFlush(final byte[] encodedRegionName, long maxFlushedSeqId);
/**
* Abort a cache flush. Call if the flush fails. Note that the only recovery

View File

@ -30,6 +30,8 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
@ -38,10 +40,12 @@ import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@ -67,7 +71,10 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.FlushPolicy;
import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
@ -82,7 +89,6 @@ import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@ -188,12 +194,10 @@ public abstract class AbstractTestFSWAL {
/**
* helper method to simulate region flush for a WAL.
* @param wal
* @param regionEncodedName
*/
protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
wal.completeCacheFlush(regionEncodedName);
wal.completeCacheFlush(regionEncodedName, HConstants.NO_SEQNUM);
}
/**
@ -350,7 +354,7 @@ public abstract class AbstractTestFSWAL {
// tests partial flush: roll on a partial flush, and ensure that wal is not archived.
wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
wal.rollWriter();
wal.completeCacheFlush(hri1.getEncodedNameAsBytes());
wal.completeCacheFlush(hri1.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
assertEquals(1, wal.getNumRolledLogFiles());
// clear test data
@ -526,18 +530,10 @@ public abstract class AbstractTestFSWAL {
}
}
@Test
public void testUnflushedSeqIdTrackingWithAsyncWal() throws IOException, InterruptedException {
final String testName = currentTest.getMethodName();
final byte[] b = Bytes.toBytes("b");
final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
final CountDownLatch holdAppend = new CountDownLatch(1);
final CountDownLatch closeFinished = new CountDownLatch(1);
final CountDownLatch putFinished = new CountDownLatch(1);
try (AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getRootDir(CONF), testName,
HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) {
private AbstractFSWAL<?> createHoldingWAL(String testName, AtomicBoolean startHoldingForAppend,
CountDownLatch holdAppend) throws IOException {
AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getRootDir(CONF), testName,
HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
wal.init();
wal.registerWALActionsListener(new WALActionsListener() {
@Override
@ -551,52 +547,34 @@ public abstract class AbstractTestFSWAL {
}
}
});
return wal;
}
// open a new region which uses this WAL
TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
private HRegion createHoldingHRegion(Configuration conf, TableDescriptor htd, WAL wal)
throws IOException {
RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
TEST_UTIL.createLocalHRegion(hri, htd, wal).close();
RegionServerServices rsServices = mock(RegionServerServices.class);
when(rsServices.getServerName()).thenReturn(ServerName.valueOf("localhost:12345", 123456));
when(rsServices.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
final HRegion region = HRegion.openHRegion(TEST_UTIL.getDataTestDir(), hri, htd, wal,
TEST_UTIL.getConfiguration(), rsServices, null);
ExecutorService exec = Executors.newFixedThreadPool(2);
when(rsServices.getConfiguration()).thenReturn(conf);
return HRegion.openHRegion(TEST_UTIL.getDataTestDir(), hri, htd, wal, conf, rsServices, null);
}
private void doPutWithAsyncWAL(ExecutorService exec, HRegion region, Put put,
Runnable flushOrCloseRegion, AtomicBoolean startHoldingForAppend,
CountDownLatch flushOrCloseFinished, CountDownLatch holdAppend)
throws InterruptedException, IOException {
// do a regular write first because of memstore size calculation.
region.put(new Put(b).addColumn(b, b, b));
region.put(put);
startHoldingForAppend.set(true);
exec.submit(new Runnable() {
@Override
public void run() {
try {
region.put(new Put(b).addColumn(b, b, b).setDurability(Durability.ASYNC_WAL));
putFinished.countDown();
} catch (IOException e) {
LOG.error(e.toString(), e);
}
}
});
region.put(new Put(put).setDurability(Durability.ASYNC_WAL));
// give the put a chance to start
Threads.sleep(3000);
exec.submit(new Runnable() {
@Override
public void run() {
try {
Map<?, ?> closeResult = region.close();
LOG.info("Close result:" + closeResult);
closeFinished.countDown();
} catch (IOException e) {
LOG.error(e.toString(), e);
}
}
});
exec.submit(flushOrCloseRegion);
// give the flush a chance to start. Flush should have got the region lock, and
// should have been waiting on the mvcc complete after this.
@ -604,15 +582,113 @@ public abstract class AbstractTestFSWAL {
// let the append to WAL go through now that the flush already started
holdAppend.countDown();
putFinished.await();
closeFinished.await();
flushOrCloseFinished.await();
}
// Testcase for HBASE-23181
@Test
public void testUnflushedSeqIdTrackingWithAsyncWal() throws IOException, InterruptedException {
String testName = currentTest.getMethodName();
byte[] b = Bytes.toBytes("b");
TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
CountDownLatch holdAppend = new CountDownLatch(1);
CountDownLatch closeFinished = new CountDownLatch(1);
ExecutorService exec = Executors.newFixedThreadPool(1);
AbstractFSWAL<?> wal = createHoldingWAL(testName, startHoldingForAppend, holdAppend);
// open a new region which uses this WAL
HRegion region = createHoldingHRegion(TEST_UTIL.getConfiguration(), htd, wal);
try {
doPutWithAsyncWAL(exec, region, new Put(b).addColumn(b, b, b), () -> {
try {
Map<?, ?> closeResult = region.close();
LOG.info("Close result:" + closeResult);
closeFinished.countDown();
} catch (IOException e) {
LOG.error(e.toString(), e);
}
}, startHoldingForAppend, closeFinished, holdAppend);
// now check the region's unflushed seqIds.
long seqId = wal.getEarliestMemStoreSeqNum(hri.getEncodedNameAsBytes());
long seqId = wal.getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
assertEquals("Found seqId for the region which is already closed", HConstants.NO_SEQNUM,
seqId);
} finally {
exec.shutdownNow();
region.close();
wal.close();
}
}
private static final Set<byte[]> STORES_TO_FLUSH =
Collections.newSetFromMap(new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR));
// Testcase for HBASE-23157
@Test
public void testMaxFlushedSequenceIdGoBackwards() throws IOException, InterruptedException {
String testName = currentTest.getMethodName();
byte[] a = Bytes.toBytes("a");
byte[] b = Bytes.toBytes("b");
TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(a))
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
CountDownLatch holdAppend = new CountDownLatch(1);
CountDownLatch flushFinished = new CountDownLatch(1);
ExecutorService exec = Executors.newFixedThreadPool(2);
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.setClass(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushSpecificStoresPolicy.class,
FlushPolicy.class);
AbstractFSWAL<?> wal = createHoldingWAL(testName, startHoldingForAppend, holdAppend);
// open a new region which uses this WAL
HRegion region = createHoldingHRegion(conf, htd, wal);
try {
Put put = new Put(a).addColumn(a, a, a).addColumn(b, b, b);
doPutWithAsyncWAL(exec, region, put, () -> {
try {
HRegion.FlushResult flushResult = region.flush(true);
LOG.info("Flush result:" + flushResult.getResult());
LOG.info("Flush succeeded:" + flushResult.isFlushSucceeded());
flushFinished.countDown();
} catch (IOException e) {
LOG.error(e.toString(), e);
}
}, startHoldingForAppend, flushFinished, holdAppend);
// get the max flushed sequence id after the first flush
long maxFlushedSeqId1 = region.getMaxFlushedSeqId();
region.put(put);
// this time we only flush family a
STORES_TO_FLUSH.add(a);
region.flush(false);
// get the max flushed sequence id after the second flush
long maxFlushedSeqId2 = region.getMaxFlushedSeqId();
// make sure that the maxFlushedSequenceId does not go backwards
assertTrue(
"maxFlushedSeqId1(" + maxFlushedSeqId1 +
") is not greater than or equal to maxFlushedSeqId2(" + maxFlushedSeqId2 + ")",
maxFlushedSeqId1 <= maxFlushedSeqId2);
} finally {
exec.shutdownNow();
region.close();
wal.close();
}
}
public static final class FlushSpecificStoresPolicy extends FlushPolicy {
@Override
public Collection<HStore> selectStoresToFlush() {
if (STORES_TO_FLUSH.isEmpty()) {
return region.getStores();
} else {
return STORES_TO_FLUSH.stream().map(region::getStore).collect(Collectors.toList());
}
}
}
}

View File

@ -794,7 +794,7 @@ public abstract class AbstractTestWALReplay {
// Add a cache flush, shouldn't have any effect
wal.startCacheFlush(regionName, familyNames);
wal.completeCacheFlush(regionName);
wal.completeCacheFlush(regionName, HConstants.NO_SEQNUM);
// Add an edit to another family, should be skipped.
WALEdit edit = new WALEdit();
@ -896,7 +896,7 @@ public abstract class AbstractTestWALReplay {
wal.doCompleteCacheFlush = true;
// allow complete cache flush with the previous seq number got after first
// set of edits.
wal.completeCacheFlush(hri.getEncodedNameAsBytes());
wal.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
wal.shutdown();
FileStatus[] listStatus = wal.getFiles();
assertNotNull(listStatus);
@ -1079,11 +1079,11 @@ public abstract class AbstractTestWALReplay {
}
@Override
public void completeCacheFlush(byte[] encodedRegionName) {
public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
if (!doCompleteCacheFlush) {
return;
}
super.completeCacheFlush(encodedRegionName);
super.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
}
}

View File

@ -55,12 +55,12 @@ public class TestSequenceIdAccounting {
Map<byte[], Long> m = new HashMap<>();
m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
assertEquals(HConstants.NO_SEQNUM, (long)sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES));
sida.completeCacheFlush(ENCODED_REGION_NAME);
sida.completeCacheFlush(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
long sequenceid = 1;
sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true);
// Only one family so should return NO_SEQNUM still.
assertEquals(HConstants.NO_SEQNUM, (long)sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES));
sida.completeCacheFlush(ENCODED_REGION_NAME);
sida.completeCacheFlush(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
long currentSequenceId = sequenceid;
sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true);
final Set<byte[]> otherFamily = new HashSet<>(1);
@ -68,7 +68,7 @@ public class TestSequenceIdAccounting {
sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true);
// Should return oldest sequence id in the region.
assertEquals(currentSequenceId, (long)sida.startCacheFlush(ENCODED_REGION_NAME, otherFamily));
sida.completeCacheFlush(ENCODED_REGION_NAME);
sida.completeCacheFlush(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
}
@Test
@ -95,7 +95,7 @@ public class TestSequenceIdAccounting {
m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
assertTrue(sida.areAllLower(m));
// Let the flush complete and if we ask if the sequenceid is lower, should be yes since no edits
sida.completeCacheFlush(ENCODED_REGION_NAME);
sida.completeCacheFlush(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
m.put(ENCODED_REGION_NAME, sequenceid);
assertTrue(sida.areAllLower(m));
// Flush again but add sequenceids while we are flushing.
@ -108,7 +108,7 @@ public class TestSequenceIdAccounting {
sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES);
// The cache flush will clear out all sequenceid accounting by region.
assertEquals(HConstants.NO_SEQNUM, sida.getLowestSequenceId(ENCODED_REGION_NAME));
sida.completeCacheFlush(ENCODED_REGION_NAME);
sida.completeCacheFlush(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
// No new edits have gone in so no sequenceid to work with.
assertEquals(HConstants.NO_SEQNUM, sida.getLowestSequenceId(ENCODED_REGION_NAME));
// Make an edit behind all we'll put now into sida.

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@ -178,7 +179,7 @@ public class TestFSHLogProvider {
*/
protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
wal.completeCacheFlush(regionEncodedName);
wal.completeCacheFlush(regionEncodedName, HConstants.NO_SEQNUM);
}
@Test
@ -230,7 +231,7 @@ public class TestFSHLogProvider {
// archived. We need to append something or writer won't be rolled.
addEdits(log, hri2, htd2, 1, scopes2);
log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
log.completeCacheFlush(hri.getEncodedNameAsBytes());
log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
log.rollWriter();
int count = AbstractFSWALProvider.getNumRolledLogFiles(log);
assertEquals(2, count);
@ -240,7 +241,7 @@ public class TestFSHLogProvider {
// flush information
addEdits(log, hri2, htd2, 1, scopes2);
log.startCacheFlush(hri2.getEncodedNameAsBytes(), htd2.getColumnFamilyNames());
log.completeCacheFlush(hri2.getEncodedNameAsBytes());
log.completeCacheFlush(hri2.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
log.rollWriter();
assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(log));
} finally {

View File

@ -528,7 +528,7 @@ public class TestWALFactory {
htd.getTableName(), System.currentTimeMillis(), mvcc, scopes), cols);
log.sync(txid);
log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
log.completeCacheFlush(info.getEncodedNameAsBytes());
log.completeCacheFlush(info.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
log.shutdown();
Path filename = AbstractFSWALProvider.getCurrentFileName(log);
// Now open a reader on the log and assert append worked.
@ -584,7 +584,7 @@ public class TestWALFactory {
htd.getTableName(), System.currentTimeMillis(), mvcc, scopes), cols);
log.sync(txid);
log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
log.completeCacheFlush(hri.getEncodedNameAsBytes());
log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
log.shutdown();
Path filename = AbstractFSWALProvider.getCurrentFileName(log);
// Now open a reader on the log and assert append worked.