diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index b9630a94335..f62a603be37 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -832,7 +832,13 @@ class FSHLog implements HLog, Syncable {
   public void append(HRegionInfo info, byte [] tableName, WALEdit edits,
     final long now, HTableDescriptor htd)
   throws IOException {
-    append(info, tableName, edits, HConstants.DEFAULT_CLUSTER_ID, now, htd);
+    append(info, tableName, edits, now, htd, true);
+  }
+
+  @Override
+  public void append(HRegionInfo info, byte [] tableName, WALEdit edits,
+    final long now, HTableDescriptor htd, boolean isInMemstore) throws IOException {
+    append(info, tableName, edits, HConstants.DEFAULT_CLUSTER_ID, now, htd, true, isInMemstore);
   }
 
   /**
@@ -862,9 +868,9 @@ class FSHLog implements HLog, Syncable {
    * @throws IOException
    */
   private long append(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId,
-      final long now, HTableDescriptor htd, boolean doSync)
+      final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore)
     throws IOException {
-      if (edits.isEmpty()) return this.unflushedEntries.get();;
+      if (edits.isEmpty()) return this.unflushedEntries.get();
       if (this.closed) {
         throw new IOException("Cannot append; log is closed");
       }
@@ -879,7 +885,7 @@ class FSHLog implements HLog, Syncable {
         // Use encoded name. Its shorter, guaranteed unique and a subset of
         // actual name.
         byte [] encodedRegionName = info.getEncodedNameAsBytes();
-        this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
+        if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
         HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId);
         doWrite(info, logKey, edits, htd);
         this.numEntries.incrementAndGet();
@@ -903,14 +909,7 @@ class FSHLog implements HLog, Syncable {
   public long appendNoSync(HRegionInfo info, byte [] tableName, WALEdit edits,
     UUID clusterId, final long now, HTableDescriptor htd)
     throws IOException {
-    return append(info, tableName, edits, clusterId, now, htd, false);
-  }
-
-  @Override
-  public long append(HRegionInfo info, byte [] tableName, WALEdit edits,
-    UUID clusterId, final long now, HTableDescriptor htd)
-    throws IOException {
-    return append(info, tableName, edits, clusterId, now, htd, true);
+    return append(info, tableName, edits, clusterId, now, htd, false, true);
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index 3d814557dd1..97413b31cfb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -264,17 +264,25 @@ public interface HLog {
   public void closeAndDelete() throws IOException;
 
   /**
-   * Only used in tests.
-   *
+   * Same as {@link #appendNoSync(HRegionInfo, byte[], WALEdit, UUID, long, HTableDescriptor)},
+   * except it causes a sync on the log.
+   */
+  public void append(HRegionInfo info, byte[] tableName, WALEdit edits,
+    final long now, HTableDescriptor htd) throws IOException;
+
+  /**
+   * Append a set of edits to the log. Log edits are keyed by (encoded)
+   * regionName, rowname, and log-sequence-id. The HLog is flushed after this
+   * transaction is written to the log.
    * @param info
    * @param tableName
    * @param edits
    * @param now
    * @param htd
-   * @throws IOException
+   * @param isInMemstore Whether the record is in memstore. False for system records.
    */
   public void append(HRegionInfo info, byte[] tableName, WALEdit edits,
-      final long now, HTableDescriptor htd) throws IOException;
+      final long now, HTableDescriptor htd, boolean isInMemstore) throws IOException;
 
   /**
    * Append a set of edits to the log. Log edits are keyed by (encoded)
@@ -287,28 +295,11 @@ public interface HLog {
    * @param clusterId
    *          The originating clusterId for this edit (for replication)
    * @param now
-   * @return txid of this transaction
-   * @throws IOException
-   */
-  public long appendNoSync(HRegionInfo info, byte[] tableName, WALEdit edits,
-      UUID clusterId, final long now, HTableDescriptor htd) throws IOException;
-
-  /**
-   * Append a set of edits to the log. Log edits are keyed by (encoded)
-   * regionName, rowname, and log-sequence-id. The HLog is flushed after this
-   * transaction is written to the log.
-   *
-   * @param info
-   * @param tableName
-   * @param edits
-   * @param clusterId
-   *          The originating clusterId for this edit (for replication)
-   * @param now
    * @param htd
    * @return txid of this transaction
    * @throws IOException
    */
-  public long append(HRegionInfo info, byte[] tableName, WALEdit edits,
+  public long appendNoSync(HRegionInfo info, byte[] tableName, WALEdit edits,
       UUID clusterId, final long now, HTableDescriptor htd) throws IOException;
 
   public void hsync() throws IOException;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
index 49d047fbe82..df571855929 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
@@ -262,7 +262,7 @@ public class HLogUtil {
       final CompactionDescriptor c) throws IOException {
     WALEdit e = WALEdit.createCompaction(c);
     log.append(info, c.getTableName().toByteArray(), e,
-        EnvironmentEdgeManager.currentTimeMillis(), htd);
+        EnvironmentEdgeManager.currentTimeMillis(), htd, false);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 9d3fb3089d9..8d2d8836a85 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -735,7 +735,16 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
    */
   public MiniHBaseCluster startMiniCluster(final int numMasters,
       final int numSlaves, final String[] dataNodeHosts) throws Exception {
-    return startMiniCluster(numMasters, numSlaves, dataNodeHosts, null, null);
+    return startMiniCluster(numMasters, numSlaves, numSlaves, dataNodeHosts, null, null);
+  }
+
+  /**
+   * Same as {@link #startMiniCluster(int, int)}, but with custom number of datanodes.
+   * @param numDataNodes Number of data nodes.
+   */
+  public MiniHBaseCluster startMiniCluster(final int numMasters,
+      final int numSlaves, final int numDataNodes) throws Exception {
+    return startMiniCluster(numMasters, numSlaves, numDataNodes, null, null, null);
+  }
 
   /**
@@ -766,12 +775,24 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
    * @return Mini hbase cluster instance created.
    */
   public MiniHBaseCluster startMiniCluster(final int numMasters,
-      final int numSlaves, final String[] dataNodeHosts,
-      Class<? extends HMaster> masterClass,
+      final int numSlaves, final String[] dataNodeHosts, Class<? extends HMaster> masterClass,
+      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
+      throws Exception {
+    return startMiniCluster(
+        numMasters, numSlaves, numSlaves, dataNodeHosts, masterClass, regionserverClass);
+  }
+
+  /**
+   * Same as {@link #startMiniCluster(int, int, String[], Class, Class)}, but with custom
+   * number of datanodes.
+   * @param numDataNodes Number of data nodes.
+   */
+  public MiniHBaseCluster startMiniCluster(final int numMasters,
+      final int numSlaves, int numDataNodes, final String[] dataNodeHosts,
+      Class<? extends HMaster> masterClass,
       Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
     throws Exception {
-    int numDataNodes = numSlaves;
-    if ( dataNodeHosts != null && dataNodeHosts.length != 0) {
+    if (dataNodeHosts != null && dataNodeHosts.length != 0) {
       numDataNodes = dataNodeHosts.length;
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
index 7068153ba9b..9d547902a3c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
@@ -57,9 +57,11 @@ import org.apache.hadoop.hbase.exceptions.FailedLogCloseException;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -67,6 +69,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.log4j.Level;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -170,7 +173,7 @@ public class TestLogRolling {
 
   @Before
   public void setUp() throws Exception {
-    TEST_UTIL.startMiniCluster(2);
+    TEST_UTIL.startMiniCluster(1, 1, 2);
     cluster = TEST_UTIL.getHBaseCluster();
     dfsCluster = TEST_UTIL.getDFSCluster();
@@ -192,18 +195,12 @@ public class TestLogRolling {
     this.server = cluster.getRegionServerThreads().get(0).getRegionServer();
     this.log = server.getWAL();
 
-    // Create the test table and open it
-    HTableDescriptor desc = new HTableDescriptor(tableName);
-    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
-    admin.createTable(desc);
-    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    HTable table = createTestTable(this.tableName);
 
     server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
     this.log = server.getWAL();
     for (int i = 1; i <= 256; i++) {    // 256 writes should cause 8 log rolls
-      Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i)));
String.format("%1$04d", i))); - put.add(HConstants.CATALOG_FAMILY, null, value); - table.put(put); + doPut(table, i); if (i % 32 == 0) { // After every 32 writes sleep to let the log roller run try { @@ -221,7 +218,7 @@ public class TestLogRolling { * @throws org.apache.hadoop.hbase.exceptions.FailedLogCloseException */ @Test - public void testLogRolling() throws FailedLogCloseException, IOException { + public void testLogRolling() throws Exception { this.tableName = getName(); startAndWriteData(); LOG.info("after writing there are " + ((FSHLog) log).getNumLogFiles() + " log files"); @@ -248,9 +245,7 @@ public class TestLogRolling { } void writeData(HTable table, int rownum) throws IOException { - Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", rownum))); - put.add(HConstants.CATALOG_FAMILY, null, value); - table.put(put); + doPut(table, rownum); // sleep to let the log roller run (if it needs to) try { @@ -324,12 +319,7 @@ public class TestLogRolling { /** * Tests that logs are rolled upon detecting datanode death * Requires an HDFS jar with HDFS-826 & syncFs() support (HDFS-200) - * @throws IOException - * @throws InterruptedException - * @throws InvocationTargetException - * @throws IllegalAccessException - * @throws IllegalArgumentException - */ + */ @Test public void testLogRollOnDatanodeDeath() throws Exception { assertTrue("This test requires HLog file replication set to 2.", @@ -587,5 +577,75 @@ public class TestLogRolling { } } + /** + * Tests that logs are deleted when some region has a compaction + * record in WAL and no other records. See HBASE-8597. + */ + @Test + public void testCompactionRecordDoesntBlockRolling() throws Exception { + // When the META table can be opened, the region servers are running + new HTable(TEST_UTIL.getConfiguration(), HConstants.META_TABLE_NAME); + + String tableName = getName(); + HTable table = createTestTable(tableName); + String tableName2 = tableName + "1"; + HTable table2 = createTestTable(tableName2); + + server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName)); + this.log = server.getWAL(); + FSHLog fshLog = (FSHLog)log; + HRegion region = server.getOnlineRegions(table2.getTableName()).get(0); + Store s = region.getStore(HConstants.CATALOG_FAMILY); + + + // Put some stuff into table2, to make sure we have some files to compact. + for (int i = 1; i <= 2; ++i) { + doPut(table2, i); + admin.flush(table2.getTableName()); + } + doPut(table2, 3); // don't flush yet, or compaction might trigger before we roll WAL + assertEquals("Should have no WAL after initial writes", 0, fshLog.getNumLogFiles()); + assertEquals(2, s.getStorefilesCount()); + + // Roll the log and compact table2, to have compaction record in the 2nd WAL. + fshLog.rollWriter(); + assertEquals("Should have WAL; one table is not flushed", 1, fshLog.getNumLogFiles()); + admin.flush(table2.getTableName()); + region.compactStores(); + // Wait for compaction in case if flush triggered it before us. + Assert.assertNotNull(s); + for (int waitTime = 3000; s.getStorefilesCount() > 1 && waitTime > 0; waitTime -= 200) { + Threads.sleepWithoutInterrupt(200); + } + assertEquals("Compaction didn't happen", 1, s.getStorefilesCount()); + + // Write some value to the table so the WAL cannot be deleted until table is flushed. + doPut(table, 0); // Now 2nd WAL will have compaction record for table2 and put for table. + fshLog.rollWriter(); // 1st WAL deleted, 2nd not deleted yet. 
+ assertEquals("Should have WAL; one table is not flushed", 1, fshLog.getNumLogFiles()); + + // Flush table to make latest WAL obsolete; write another record, and roll again. + admin.flush(table.getTableName()); + doPut(table, 1); + fshLog.rollWriter(); // Now 2nd WAL is deleted and 3rd is added. + assertEquals("Should have 1 WALs at the end", 1, fshLog.getNumLogFiles()); + + table.close(); + table2.close(); + } + + private void doPut(HTable table, int i) throws IOException { + Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i))); + put.add(HConstants.CATALOG_FAMILY, null, value); + table.put(put); + } + + private HTable createTestTable(String tableName) throws IOException { + // Create the test table and open it + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY)); + admin.createTable(desc); + return new HTable(TEST_UTIL.getConfiguration(), tableName); + } }