HBASE-16721 Concurrency issue in WAL unflushed seqId tracking
This commit is contained in:
parent
76396714e1
commit
bf3c928b74
|
@ -49,7 +49,7 @@ import org.apache.hadoop.hbase.util.Bytes;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public interface WAL {
|
public interface WAL extends AutoCloseable {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Registers WALActionsListener
|
* Registers WALActionsListener
|
||||||
|
|
|
@ -72,7 +72,7 @@ import org.junit.rules.TestName;
|
||||||
|
|
||||||
public abstract class AbstractTestFSWAL {
|
public abstract class AbstractTestFSWAL {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(AbstractTestFSWAL.class);
|
protected static final Log LOG = LogFactory.getLog(AbstractTestFSWAL.class);
|
||||||
|
|
||||||
protected static Configuration CONF;
|
protected static Configuration CONF;
|
||||||
protected static FileSystem FS;
|
protected static FileSystem FS;
|
||||||
|
|
|
@ -23,6 +23,10 @@ import java.lang.reflect.Field;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.NavigableMap;
|
import java.util.NavigableMap;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
@ -32,14 +36,21 @@ import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.Region;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.FSUtils;
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
|
import org.apache.hadoop.hbase.wal.WALKey;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Provides FSHLog test cases.
|
* Provides FSHLog test cases.
|
||||||
*/
|
*/
|
||||||
|
@ -101,4 +112,98 @@ public class TestFSHLog extends AbstractTestFSWAL {
|
||||||
log.close();
|
log.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test case for https://issues.apache.org/jira/browse/HBASE-16721
|
||||||
|
*/
|
||||||
|
@Test (timeout = 30000)
|
||||||
|
public void testUnflushedSeqIdTracking() throws IOException, InterruptedException {
|
||||||
|
final String name = "testSyncRunnerIndexOverflow";
|
||||||
|
final byte[] b = Bytes.toBytes("b");
|
||||||
|
|
||||||
|
final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
|
||||||
|
final CountDownLatch holdAppend = new CountDownLatch(1);
|
||||||
|
final CountDownLatch flushFinished = new CountDownLatch(1);
|
||||||
|
final CountDownLatch putFinished = new CountDownLatch(1);
|
||||||
|
|
||||||
|
try (FSHLog log =
|
||||||
|
new FSHLog(FS, FSUtils.getRootDir(CONF), name, HConstants.HREGION_OLDLOGDIR_NAME, CONF,
|
||||||
|
null, true, null, null)) {
|
||||||
|
|
||||||
|
log.registerWALActionsListener(new WALActionsListener.Base() {
|
||||||
|
@Override
|
||||||
|
public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit)
|
||||||
|
throws IOException {
|
||||||
|
if (startHoldingForAppend.get()) {
|
||||||
|
try {
|
||||||
|
holdAppend.await();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
LOG.error(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// open a new region which uses this WAL
|
||||||
|
HTableDescriptor htd =
|
||||||
|
new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor(b));
|
||||||
|
HRegionInfo hri =
|
||||||
|
new HRegionInfo(htd.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
|
||||||
|
|
||||||
|
final HRegion region = TEST_UTIL.createLocalHRegion(hri, htd, log);
|
||||||
|
ExecutorService exec = Executors.newFixedThreadPool(2);
|
||||||
|
|
||||||
|
// do a regular write first because of memstore size calculation.
|
||||||
|
region.put(new Put(b).addColumn(b, b,b));
|
||||||
|
|
||||||
|
startHoldingForAppend.set(true);
|
||||||
|
exec.submit(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
region.put(new Put(b).addColumn(b, b,b));
|
||||||
|
putFinished.countDown();
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// give the put a chance to start
|
||||||
|
Threads.sleep(3000);
|
||||||
|
|
||||||
|
exec.submit(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
Region.FlushResult flushResult = region.flush(true);
|
||||||
|
LOG.info("Flush result:" + flushResult.getResult());
|
||||||
|
LOG.info("Flush succeeded:" + flushResult.isFlushSucceeded());
|
||||||
|
flushFinished.countDown();
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// give the flush a chance to start. Flush should have got the region lock, and
|
||||||
|
// should have been waiting on the mvcc complete after this.
|
||||||
|
Threads.sleep(3000);
|
||||||
|
|
||||||
|
// let the append to WAL go through now that the flush already started
|
||||||
|
holdAppend.countDown();
|
||||||
|
putFinished.await();
|
||||||
|
flushFinished.await();
|
||||||
|
|
||||||
|
// check whether flush went through
|
||||||
|
assertEquals("Region did not flush?", 1, region.getStoreFileList(new byte[][]{b}).size());
|
||||||
|
|
||||||
|
// now check the region's unflushed seqIds.
|
||||||
|
long seqId = log.getEarliestMemstoreSeqNum(hri.getEncodedNameAsBytes());
|
||||||
|
assertEquals("Found seqId for the region which is already flushed",
|
||||||
|
HConstants.NO_SEQNUM, seqId);
|
||||||
|
|
||||||
|
region.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue