diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index 9d1bc4bc628..80b682533ed 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
@@ -61,6 +62,8 @@ public class LogRoller extends HasThread implements Closeable {
   // Period to roll log.
   private final long rollperiod;
   private final int threadWakeFrequency;
+  // The interval to check low replication on the hlog's pipeline
+  private long checkLowReplicationInterval;
 
   private volatile boolean running = true;
 
@@ -99,6 +102,8 @@ public class LogRoller extends HasThread implements Closeable {
       getLong("hbase.regionserver.logroll.period", 3600000);
     this.threadWakeFrequency = this.server.getConfiguration().
       getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
+    this.checkLowReplicationInterval = this.server.getConfiguration().getLong(
+        "hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
   }
 
   @Override
@@ -110,10 +115,32 @@ public class LogRoller extends HasThread implements Closeable {
     super.interrupt();
   }
 
+  /**
+   * Check for low replication on the WAL pipelines periodically, see HBASE-18132.
+   */
+  void checkLowReplication(long now) {
+    try {
+      for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
+        WAL wal = entry.getKey();
+        boolean needRollAlready = entry.getValue();
+        if (wal instanceof FSHLog && !needRollAlready) {
+          FSHLog hlog = (FSHLog) wal;
+          if ((now - hlog.getLastTimeCheckLowReplication())
+              > this.checkLowReplicationInterval) {
+            hlog.checkLogRoll();
+          }
+        }
+      }
+    } catch (Throwable e) {
+      LOG.warn("Failed checking low replication", e);
+    }
+  }
+
   @Override
   public void run() {
     while (running) {
       long now = System.currentTimeMillis();
+      checkLowReplication(now);
       boolean periodic = false;
       if (!rollLog.get()) {
         periodic = (now - this.lastrolltime) > this.rollperiod;
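The polling above is driven by the new hbase.regionserver.hlog.check.lowreplication.interval property, which defaults to 30 000 ms in this patch. The snippet below is a minimal, hypothetical sketch of how the interval could be tuned programmatically before starting a cluster; only the property name and default come from the patch, the class name and the chosen value are illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class LowReplicationIntervalExample {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Illustrative only: check the WAL pipeline for lost replicas every 10 seconds
        // instead of the 30-second default introduced by this patch.
        conf.setLong("hbase.regionserver.hlog.check.lowreplication.interval", 10 * 1000L);
        System.out.println(conf.getLong(
            "hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000L));
      }
    }

Note that the check only runs when the LogRoller thread wakes up, so intervals shorter than the roller's wake frequency are effectively rounded up to it.
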
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index caf07a287d4..77ac1d1ab0a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -47,11 +47,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.HasThread;
-import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.*;
 import org.apache.hadoop.hbase.wal.FSHLogProvider;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
@@ -153,6 +149,10 @@ public class FSHLog extends AbstractFSWAL<Writer> {
   private final AtomicInteger closeErrorCount =
       new AtomicInteger();
 
+  // Last time low replication was checked on the hlog's pipeline
+  private volatile long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();
+
   /**
    * Exception handler to pass the disruptor ringbuffer. Same as native implementation only it logs
    * using our logger instead of java native logger.
@@ -629,7 +629,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
   /**
    * Schedule a log roll if needed.
    */
-  void checkLogRoll() {
+  public void checkLogRoll() {
     // Will return immediately if we are in the middle of a WAL log roll currently.
     if (!rollWriterLock.tryLock()) {
       return;
@@ -650,6 +650,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
    */
   private boolean checkLowReplication() {
     boolean logRollNeeded = false;
+    this.lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();
     // if the number of replicas in HDFS has fallen below the configured
     // value, then roll logs.
     try {
@@ -1186,4 +1187,12 @@ public class FSHLog extends AbstractFSWAL<Writer> {
     }
     return new DatanodeInfo[0];
   }
+
+  /**
+   * @return the last time low replication was checked on the hlog's pipeline
+   */
+  public long getLastTimeCheckLowReplication() {
+    return this.lastTimeCheckLowReplication;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALOpenAfterDNRollingStart.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALOpenAfterDNRollingStart.java
new file mode 100644
index 00000000000..cf2c5d7e326
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALOpenAfterDNRollingStart.java
@@ -0,0 +1,94 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestWALOpenAfterDNRollingStart {
+  final Log LOG = LogFactory.getLog(getClass());
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static long DataNodeRestartInterval;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Sleep time before restarting the next DN; we need to wait for the current DN to finish starting up
+    DataNodeRestartInterval = 15000;
+    // Interval for checking low replication. It must be smaller than DataNodeRestartInterval
+    // so the low replication case is detected and the WAL is rolled before the next restart.
+    long checkLowReplicationInterval = 10000;
+    // Don't let the HDFS client choose a new replica when a DN goes down
+    TEST_UTIL.getConfiguration().setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable",
+        false);
+    TEST_UTIL.getConfiguration().setLong("hbase.regionserver.hlog.check.lowreplication.interval",
+        checkLowReplicationInterval);
+    TEST_UTIL.startMiniDFSCluster(3);
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  /**
+   * See HBASE-18132.
+   * This is a test case for failing to open a WAL (for replication, for example) after all
+   * datanodes have been restarted (e.g. during a rolling upgrade).
+   * Before this patch, low replication was only detected while syncing the WAL. If the WAL had
+   * never had any entry written, it would never notice that every replica of the WAL was broken
+   * (because of the DN restarts), and the WAL could never be opened.
+   * @throws Exception
+   */
+  @Test(timeout = 300000)
+  public void test() throws Exception {
+    HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0);
+    FSHLog hlog = (FSHLog) server.getWAL(null);
+    Path currentFile = hlog.getCurrentFileName();
+    // Restart every DN to simulate a DN rolling upgrade
+    for (int i = 0; i < TEST_UTIL.getDFSCluster().getDataNodes().size(); i++) {
+      // This is NOT a bug: when a DN is restarted in MiniDFSCluster, it is removed from the DN
+      // list and then appended to the tail of that list, so we always restart the first one to
+      // simulate a rolling upgrade of every DN.
+      TEST_UTIL.getDFSCluster().restartDataNode(0);
+      // Sleep long enough so the log roller can detect the broken pipeline and roll the log
+      Thread.sleep(DataNodeRestartInterval);
+    }
+
+    if (!server.getFileSystem().exists(currentFile)) {
+      Path walRootDir = FSUtils.getWALRootDir(TEST_UTIL.getConfiguration());
+      final Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+      currentFile = new Path(oldLogDir, currentFile.getName());
+    }
+    // If the WAL was not rolled, this open would never succeed.
+    WAL.Reader reader = WALFactory
+        .createReader(TEST_UTIL.getTestFileSystem(), currentFile, TEST_UTIL.getConfiguration());
+  }
+}
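Taken together, the patch makes the LogRoller nudge any WAL whose pipeline has not been inspected within the configured interval, instead of waiting for the next sync to notice the lost replicas. The standalone sketch below illustrates that polling pattern only; all class and field names here are hypothetical stand-ins (they are not HBase code), and the real check additionally inspects the HDFS output stream's replica count.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class LowReplicationPollingSketch {
      // Hypothetical stand-in for FSHLog: remembers when its pipeline was last inspected.
      static final class WalHandle {
        volatile long lastTimeCheckLowReplication = System.currentTimeMillis();

        void checkLogRoll() {
          // In the real patch this re-runs the low-replication check and may request a log roll.
          lastTimeCheckLowReplication = System.currentTimeMillis();
        }
      }

      // Mirrors LogRoller.walNeedsRoll: WAL -> "a roll is already pending".
      private final Map<WalHandle, Boolean> walNeedsRoll = new ConcurrentHashMap<>();
      // Mirrors hbase.regionserver.hlog.check.lowreplication.interval (30 s default in the patch).
      private final long checkLowReplicationIntervalMs = 30_000L;

      void checkLowReplication(long now) {
        for (Map.Entry<WalHandle, Boolean> entry : walNeedsRoll.entrySet()) {
          WalHandle wal = entry.getKey();
          boolean needRollAlready = entry.getValue();
          if (!needRollAlready
              && (now - wal.lastTimeCheckLowReplication) > checkLowReplicationIntervalMs) {
            wal.checkLogRoll();
          }
        }
      }

      public static void main(String[] args) {
        LowReplicationPollingSketch roller = new LowReplicationPollingSketch();
        roller.walNeedsRoll.put(new WalHandle(), Boolean.FALSE);
        // Pretend more than the interval has elapsed, which forces a re-check.
        roller.checkLowReplication(System.currentTimeMillis() + 60_000L);
      }
    }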