HBASE-8597 compaction record (probably) can block WAL cleanup forever if region is closed without edits (Sergey)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1486212 13f79535-47bb-0310-9956-ffa450edef68
commit 6e68b91ef2
parent 83edb0f4ee
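
Summary of the change, for orientation before the diff: FSHLog.append() gains an isInMemstore flag, and system records such as compaction markers are appended with isInMemstore=false, so they no longer register the region in oldestUnflushedSeqNums and therefore cannot pin a WAL file behind a memstore flush that will never happen (the HBASE-8597 scenario where a region is closed without further edits). The sketch below is a simplified, standalone model of that bookkeeping; the class and method names are illustrative only, not HBase code.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified model of the FSHLog bookkeeping touched by this patch.
public class WalSeqNumSketch {
  private final AtomicLong logSeqNum = new AtomicLong(0);

  // Region -> sequence number of its oldest edit that is still only in the memstore.
  // A rolled WAL file cannot be archived while any region in it has an entry here.
  private final ConcurrentMap<String, Long> oldestUnflushedSeqNums = new ConcurrentHashMap<>();

  // Mirrors the patched append(): only memstore edits pin the WAL.
  long append(String encodedRegionName, boolean isInMemstore) {
    long seqNum = logSeqNum.incrementAndGet();
    if (isInMemstore) {
      oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
    }
    return seqNum;
  }

  // Called when a region's memstore is flushed; its edits no longer pin the WAL.
  void completeFlush(String encodedRegionName) {
    oldestUnflushedSeqNums.remove(encodedRegionName);
  }

  // A rolled WAL can be deleted only when no region still has unflushed edits in it.
  boolean canArchiveRolledLog() {
    return oldestUnflushedSeqNums.isEmpty();
  }

  public static void main(String[] args) {
    WalSeqNumSketch wal = new WalSeqNumSketch();
    // A compaction marker for a region that is then closed without further edits:
    wal.append("region-A", false);                  // system record, isInMemstore = false
    System.out.println(wal.canArchiveRolledLog());  // true: the marker does not block cleanup
    // A normal edit still pins the log until the region flushes:
    wal.append("region-B", true);
    System.out.println(wal.canArchiveRolledLog());  // false
    wal.completeFlush("region-B");
    System.out.println(wal.canArchiveRolledLog());  // true
  }
}
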
@@ -832,7 +832,13 @@ class FSHLog implements HLog, Syncable {
   public void append(HRegionInfo info, byte [] tableName, WALEdit edits,
     final long now, HTableDescriptor htd)
   throws IOException {
-    append(info, tableName, edits, HConstants.DEFAULT_CLUSTER_ID, now, htd);
+    append(info, tableName, edits, now, htd, true);
+  }
+
+  @Override
+  public void append(HRegionInfo info, byte [] tableName, WALEdit edits,
+    final long now, HTableDescriptor htd, boolean isInMemstore) throws IOException {
+    append(info, tableName, edits, HConstants.DEFAULT_CLUSTER_ID, now, htd, true, isInMemstore);
   }
 
   /**
@@ -862,9 +868,9 @@ class FSHLog implements HLog, Syncable {
    * @throws IOException
    */
   private long append(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId,
-    final long now, HTableDescriptor htd, boolean doSync)
+    final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore)
   throws IOException {
-    if (edits.isEmpty()) return this.unflushedEntries.get();;
+    if (edits.isEmpty()) return this.unflushedEntries.get();
     if (this.closed) {
       throw new IOException("Cannot append; log is closed");
     }
@@ -879,7 +885,7 @@ class FSHLog implements HLog, Syncable {
       // Use encoded name. Its shorter, guaranteed unique and a subset of
       // actual name.
       byte [] encodedRegionName = info.getEncodedNameAsBytes();
-      this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
+      if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
       HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId);
       doWrite(info, logKey, edits, htd);
       this.numEntries.incrementAndGet();
@@ -903,14 +909,7 @@ class FSHLog implements HLog, Syncable {
   public long appendNoSync(HRegionInfo info, byte [] tableName, WALEdit edits,
     UUID clusterId, final long now, HTableDescriptor htd)
     throws IOException {
-    return append(info, tableName, edits, clusterId, now, htd, false);
-  }
-
-  @Override
-  public long append(HRegionInfo info, byte [] tableName, WALEdit edits,
-    UUID clusterId, final long now, HTableDescriptor htd)
-    throws IOException {
-    return append(info, tableName, edits, clusterId, now, htd, true);
+    return append(info, tableName, edits, clusterId, now, htd, false, true);
   }
 
   /**
@@ -264,17 +264,25 @@ public interface HLog {
   public void closeAndDelete() throws IOException;
 
   /**
-   * Only used in tests.
-   *
+   * Same as {@link #appendNoSync(HRegionInfo, byte[], WALEdit, UUID, long, HTableDescriptor)},
+   * except it causes a sync on the log
+   */
+  public void append(HRegionInfo info, byte[] tableName, WALEdit edits,
+      final long now, HTableDescriptor htd) throws IOException;
+
+  /**
+   * Append a set of edits to the log. Log edits are keyed by (encoded)
+   * regionName, rowname, and log-sequence-id. The HLog is flushed after this
+   * transaction is written to the log.
    * @param info
    * @param tableName
    * @param edits
    * @param now
    * @param htd
-   * @throws IOException
+   * @param isInMemstore Whether the record is in memstore. False for system records.
    */
   public void append(HRegionInfo info, byte[] tableName, WALEdit edits,
-      final long now, HTableDescriptor htd) throws IOException;
+      final long now, HTableDescriptor htd, boolean isInMemstore) throws IOException;
 
   /**
    * Append a set of edits to the log. Log edits are keyed by (encoded)
@@ -287,28 +295,11 @@ public interface HLog {
    * @param clusterId
    *          The originating clusterId for this edit (for replication)
    * @param now
-   * @return txid of this transaction
-   * @throws IOException
-   */
-  public long appendNoSync(HRegionInfo info, byte[] tableName, WALEdit edits,
-      UUID clusterId, final long now, HTableDescriptor htd) throws IOException;
-
-  /**
-   * Append a set of edits to the log. Log edits are keyed by (encoded)
-   * regionName, rowname, and log-sequence-id. The HLog is flushed after this
-   * transaction is written to the log.
-   *
-   * @param info
-   * @param tableName
-   * @param edits
-   * @param clusterId
-   *          The originating clusterId for this edit (for replication)
-   * @param now
    * @param htd
    * @return txid of this transaction
    * @throws IOException
    */
-  public long append(HRegionInfo info, byte[] tableName, WALEdit edits,
+  public long appendNoSync(HRegionInfo info, byte[] tableName, WALEdit edits,
       UUID clusterId, final long now, HTableDescriptor htd) throws IOException;
 
   public void hsync() throws IOException;
@@ -262,7 +262,7 @@ public class HLogUtil {
       final CompactionDescriptor c) throws IOException {
     WALEdit e = WALEdit.createCompaction(c);
     log.append(info, c.getTableName().toByteArray(), e,
-      EnvironmentEdgeManager.currentTimeMillis(), htd);
+      EnvironmentEdgeManager.currentTimeMillis(), htd, false);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
     }
@@ -735,7 +735,16 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
    */
   public MiniHBaseCluster startMiniCluster(final int numMasters,
     final int numSlaves, final String[] dataNodeHosts) throws Exception {
-    return startMiniCluster(numMasters, numSlaves, dataNodeHosts, null, null);
+    return startMiniCluster(numMasters, numSlaves, numSlaves, dataNodeHosts, null, null);
+  }
+
+  /**
+   * Same as {@link #startMiniCluster(int, int)}, but with custom number of datanodes.
+   * @param numDataNodes Number of data nodes.
+   */
+  public MiniHBaseCluster startMiniCluster(final int numMasters,
+    final int numSlaves, final int numDataNodes) throws Exception {
+    return startMiniCluster(numMasters, numSlaves, numDataNodes, null, null, null);
   }
 
   /**
@@ -766,11 +775,23 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
    * @return Mini hbase cluster instance created.
    */
   public MiniHBaseCluster startMiniCluster(final int numMasters,
-    final int numSlaves, final String[] dataNodeHosts,
+    final int numSlaves, final String[] dataNodeHosts, Class<? extends HMaster> masterClass,
+    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
+    throws Exception {
+    return startMiniCluster(
+      numMasters, numSlaves, numSlaves, dataNodeHosts, masterClass, regionserverClass);
+  }
+
+  /**
+   * Same as {@link #startMiniCluster(int, int, String[], Class, Class)}, but with custom
+   * number of datanodes.
+   * @param numDataNodes Number of data nodes.
+   */
+  public MiniHBaseCluster startMiniCluster(final int numMasters,
+    final int numSlaves, int numDataNodes, final String[] dataNodeHosts,
     Class<? extends HMaster> masterClass,
     Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
     throws Exception {
-    int numDataNodes = numSlaves;
     if (dataNodeHosts != null && dataNodeHosts.length != 0) {
       numDataNodes = dataNodeHosts.length;
     }
@@ -57,9 +57,11 @@ import org.apache.hadoop.hbase.exceptions.FailedLogCloseException;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -67,6 +69,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.log4j.Level;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -170,7 +173,7 @@ public class TestLogRolling {
 
   @Before
   public void setUp() throws Exception {
-    TEST_UTIL.startMiniCluster(2);
+    TEST_UTIL.startMiniCluster(1, 1, 2);
 
     cluster = TEST_UTIL.getHBaseCluster();
     dfsCluster = TEST_UTIL.getDFSCluster();
@@ -192,18 +195,12 @@ public class TestLogRolling {
     this.server = cluster.getRegionServerThreads().get(0).getRegionServer();
     this.log = server.getWAL();
 
-    // Create the test table and open it
-    HTableDescriptor desc = new HTableDescriptor(tableName);
-    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
-    admin.createTable(desc);
-    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    HTable table = createTestTable(this.tableName);
 
     server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
     this.log = server.getWAL();
     for (int i = 1; i <= 256; i++) {    // 256 writes should cause 8 log rolls
-      Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i)));
-      put.add(HConstants.CATALOG_FAMILY, null, value);
-      table.put(put);
+      doPut(table, i);
       if (i % 32 == 0) {
         // After every 32 writes sleep to let the log roller run
         try {
@@ -221,7 +218,7 @@ public class TestLogRolling {
    * @throws org.apache.hadoop.hbase.exceptions.FailedLogCloseException
    */
   @Test
-  public void testLogRolling() throws FailedLogCloseException, IOException {
+  public void testLogRolling() throws Exception {
     this.tableName = getName();
     startAndWriteData();
     LOG.info("after writing there are " + ((FSHLog) log).getNumLogFiles() + " log files");
@@ -248,9 +245,7 @@ public class TestLogRolling {
   }
 
   void writeData(HTable table, int rownum) throws IOException {
-    Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", rownum)));
-    put.add(HConstants.CATALOG_FAMILY, null, value);
-    table.put(put);
+    doPut(table, rownum);
 
     // sleep to let the log roller run (if it needs to)
     try {
@@ -324,11 +319,6 @@ public class TestLogRolling {
   /**
    * Tests that logs are rolled upon detecting datanode death
    * Requires an HDFS jar with HDFS-826 & syncFs() support (HDFS-200)
-   * @throws IOException
-   * @throws InterruptedException
-   * @throws InvocationTargetException
-   * @throws IllegalAccessException
-   * @throws IllegalArgumentException
    */
   @Test
   public void testLogRollOnDatanodeDeath() throws Exception {
@@ -587,5 +577,75 @@ public class TestLogRolling {
       }
     }
   }
+
+  /**
+   * Tests that logs are deleted when some region has a compaction
+   * record in WAL and no other records. See HBASE-8597.
+   */
+  @Test
+  public void testCompactionRecordDoesntBlockRolling() throws Exception {
+    // When the META table can be opened, the region servers are running
+    new HTable(TEST_UTIL.getConfiguration(), HConstants.META_TABLE_NAME);
+
+    String tableName = getName();
+    HTable table = createTestTable(tableName);
+    String tableName2 = tableName + "1";
+    HTable table2 = createTestTable(tableName2);
+
+    server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
+    this.log = server.getWAL();
+    FSHLog fshLog = (FSHLog)log;
+    HRegion region = server.getOnlineRegions(table2.getTableName()).get(0);
+    Store s = region.getStore(HConstants.CATALOG_FAMILY);
+
+    // Put some stuff into table2, to make sure we have some files to compact.
+    for (int i = 1; i <= 2; ++i) {
+      doPut(table2, i);
+      admin.flush(table2.getTableName());
+    }
+    doPut(table2, 3); // don't flush yet, or compaction might trigger before we roll WAL
+    assertEquals("Should have no WAL after initial writes", 0, fshLog.getNumLogFiles());
+    assertEquals(2, s.getStorefilesCount());
+
+    // Roll the log and compact table2, to have compaction record in the 2nd WAL.
+    fshLog.rollWriter();
+    assertEquals("Should have WAL; one table is not flushed", 1, fshLog.getNumLogFiles());
+    admin.flush(table2.getTableName());
+    region.compactStores();
+    // Wait for compaction in case if flush triggered it before us.
+    Assert.assertNotNull(s);
+    for (int waitTime = 3000; s.getStorefilesCount() > 1 && waitTime > 0; waitTime -= 200) {
+      Threads.sleepWithoutInterrupt(200);
+    }
+    assertEquals("Compaction didn't happen", 1, s.getStorefilesCount());
+
+    // Write some value to the table so the WAL cannot be deleted until table is flushed.
+    doPut(table, 0); // Now 2nd WAL will have compaction record for table2 and put for table.
+    fshLog.rollWriter(); // 1st WAL deleted, 2nd not deleted yet.
+    assertEquals("Should have WAL; one table is not flushed", 1, fshLog.getNumLogFiles());
+
+    // Flush table to make latest WAL obsolete; write another record, and roll again.
+    admin.flush(table.getTableName());
+    doPut(table, 1);
+    fshLog.rollWriter(); // Now 2nd WAL is deleted and 3rd is added.
+    assertEquals("Should have 1 WALs at the end", 1, fshLog.getNumLogFiles());
+
+    table.close();
+    table2.close();
+  }
+
+  private void doPut(HTable table, int i) throws IOException {
+    Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i)));
+    put.add(HConstants.CATALOG_FAMILY, null, value);
+    table.put(put);
+  }
+
+  private HTable createTestTable(String tableName) throws IOException {
+    // Create the test table and open it
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
+    admin.createTable(desc);
+    return new HTable(TEST_UTIL.getConfiguration(), tableName);
+  }
 }
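
Usage note on the test-utility change above: TestLogRolling.setUp() now calls the new startMiniCluster(numMasters, numSlaves, numDataNodes) overload as startMiniCluster(1, 1, 2), so the datanode count is no longer tied to the number of region servers. A minimal sketch of a test that uses this overload, assuming the standard HBaseTestingUtility/JUnit 4 scaffolding (the class name here is hypothetical, not part of the commit):

import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.junit.After;
import org.junit.Before;

public class MiniClusterSizingExample {
  private final HBaseTestingUtility testUtil = new HBaseTestingUtility();

  @Before
  public void setUp() throws Exception {
    // 1 master, 1 region server, 2 datanodes -- datanode count chosen independently.
    testUtil.startMiniCluster(1, 1, 2);
  }

  @After
  public void tearDown() throws Exception {
    testUtil.shutdownMiniCluster();
  }
}
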