HBASE-18132 Low replication should be checked in period in case of datanode rolling upgrade (Allan Yang)

This commit is contained in:
Andrew Purtell 2017-06-06 17:15:33 -07:00
parent 9c1efc9f9d
commit e0dbafd7cc
3 changed files with 126 additions and 1 deletions

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
@ -65,6 +66,8 @@ public class LogRoller extends HasThread {
// Period to roll log.
private final long rollperiod;
private final int threadWakeFrequency;
// The interval to check low replication on hlog's pipeline
private long checkLowReplicationInterval;
public void addWAL(final WAL wal) {
if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) {
@ -101,6 +104,8 @@ public class LogRoller extends HasThread {
getLong("hbase.regionserver.logroll.period", 3600000);
this.threadWakeFrequency = this.server.getConfiguration().
getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
this.checkLowReplicationInterval = this.server.getConfiguration().getLong(
"hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
}
@Override
@ -112,10 +117,32 @@ public class LogRoller extends HasThread {
super.interrupt();
}
/**
* we need to check low replication in period, see HBASE-18132
*/
void checkLowReplication(long now) {
try {
for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
WAL wal = entry.getKey();
boolean neeRollAlready = entry.getValue();
if(wal instanceof FSHLog && !neeRollAlready) {
FSHLog hlog = (FSHLog)wal;
if ((now - hlog.getLastTimeCheckLowReplication())
> this.checkLowReplicationInterval) {
hlog.checkLogRoll();
}
}
}
} catch (Throwable e) {
LOG.warn("Failed checking low replication", e);
}
}
@Override
public void run() {
while (!server.isStopped()) {
long now = System.currentTimeMillis();
checkLowReplication(now);
boolean periodic = false;
if (!rollLog.get()) {
periodic = (now - this.lastrolltime) > this.rollperiod;

View File

@ -349,6 +349,9 @@ public class FSHLog implements WAL {
private final AtomicInteger closeErrorCount = new AtomicInteger();
// Last time to check low replication on hlog's pipeline
private volatile long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();
/**
* WAL Comparator; it compares the timestamp (log filenum), present in the log file name.
@ -1310,7 +1313,7 @@ public class FSHLog implements WAL {
/**
* Schedule a log roll if needed.
*/
void checkLogRoll() {
public void checkLogRoll() {
// Will return immediately if we are in the middle of a WAL log roll currently.
if (!rollWriterLock.tryLock()) return;
boolean lowReplication;
@ -1333,6 +1336,7 @@ public class FSHLog implements WAL {
*/
private boolean checkLowReplication() {
boolean logRollNeeded = false;
this.lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();
// if the number of replicas in HDFS has fallen below the configured
// value, then roll logs.
try {
@ -2059,4 +2063,12 @@ public class FSHLog implements WAL {
}
return new DatanodeInfo[0];
}
/**
*
* @return last time on checking low replication
*/
public long getLastTimeCheckLowReplication() {
return this.lastTimeCheckLowReplication;
}
}

View File

@ -0,0 +1,86 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.wal;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
public class TestWALOpenAfterDNRollingStart {
final Log LOG = LogFactory.getLog(getClass());
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static long DataNodeRestartInterval;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
// Sleep time before restart next dn, we need to wait the current dn to finish start up
DataNodeRestartInterval = 15000;
// interval of checking low replication. The sleep time must smaller than DataNodeRestartInterval
// so a low replication case will be detected and the wal will be rolled
long checkLowReplicationInterval = 10000;
//don't let hdfs client to choose a new replica when dn down
TEST_UTIL.getConfiguration().setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable",
false);
TEST_UTIL.getConfiguration().setLong("hbase.regionserver.hlog.check.lowreplication.interval",
checkLowReplicationInterval);
TEST_UTIL.startMiniDFSCluster(3);
TEST_UTIL.startMiniCluster(1);
}
/**
* see HBASE-18132
* This is a test case of failing open a wal(for replication for example) after all datanode
* restarted (rolling upgrade, for example).
* Before this patch, low replication detection is only used when syncing wal.
* But if the wal haven't had any entry whiten, it will never know all the replica of the wal
* is broken(because of dn restarting). And this wal can never be open
* @throws Exception
*/
@Test(timeout = 300000)
public void test() throws Exception {
HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0);
FSHLog hlog = (FSHLog)server.getWAL(null);
Path currentFile = hlog.getCurrentFileName();
//restart every dn to simulate a dn rolling upgrade
for(int i = 0; i < TEST_UTIL.getDFSCluster().getDataNodes().size(); i++) {
//This is NOT a bug, when restart dn in miniDFSCluster, it will remove the stopped dn from
//the dn list and then add to the tail of this list, we need to always restart the first one
//to simulate rolling upgrade of every dn.
TEST_UTIL.getDFSCluster().restartDataNode(0);
//sleep enough time so log roller can detect the pipeline break and roll log
Thread.sleep(DataNodeRestartInterval);
}
//if the log is not rolled, then we can never open this wal forever.
WAL.Reader reader = WALFactory
.createReader(TEST_UTIL.getTestFileSystem(), currentFile, TEST_UTIL.getConfiguration());
}
}