From 4c797e7ba95360e0b09ae0d256fa6ae91de8f853 Mon Sep 17 00:00:00 2001
From: Michael Stack
Date: Fri, 19 Aug 2011 03:57:48 +0000
Subject: [PATCH] HBASE-4095 Hlog may not be rolled in a long time if
 checkLowReplication's request of LogRoll is blocked

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1159497 13f79535-47bb-0310-9956-ffa450edef68
---
 CHANGES.txt                                      |  8 +-
 .../hadoop/hbase/regionserver/wal/HLog.java      | 92 +++++++++++++++----
 .../regionserver/wal/TestLogRolling.java         | 60 ++++++++++--
 3 files changed, 131 insertions(+), 29 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 7bbea33b69a..04780033dc4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -442,10 +442,14 @@ Release 0.90.5 - Unreleased
               regionserver (Anirudh Todi)
    HBASE-4196 TableRecordReader may skip first row of region (Ming Ma)
    HBASE-4170 createTable java doc needs to be improved (Mubarak Seyed)
-   HBASE-4144 RS does not abort if the initialization of RS fails (ramkrishna.s.vasudevan)
-   HBASE-4148 HFileOutputFormat doesn't fill in TIMERANGE_KEY metadata (Jonathan Hsieh)
+   HBASE-4144 RS does not abort if the initialization of RS fails
+              (ramkrishna.s.vasudevan)
+   HBASE-4148 HFileOutputFormat doesn't fill in TIMERANGE_KEY metadata
+              (Jonathan Hsieh)
    HBASE-4159 HBaseServer - IPC Reader threads are not daemons (Douglas Campbell)
+   HBASE-4095 Hlog may not be rolled in a long time if checkLowReplication's
+              request of LogRoll is blocked (Jieshan Bean)
 
   IMPROVEMENT
    HBASE-4205 Enhance HTable javadoc (Eric Charles)

diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index 887f73694a6..c301d1bcfd7 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -29,11 +29,11 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.URLEncoder;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
-import java.util.Arrays;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
@@ -55,7 +55,12 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.Syncable;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -125,8 +130,7 @@ public class HLog implements Syncable {
   private final long blocksize;
   private final String prefix;
   private final Path oldLogDir;
-  private boolean logRollRequested;
-
+  private boolean logRollRunning;
 
   private static Class<? extends Writer> logWriterClass;
   private static Class<? extends Reader> logReaderClass;
@@ -138,7 +142,9 @@
   }
 
   private FSDataOutputStream hdfs_out; // FSDataOutputStream associated with the current SequenceFile.writer
-  private int initialReplication; // initial replication factor of SequenceFile.writer
+  // Minimum tolerable replicas, if the actual value is lower than it,
+  // rollWriter will be triggered
+  private int minTolerableReplication;
   private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
   final static Object [] NO_ARGS = new Object []{};
@@ -186,6 +192,17 @@
   //number of transactions in the current Hlog.
   private final AtomicInteger numEntries = new AtomicInteger(0);
+  // If live datanode count is lower than the default replicas value,
+  // RollWriter will be triggered in each sync(So the RollWriter will be
+  // triggered one by one in a short time). Using it as a workaround to slow
+  // down the roll frequency triggered by checkLowReplication().
+  private volatile int consecutiveLogRolls = 0;
+  private final int lowReplicationRollLimit;
+
+  // If consecutiveLogRolls is larger than lowReplicationRollLimit,
+  // then disable the rolling in checkLowReplication().
+  // Enable it if the replications recover.
+  private volatile boolean lowReplicationRollEnabled = true;
 
   // If > than this size, roll the log. This is typically 0.95 times the size
   // of the default Hdfs block size.
@@ -353,6 +370,11 @@
       }
     }
     this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
+    this.minTolerableReplication = conf.getInt(
+        "hbase.regionserver.hlog.tolerable.lowreplication",
+        this.fs.getDefaultReplication());
+    this.lowReplicationRollLimit = conf.getInt(
+        "hbase.regionserver.hlog.lowreplication.rolllimit", 5);
     this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
     LOG.info("HLog configuration: blocksize=" +
       StringUtils.byteDesc(this.blocksize) +
@@ -481,6 +503,7 @@
     }
     byte [][] regionsToFlush = null;
     this.cacheFlushLock.lock();
+    this.logRollRunning = true;
     try {
       if (closed) {
         return regionsToFlush;
@@ -491,7 +514,6 @@
       this.filenum = System.currentTimeMillis();
       Path newPath = computeFilename();
       HLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
-      int nextInitialReplication = fs.getFileStatus(newPath).getReplication();
       // Can we get at the dfsclient outputstream? If an instance of
       // SFLW, it'll have done the necessary reflection to get at the
       // protected field name.
@@ -510,7 +532,6 @@
       // Clean up current writer.
       Path oldFile = cleanupCurrentWriter(currentFilenum);
       this.writer = nextWriter;
-      this.initialReplication = nextInitialReplication;
       this.hdfs_out = nextHdfsOut;
 
       LOG.info((oldFile != null?
@@ -520,7 +541,6 @@
           this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
         "New hlog " + FSUtils.getPath(newPath));
       this.numEntries.set(0);
-      this.logRollRequested = false;
     }
     // Can we delete any of the old log files?
     if (this.outputfiles.size() > 0) {
@@ -538,6 +558,7 @@
         }
       }
     } finally {
+      this.logRollRunning = false;
       this.cacheFlushLock.unlock();
     }
     return regionsToFlush;
@@ -977,7 +998,7 @@
     synchronized (this.updateLock) {
       syncTime += System.currentTimeMillis() - now;
       syncOps++;
-      if (!logRollRequested) {
+      if (!this.logRollRunning) {
        checkLowReplication();
        if (this.writer.getLength() > this.logrollsize) {
          requestLogRoll();
        }
      }
    }
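For reference, the rewritten checkLowReplication() in the next hunk reduces to the
following decision flow. This is a simplified standalone sketch only, not the patched
method itself: field and method names mirror the patch, while the HLog internals,
logging and the reflection-based replica lookup are omitted or stubbed out.

// Simplified sketch of the low-replication roll throttling introduced below.
public class LowReplicationRollSketch {
  private final int minTolerableReplication;  // hbase.regionserver.hlog.tolerable.lowreplication
  private final int lowReplicationRollLimit;  // hbase.regionserver.hlog.lowreplication.rolllimit
  private volatile int consecutiveLogRolls = 0;
  private volatile boolean lowReplicationRollEnabled = true;

  public LowReplicationRollSketch(int minTolerableReplication, int lowReplicationRollLimit) {
    this.minTolerableReplication = minTolerableReplication;
    this.lowReplicationRollLimit = lowReplicationRollLimit;
  }

  // Called from sync(); numEntries is the number of edits in the current log.
  void checkLowReplication(int numCurrentReplicas, int numEntries) {
    if (numCurrentReplicas != 0 && numCurrentReplicas < minTolerableReplication) {
      if (lowReplicationRollEnabled) {
        if (consecutiveLogRolls < lowReplicationRollLimit) {
          requestLogRoll();            // only a bounded number of consecutive roll requests
          consecutiveLogRolls++;
        } else {
          // Too many consecutive rolls: probably fewer live datanodes than
          // tolerable replicas, so stop asking for rolls for now.
          consecutiveLogRolls = 0;
          lowReplicationRollEnabled = false;
        }
      }
    } else if (numCurrentReplicas >= minTolerableReplication
        && !lowReplicationRollEnabled) {
      // A brand-new writer always reports the default replication, so only
      // re-enable once the current log has actually been written to.
      if (numEntries > 1) {
        lowReplicationRollEnabled = true;
      }
    }
  }

  private void requestLogRoll() {
    // In HLog this asks the log roller to roll the WAL; stubbed out here.
  }

  boolean isLowReplicationRollEnabled() {
    return lowReplicationRollEnabled;
  }
}

In short: while replication stays below the tolerable minimum, at most
lowReplicationRollLimit consecutive rolls are requested; after that the
low-replication roller disables itself, and it is re-enabled only once a log that
has actually been written to reports a healthy replica count again.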
@@ -993,18 +1014,44 @@
   }
 
   private void checkLowReplication() {
-    // if the number of replicas in HDFS has fallen below the initial
+    // if the number of replicas in HDFS has fallen below the configured
     // value, then roll logs.
     try {
       int numCurrentReplicas = getLogReplication();
-      if (numCurrentReplicas != 0 &&
-          numCurrentReplicas < this.initialReplication) {
-        LOG.warn("HDFS pipeline error detected. " +
-            "Found " + numCurrentReplicas + " replicas but expecting " +
-            this.initialReplication + " replicas. " +
-            " Requesting close of hlog.");
-        requestLogRoll();
-        logRollRequested = true;
+      if (numCurrentReplicas != 0
+          && numCurrentReplicas < this.minTolerableReplication) {
+        if (this.lowReplicationRollEnabled) {
+          if (this.consecutiveLogRolls < this.lowReplicationRollLimit) {
+            LOG.warn("HDFS pipeline error detected. " + "Found "
+                + numCurrentReplicas + " replicas but expecting no less than "
+                + this.minTolerableReplication + " replicas. "
+                + " Requesting close of hlog.");
+            requestLogRoll();
+            // If rollWriter is requested, increase consecutiveLogRolls. Once it
+            // is larger than lowReplicationRollLimit, disable the
+            // LowReplication-Roller
+            this.consecutiveLogRolls++;
+          } else {
+            LOG.warn("Too many consecutive RollWriter requests, it's a sign of "
+                + "the total number of live datanodes is lower than the tolerable replicas.");
+            this.consecutiveLogRolls = 0;
+            this.lowReplicationRollEnabled = false;
+          }
+        }
+      } else if (numCurrentReplicas >= this.minTolerableReplication) {
+
+        if (!this.lowReplicationRollEnabled) {
+          // The new writer's log replicas is always the default value.
+          // So we should not enable LowReplication-Roller. If numEntries
+          // is lower than or equals 1, we consider it as a new writer.
+          if (this.numEntries.get() <= 1) {
+            return;
+          }
+          // Once the live datanode number and the replicas return to normal,
+          // enable the LowReplication-Roller.
+          this.lowReplicationRollEnabled = true;
+          LOG.info("LowReplication-Roller was enabled.");
+        }
       }
     } catch (Exception e) {
       LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
@@ -1262,6 +1309,15 @@
     return Bytes.equals(METAFAMILY, family);
   }
 
+  /**
+   * Get LowReplication-Roller status
+   *
+   * @return lowReplicationRollEnabled
+   */
+  public boolean isLowReplicationRollEnabled() {
+    return lowReplicationRollEnabled;
+  }
+
   @SuppressWarnings("unchecked")
   public static Class<? extends HLogKey> getKeyClass(Configuration conf) {
     return (Class<? extends HLogKey>)
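The two knobs read in the HLog constructor above, hbase.regionserver.hlog.tolerable.lowreplication
(default: the filesystem's default replication) and hbase.regionserver.hlog.lowreplication.rolllimit
(default: 5), are plain Configuration settings, which is how the test below lowers them for a
two-datanode mini cluster. A minimal sketch of tuning them programmatically; the class name and the
values 2 and 3 are only illustrative, the property names and defaults come from the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class LowReplicationRollConfigExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Request a WAL roll once its live replica count drops below this value.
    conf.setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2);
    // Stop requesting rolls after this many consecutive low-replication rolls,
    // until replication recovers.
    conf.setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
    System.out.println("tolerable.lowreplication = "
        + conf.getInt("hbase.regionserver.hlog.tolerable.lowreplication", -1));
  }
}

The same properties could equally be set in hbase-site.xml before starting the region server.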
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
index 287f1fb9213..50638961b97 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
@@ -19,6 +19,8 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.lang.reflect.InvocationTargetException;
@@ -29,23 +31,21 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
-
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -55,8 +55,6 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import static org.junit.Assert.assertTrue;
-
 /**
  * Test log deletion as logs are rolled.
  */
@@ -137,6 +135,10 @@
     // the namenode might still try to choose the recently-dead datanode
     // for a pipeline, so try to a new pipeline multiple times
     TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30);
+    TEST_UTIL.getConfiguration().setInt(
+        "hbase.regionserver.hlog.tolerable.lowreplication", 2);
+    TEST_UTIL.getConfiguration().setInt(
+        "hbase.regionserver.hlog.lowreplication.rolllimit", 3);
 
     TEST_UTIL.startMiniCluster(2);
     cluster = TEST_UTIL.getHBaseCluster();
@@ -225,6 +227,30 @@
     }
   }
 
+  void batchWriteAndWait(HTable table, int start, boolean expect, int timeout)
+      throws IOException {
+    for (int i = 0; i < 10; i++) {
+      Put put = new Put(Bytes.toBytes("row"
+          + String.format("%1$04d", (start + i))));
+      put.add(HConstants.CATALOG_FAMILY, null, value);
+      table.put(put);
+    }
+    long startTime = System.currentTimeMillis();
+    long remaining = timeout;
+    while (remaining > 0) {
+      if (log.isLowReplicationRollEnabled() == expect) {
+        break;
+      } else {
+        try {
+          Thread.sleep(200);
+        } catch (InterruptedException e) {
+          // continue
+        }
+        remaining = timeout - (System.currentTimeMillis() - startTime);
+      }
+    }
+  }
+
   /**
    * Give me the HDFS pipeline for this log file
    */
@@ -267,7 +293,7 @@
     this.server = cluster.getRegionServer(0);
     this.log = server.getWAL();
-    
+
     // Create the test table and open it
     String tableName = getName();
     HTableDescriptor desc = new HTableDescriptor(tableName);
@@ -320,7 +346,23 @@
     // write some more log data (this should use a new hdfs_out)
     writeData(table, 3);
     assertTrue("The log should not roll again.", log.getFilenum() == newFilenum);
-    assertTrue("New log file should have the default replication", log
-        .getLogReplication() == fs.getDefaultReplication());
+    // kill another datanode in the pipeline, so the replicas will be lower than
+    // the configured value 2.
+    assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null);
+    Thread.sleep(10000);
+    batchWriteAndWait(table, 3, false, 10000);
+    assertTrue("LowReplication Roller should've been disabled",
+        !log.isLowReplicationRollEnabled());
+    dfsCluster
+        .startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null);
+    dfsCluster.waitActive();
+    // Force roll writer. The new log file will have the default replications,
+    // and the LowReplication Roller will be enabled.
+    log.rollWriter();
+    batchWriteAndWait(table, 13, true, 10000);
+    assertTrue("LowReplication Roller should've been enabled",
+        log.isLowReplicationRollEnabled());
+    assertTrue("New log file should have the default replication",
+        log.getLogReplication() == fs.getDefaultReplication());
   }
 }