diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 79321b34682..20ae6021fc2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -49,7 +49,7 @@ import org.apache.hadoop.hbase.util.Bytes;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public interface WAL {
+public interface WAL extends AutoCloseable {
 
   /**
    * Registers WALActionsListener
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
index 9eaeda4a74e..19759d126e9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
@@ -72,7 +72,7 @@ import org.junit.rules.TestName;
 
 public abstract class AbstractTestFSWAL {
 
-  private static final Log LOG = LogFactory.getLog(AbstractTestFSWAL.class);
+  protected static final Log LOG = LogFactory.getLog(AbstractTestFSWAL.class);
 
   protected static Configuration CONF;
   protected static FileSystem FS;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
index bf56afeb20d..640e85127df 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
@@ -23,6 +23,10 @@ import java.lang.reflect.Field;
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -32,14 +36,21 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WALKey;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import static org.junit.Assert.assertEquals;
+
 /**
  * Provides FSHLog test cases.
  */
@@ -101,4 +112,98 @@ public class TestFSHLog extends AbstractTestFSWAL {
       log.close();
     }
   }
+
+  /**
+   * Test case for https://issues.apache.org/jira/browse/HBASE-16721
+   */
+  @Test (timeout = 30000)
+  public void testUnflushedSeqIdTracking() throws IOException, InterruptedException {
+    final String name = "testSyncRunnerIndexOverflow";
+    final byte[] b = Bytes.toBytes("b");
+
+    final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
+    final CountDownLatch holdAppend = new CountDownLatch(1);
+    final CountDownLatch flushFinished = new CountDownLatch(1);
+    final CountDownLatch putFinished = new CountDownLatch(1);
+
+    try (FSHLog log =
+        new FSHLog(FS, FSUtils.getRootDir(CONF), name, HConstants.HREGION_OLDLOGDIR_NAME, CONF,
+          null, true, null, null)) {
+
+      log.registerWALActionsListener(new WALActionsListener.Base() {
+        @Override
+        public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit)
+            throws IOException {
+          if (startHoldingForAppend.get()) {
+            try {
+              holdAppend.await();
+            } catch (InterruptedException e) {
+              LOG.error(e);
+            }
+          }
+        }
+      });
+
+      // open a new region which uses this WAL
+      HTableDescriptor htd =
+          new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor(b));
+      HRegionInfo hri =
+          new HRegionInfo(htd.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+
+      final HRegion region = TEST_UTIL.createLocalHRegion(hri, htd, log);
+      ExecutorService exec = Executors.newFixedThreadPool(2);
+
+      // do a regular write first because of memstore size calculation.
+      region.put(new Put(b).addColumn(b, b, b));
+
+      startHoldingForAppend.set(true);
+      exec.submit(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            region.put(new Put(b).addColumn(b, b, b));
+            putFinished.countDown();
+          } catch (IOException e) {
+            LOG.error(e);
+          }
+        }
+      });
+
+      // give the put a chance to start
+      Threads.sleep(3000);
+
+      exec.submit(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            Region.FlushResult flushResult = region.flush(true);
+            LOG.info("Flush result:" + flushResult.getResult());
+            LOG.info("Flush succeeded:" + flushResult.isFlushSucceeded());
+            flushFinished.countDown();
+          } catch (IOException e) {
+            LOG.error(e);
+          }
+        }
+      });
+
+      // give the flush a chance to start. Flush should have got the region lock, and
+      // should have been waiting on the mvcc complete after this.
+      Threads.sleep(3000);
+
+      // let the append to WAL go through now that the flush already started
+      holdAppend.countDown();
+      putFinished.await();
+      flushFinished.await();
+
+      // check whether flush went through
+      assertEquals("Region did not flush?", 1, region.getStoreFileList(new byte[][]{b}).size());
+
+      // now check the region's unflushed seqIds.
+      long seqId = log.getEarliestMemstoreSeqNum(hri.getEncodedNameAsBytes());
+      assertEquals("Found seqId for the region which is already flushed",
+          HConstants.NO_SEQNUM, seqId);
+
+      region.close();
+    }
+  }
 }