HBASE-16721 Concurrency issue in WAL unflushed seqId tracking
This commit is contained in:
parent
5ac2776d23
commit
bf5a7aba5c
|
@ -2294,6 +2294,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
|
||||
long trxId = 0;
|
||||
MultiVersionConcurrencyControl.WriteEntry writeEntry = mvcc.begin();
|
||||
// wait for all in-progress transactions to commit to WAL before
|
||||
// we can start the flush. This prevents
|
||||
// uncommitted transactions from being written into HFiles.
|
||||
// We have to block before we start the flush, otherwise keys that
|
||||
// were removed via a rollbackMemstore could be written to Hfiles.
|
||||
mvcc.completeAndWait(writeEntry);
|
||||
// set writeEntry to null to prevent mvcc.complete from being called again inside finally
|
||||
// block
|
||||
writeEntry = null;
|
||||
try {
|
||||
try {
|
||||
if (wal != null) {
|
||||
|
@ -2372,16 +2381,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
throw ioe;
|
||||
}
|
||||
}
|
||||
|
||||
// wait for all in-progress transactions to commit to WAL before
|
||||
// we can start the flush. This prevents
|
||||
// uncommitted transactions from being written into HFiles.
|
||||
// We have to block before we start the flush, otherwise keys that
|
||||
// were removed via a rollbackMemstore could be written to Hfiles.
|
||||
mvcc.completeAndWait(writeEntry);
|
||||
// set writeEntry to null to prevent mvcc.complete from being called again inside finally
|
||||
// block
|
||||
writeEntry = null;
|
||||
} finally {
|
||||
if (writeEntry != null) {
|
||||
// In case of failure just mark current writeEntry as complete.
|
||||
|
|
|
@ -51,7 +51,7 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public interface WAL {
|
||||
public interface WAL extends AutoCloseable {
|
||||
|
||||
/**
|
||||
* Registers WALActionsListener
|
||||
|
|
|
@ -30,6 +30,10 @@ import java.util.Comparator;
|
|||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.commons.lang.mutable.MutableBoolean;
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -55,6 +59,7 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
|||
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdge;
|
||||
|
@ -343,8 +348,8 @@ public class TestFSHLog {
|
|||
* by slowing appends in the background ring buffer thread while in foreground we call
|
||||
* flush. The addition of the sync over HRegion in flush should fix an issue where flush was
|
||||
* returning before all of its appends had made it out to the WAL (HBASE-11109).
|
||||
* see HBASE-11109
|
||||
* @throws IOException
|
||||
* @see HBASE-11109
|
||||
*/
|
||||
@Test
|
||||
public void testFlushSequenceIdIsGreaterThanAllEditsInHFile() throws IOException {
|
||||
|
@ -448,4 +453,98 @@ public class TestFSHLog {
|
|||
log.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test case for https://issues.apache.org/jira/browse/HBASE-16721
|
||||
*/
|
||||
@Test (timeout = 30000)
|
||||
public void testUnflushedSeqIdTracking() throws IOException, InterruptedException {
|
||||
final String name = "testSyncRunnerIndexOverflow";
|
||||
final byte[] b = Bytes.toBytes("b");
|
||||
|
||||
final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
|
||||
final CountDownLatch holdAppend = new CountDownLatch(1);
|
||||
final CountDownLatch flushFinished = new CountDownLatch(1);
|
||||
final CountDownLatch putFinished = new CountDownLatch(1);
|
||||
|
||||
try (FSHLog log =
|
||||
new FSHLog(fs, FSUtils.getRootDir(conf), name, HConstants.HREGION_OLDLOGDIR_NAME, conf,
|
||||
null, true, null, null)) {
|
||||
|
||||
log.registerWALActionsListener(new WALActionsListener.Base() {
|
||||
@Override
|
||||
public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit)
|
||||
throws IOException {
|
||||
if (startHoldingForAppend.get()) {
|
||||
try {
|
||||
holdAppend.await();
|
||||
} catch (InterruptedException e) {
|
||||
LOG.error(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// open a new region which uses this WAL
|
||||
HTableDescriptor htd =
|
||||
new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor(b));
|
||||
HRegionInfo hri =
|
||||
new HRegionInfo(htd.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
|
||||
|
||||
final HRegion region = TEST_UTIL.createLocalHRegion(hri, htd, log);
|
||||
ExecutorService exec = Executors.newFixedThreadPool(2);
|
||||
|
||||
// do a regular write first because of memstore size calculation.
|
||||
region.put(new Put(b).addColumn(b, b,b));
|
||||
|
||||
startHoldingForAppend.set(true);
|
||||
exec.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
region.put(new Put(b).addColumn(b, b,b));
|
||||
putFinished.countDown();
|
||||
} catch (IOException e) {
|
||||
LOG.error(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// give the put a chance to start
|
||||
Threads.sleep(3000);
|
||||
|
||||
exec.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
Region.FlushResult flushResult = region.flush(true);
|
||||
LOG.info("Flush result:" + flushResult.getResult());
|
||||
LOG.info("Flush succeeded:" + flushResult.isFlushSucceeded());
|
||||
flushFinished.countDown();
|
||||
} catch (IOException e) {
|
||||
LOG.error(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// give the flush a chance to start. Flush should have got the region lock, and
|
||||
// should have been waiting on the mvcc complete after this.
|
||||
Threads.sleep(3000);
|
||||
|
||||
// let the append to WAL go through now that the flush already started
|
||||
holdAppend.countDown();
|
||||
putFinished.await();
|
||||
flushFinished.await();
|
||||
|
||||
// check whether flush went through
|
||||
assertEquals("Region did not flush?", 1, region.getStoreFileList(new byte[][]{b}).size());
|
||||
|
||||
// now check the region's unflushed seqIds.
|
||||
long seqId = log.getEarliestMemstoreSeqNum(hri.getEncodedNameAsBytes());
|
||||
assertEquals("Found seqId for the region which is already flushed",
|
||||
HConstants.NO_SEQNUM, seqId);
|
||||
|
||||
region.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue