HBASE-4095 Hlog may not be rolled in a long time if checkLowReplication's request of LogRoll is blocked

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1159497 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2011-08-19 03:57:48 +00:00
parent 3910bd4b6d
commit 4c797e7ba9
3 changed files with 131 additions and 29 deletions

View File

@ -442,10 +442,14 @@ Release 0.90.5 - Unreleased
regionserver (Anirudh Todi) regionserver (Anirudh Todi)
HBASE-4196 TableRecordReader may skip first row of region (Ming Ma) HBASE-4196 TableRecordReader may skip first row of region (Ming Ma)
HBASE-4170 createTable java doc needs to be improved (Mubarak Seyed) HBASE-4170 createTable java doc needs to be improved (Mubarak Seyed)
HBASE-4144 RS does not abort if the initialization of RS fails (ramkrishna.s.vasudevan) HBASE-4144 RS does not abort if the initialization of RS fails
HBASE-4148 HFileOutputFormat doesn't fill in TIMERANGE_KEY metadata (Jonathan Hsieh) (ramkrishna.s.vasudevan)
HBASE-4148 HFileOutputFormat doesn't fill in TIMERANGE_KEY metadata
(Jonathan Hsieh)
HBASE-4159 HBaseServer - IPC Reader threads are not daemons (Douglas HBASE-4159 HBaseServer - IPC Reader threads are not daemons (Douglas
Campbell) Campbell)
HBASE-4095 Hlog may not be rolled in a long time if checkLowReplication's
request of LogRoll is blocked (Jieshan Bean)
IMPROVEMENT IMPROVEMENT
HBASE-4205 Enhance HTable javadoc (Eric Charles) HBASE-4205 Enhance HTable javadoc (Eric Charles)

View File

@ -29,11 +29,11 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.net.URLEncoder; import java.net.URLEncoder;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.NavigableSet; import java.util.NavigableSet;
import java.util.Arrays;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.TreeSet; import java.util.TreeSet;
@ -55,7 +55,12 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
@ -125,8 +130,7 @@ public class HLog implements Syncable {
private final long blocksize; private final long blocksize;
private final String prefix; private final String prefix;
private final Path oldLogDir; private final Path oldLogDir;
private boolean logRollRequested; private boolean logRollRunning;
private static Class<? extends Writer> logWriterClass; private static Class<? extends Writer> logWriterClass;
private static Class<? extends Reader> logReaderClass; private static Class<? extends Reader> logReaderClass;
@ -138,7 +142,9 @@ public class HLog implements Syncable {
} }
private FSDataOutputStream hdfs_out; // FSDataOutputStream associated with the current SequenceFile.writer private FSDataOutputStream hdfs_out; // FSDataOutputStream associated with the current SequenceFile.writer
private int initialReplication; // initial replication factor of SequenceFile.writer // Minimum tolerable replicas, if the actual value is lower than it,
// rollWriter will be triggered
private int minTolerableReplication;
private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
final static Object [] NO_ARGS = new Object []{}; final static Object [] NO_ARGS = new Object []{};
@ -186,6 +192,17 @@ public class HLog implements Syncable {
//number of transactions in the current Hlog. //number of transactions in the current Hlog.
private final AtomicInteger numEntries = new AtomicInteger(0); private final AtomicInteger numEntries = new AtomicInteger(0);
// If live datanode count is lower than the default replicas value,
// RollWriter will be triggered in each sync(So the RollWriter will be
// triggered one by one in a short time). Using it as a workaround to slow
// down the roll frequency triggered by checkLowReplication().
private volatile int consecutiveLogRolls = 0;
private final int lowReplicationRollLimit;
// If consecutiveLogRolls is larger than lowReplicationRollLimit,
// then disable the rolling in checkLowReplication().
// Enable it if the replications recover.
private volatile boolean lowReplicationRollEnabled = true;
// If > than this size, roll the log. This is typically 0.95 times the size // If > than this size, roll the log. This is typically 0.95 times the size
// of the default Hdfs block size. // of the default Hdfs block size.
@ -353,6 +370,11 @@ public class HLog implements Syncable {
} }
} }
this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32); this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
this.minTolerableReplication = conf.getInt(
"hbase.regionserver.hlog.tolerable.lowreplication",
this.fs.getDefaultReplication());
this.lowReplicationRollLimit = conf.getInt(
"hbase.regionserver.hlog.lowreplication.rolllimit", 5);
this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true); this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
LOG.info("HLog configuration: blocksize=" + LOG.info("HLog configuration: blocksize=" +
StringUtils.byteDesc(this.blocksize) + StringUtils.byteDesc(this.blocksize) +
@ -481,6 +503,7 @@ public class HLog implements Syncable {
} }
byte [][] regionsToFlush = null; byte [][] regionsToFlush = null;
this.cacheFlushLock.lock(); this.cacheFlushLock.lock();
this.logRollRunning = true;
try { try {
if (closed) { if (closed) {
return regionsToFlush; return regionsToFlush;
@ -491,7 +514,6 @@ public class HLog implements Syncable {
this.filenum = System.currentTimeMillis(); this.filenum = System.currentTimeMillis();
Path newPath = computeFilename(); Path newPath = computeFilename();
HLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf); HLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
int nextInitialReplication = fs.getFileStatus(newPath).getReplication();
// Can we get at the dfsclient outputstream? If an instance of // Can we get at the dfsclient outputstream? If an instance of
// SFLW, it'll have done the necessary reflection to get at the // SFLW, it'll have done the necessary reflection to get at the
// protected field name. // protected field name.
@ -510,7 +532,6 @@ public class HLog implements Syncable {
// Clean up current writer. // Clean up current writer.
Path oldFile = cleanupCurrentWriter(currentFilenum); Path oldFile = cleanupCurrentWriter(currentFilenum);
this.writer = nextWriter; this.writer = nextWriter;
this.initialReplication = nextInitialReplication;
this.hdfs_out = nextHdfsOut; this.hdfs_out = nextHdfsOut;
LOG.info((oldFile != null? LOG.info((oldFile != null?
@ -520,7 +541,6 @@ public class HLog implements Syncable {
this.fs.getFileStatus(oldFile).getLen() + ". ": "") + this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
"New hlog " + FSUtils.getPath(newPath)); "New hlog " + FSUtils.getPath(newPath));
this.numEntries.set(0); this.numEntries.set(0);
this.logRollRequested = false;
} }
// Can we delete any of the old log files? // Can we delete any of the old log files?
if (this.outputfiles.size() > 0) { if (this.outputfiles.size() > 0) {
@ -538,6 +558,7 @@ public class HLog implements Syncable {
} }
} }
} finally { } finally {
this.logRollRunning = false;
this.cacheFlushLock.unlock(); this.cacheFlushLock.unlock();
} }
return regionsToFlush; return regionsToFlush;
@ -977,7 +998,7 @@ public class HLog implements Syncable {
synchronized (this.updateLock) { synchronized (this.updateLock) {
syncTime += System.currentTimeMillis() - now; syncTime += System.currentTimeMillis() - now;
syncOps++; syncOps++;
if (!logRollRequested) { if (!this.logRollRunning) {
checkLowReplication(); checkLowReplication();
if (this.writer.getLength() > this.logrollsize) { if (this.writer.getLength() > this.logrollsize) {
requestLogRoll(); requestLogRoll();
@ -993,18 +1014,44 @@ public class HLog implements Syncable {
} }
private void checkLowReplication() { private void checkLowReplication() {
// if the number of replicas in HDFS has fallen below the initial // if the number of replicas in HDFS has fallen below the configured
// value, then roll logs. // value, then roll logs.
try { try {
int numCurrentReplicas = getLogReplication(); int numCurrentReplicas = getLogReplication();
if (numCurrentReplicas != 0 && if (numCurrentReplicas != 0
numCurrentReplicas < this.initialReplication) { && numCurrentReplicas < this.minTolerableReplication) {
LOG.warn("HDFS pipeline error detected. " + if (this.lowReplicationRollEnabled) {
"Found " + numCurrentReplicas + " replicas but expecting " + if (this.consecutiveLogRolls < this.lowReplicationRollLimit) {
this.initialReplication + " replicas. " + LOG.warn("HDFS pipeline error detected. " + "Found "
" Requesting close of hlog."); + numCurrentReplicas + " replicas but expecting no less than "
requestLogRoll(); + this.minTolerableReplication + " replicas. "
logRollRequested = true; + " Requesting close of hlog.");
requestLogRoll();
// If rollWriter is requested, increase consecutiveLogRolls. Once it
// is larger than lowReplicationRollLimit, disable the
// LowReplication-Roller
this.consecutiveLogRolls++;
} else {
LOG.warn("Too many consecutive RollWriter requests, it's a sign of "
+ "the total number of live datanodes is lower than the tolerable replicas.");
this.consecutiveLogRolls = 0;
this.lowReplicationRollEnabled = false;
}
}
} else if (numCurrentReplicas >= this.minTolerableReplication) {
if (!this.lowReplicationRollEnabled) {
// The new writer's log replicas is always the default value.
// So we should not enable LowReplication-Roller. If numEntries
// is lower than or equals 1, we consider it as a new writer.
if (this.numEntries.get() <= 1) {
return;
}
// Once the live datanode number and the replicas return to normal,
// enable the LowReplication-Roller.
this.lowReplicationRollEnabled = true;
LOG.info("LowReplication-Roller was enabled.");
}
} }
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e + LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
@ -1262,6 +1309,15 @@ public class HLog implements Syncable {
return Bytes.equals(METAFAMILY, family); return Bytes.equals(METAFAMILY, family);
} }
/**
* Get LowReplication-Roller status
*
* @return lowReplicationRollEnabled
*/
public boolean isLowReplicationRollEnabled() {
return lowReplicationRollEnabled;
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static Class<? extends HLogKey> getKeyClass(Configuration conf) { public static Class<? extends HLogKey> getKeyClass(Configuration conf) {
return (Class<? extends HLogKey>) return (Class<? extends HLogKey>)

View File

@ -19,6 +19,8 @@
*/ */
package org.apache.hadoop.hbase.regionserver.wal; package org.apache.hadoop.hbase.regionserver.wal;
import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
@ -29,23 +31,21 @@ import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger; import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@ -55,8 +55,6 @@ import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.assertTrue;
/** /**
* Test log deletion as logs are rolled. * Test log deletion as logs are rolled.
*/ */
@ -137,6 +135,10 @@ public class TestLogRolling {
// the namenode might still try to choose the recently-dead datanode // the namenode might still try to choose the recently-dead datanode
// for a pipeline, so try to a new pipeline multiple times // for a pipeline, so try to a new pipeline multiple times
TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30); TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30);
TEST_UTIL.getConfiguration().setInt(
"hbase.regionserver.hlog.tolerable.lowreplication", 2);
TEST_UTIL.getConfiguration().setInt(
"hbase.regionserver.hlog.lowreplication.rolllimit", 3);
TEST_UTIL.startMiniCluster(2); TEST_UTIL.startMiniCluster(2);
cluster = TEST_UTIL.getHBaseCluster(); cluster = TEST_UTIL.getHBaseCluster();
@ -225,6 +227,30 @@ public class TestLogRolling {
} }
} }
void batchWriteAndWait(HTable table, int start, boolean expect, int timeout)
throws IOException {
for (int i = 0; i < 10; i++) {
Put put = new Put(Bytes.toBytes("row"
+ String.format("%1$04d", (start + i))));
put.add(HConstants.CATALOG_FAMILY, null, value);
table.put(put);
}
long startTime = System.currentTimeMillis();
long remaining = timeout;
while (remaining > 0) {
if (log.isLowReplicationRollEnabled() == expect) {
break;
} else {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
// continue
}
remaining = timeout - (System.currentTimeMillis() - startTime);
}
}
}
/** /**
* Give me the HDFS pipeline for this log file * Give me the HDFS pipeline for this log file
*/ */
@ -320,7 +346,23 @@ public class TestLogRolling {
// write some more log data (this should use a new hdfs_out) // write some more log data (this should use a new hdfs_out)
writeData(table, 3); writeData(table, 3);
assertTrue("The log should not roll again.", log.getFilenum() == newFilenum); assertTrue("The log should not roll again.", log.getFilenum() == newFilenum);
assertTrue("New log file should have the default replication", log // kill another datanode in the pipeline, so the replicas will be lower than
.getLogReplication() == fs.getDefaultReplication()); // the configured value 2.
assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null);
Thread.sleep(10000);
batchWriteAndWait(table, 3, false, 10000);
assertTrue("LowReplication Roller should've been disabled",
!log.isLowReplicationRollEnabled());
dfsCluster
.startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null);
dfsCluster.waitActive();
// Force roll writer. The new log file will have the default replications,
// and the LowReplication Roller will be enabled.
log.rollWriter();
batchWriteAndWait(table, 13, true, 10000);
assertTrue("LowReplication Roller should've been enabled",
log.isLowReplicationRollEnabled());
assertTrue("New log file should have the default replication",
log.getLogReplication() == fs.getDefaultReplication());
} }
} }