HDFS-4176. EditLogTailer should call rollEdits with a timeout. (Lei Xu)

Lei Xu 2016-07-28 13:32:00 -07:00
parent 328c855a57
commit 67406460f0
4 changed files with 116 additions and 16 deletions

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -731,6 +731,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_HA_TAILEDITS_INPROGRESS_KEY =
       "dfs.ha.tail-edits.in-progress";
   public static final boolean DFS_HA_TAILEDITS_INPROGRESS_DEFAULT = false;
+  public static final String DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_KEY =
+      "dfs.ha.tail-edits.rolledits.timeout";
+  public static final int DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_DEFAULT = 60; // 1m
   public static final String DFS_HA_LOGROLL_RPC_TIMEOUT_KEY = "dfs.ha.log-roll.rpc.timeout";
   public static final int DFS_HA_LOGROLL_RPC_TIMEOUT_DEFAULT = 20000; // 20s
   public static final String DFS_HA_FENCE_METHODS_KEY = "dfs.ha.fencing.methods";
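
Note (not part of the commit): the new key is read in seconds, with a 60 second default, and EditLogTailer converts it to milliseconds, as the next file shows. A minimal sketch of overriding and reading it through the Configuration API; the class name and the 30 second value here are illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class RollEditsTimeoutConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Override the 60-second default; the value is given in seconds.
    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_KEY, 30);
    // EditLogTailer multiplies the configured value by 1000 to get milliseconds.
    long timeoutMs = conf.getInt(
        DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_KEY,
        DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_DEFAULT) * 1000L;
    System.out.println("rollEdits timeout: " + timeoutMs + " ms"); // prints 30000
  }
}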

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java

@@ -27,16 +27,21 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -101,6 +106,17 @@ public class EditLogTailer {
    */
   private final long logRollPeriodMs;
 
+  /**
+   * The timeout in milliseconds of calling rollEdits RPC to Active NN.
+   * @see HDFS-4176.
+   */
+  private final long rollEditsTimeoutMs;
+
+  /**
+   * The executor to run roll edit RPC call in a daemon thread.
+   */
+  private final ExecutorService rollEditsRpcExecutor;
+
   /**
    * How often the Standby should check if there are new finalized segment(s)
    * available to be read from.
@@ -159,6 +175,13 @@
     sleepTimeMs = conf.getInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY,
         DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_DEFAULT) * 1000;
 
+    rollEditsTimeoutMs = conf.getInt(
+        DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_KEY,
+        DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_DEFAULT) * 1000;
+
+    rollEditsRpcExecutor = Executors.newSingleThreadExecutor(
+        new ThreadFactoryBuilder().setDaemon(true).build());
+
     maxRetries = conf.getInt(DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY,
         DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_DEFAULT);
     if (maxRetries <= 0) {
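
Side note on the executor created above (an observation, not text from the patch): the roll RPC runs on a single daemon thread, so a call that never returns cannot keep the JVM alive, and only one roll can be in flight at a time. A small standalone demonstration of the daemon behavior, assuming Guava's ThreadFactoryBuilder is on the classpath as it is in HDFS; the class and thread names are illustrative.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class DaemonExecutorDemo {
  public static void main(String[] args) {
    ExecutorService pool = Executors.newSingleThreadExecutor(
        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("roll-demo-%d").build());
    Runnable hungRpc = () -> {
      try {
        Thread.sleep(Long.MAX_VALUE);  // stand-in for an RPC that never returns
      } catch (InterruptedException ignored) {
        // would be hit if the task were cancelled with interruption
      }
    };
    pool.submit(hungRpc);
    // main() returns here and the JVM exits even though the task is still "running",
    // because the only worker is a daemon thread.
  }
}
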
@@ -186,6 +209,7 @@
   }
 
   public void stop() throws IOException {
+    rollEditsRpcExecutor.shutdown();
     tailerThread.setShouldRun(false);
     tailerThread.interrupt();
     try {
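
For context on the one-line stop() change (again an observation, not from the patch): ExecutorService.shutdown() stops new submissions but does not interrupt a roll that is already executing; it is the daemon thread factory above that keeps a permanently hung call from blocking process exit. A tiny sketch of the two shutdown flavors:

import java.util.concurrent.ExecutorService;

class ShutdownSketch {
  // Orderly: accept no new tasks; an in-flight roll may keep running on its daemon thread.
  static void closeGracefully(ExecutorService pool) {
    pool.shutdown();
  }

  // Forceful: additionally interrupt the in-flight task.
  static void closeNow(ExecutorService pool) {
    pool.shutdownNow();
  }
}
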
@@ -300,30 +324,50 @@
   }
 
   /**
-   * Trigger the active node to roll its logs.
+   * NameNodeProxy factory method.
+   * @return a Callable to roll logs on remote NameNode.
    */
-  private void triggerActiveLogRoll() {
-    LOG.info("Triggering log roll on remote NameNode");
-    try {
-      new MultipleNameNodeProxy<Void>() {
+  @VisibleForTesting
+  Callable<Void> getNameNodeProxy() {
+    return new MultipleNameNodeProxy<Void>() {
       @Override
       protected Void doWork() throws IOException {
         cachedActiveProxy.rollEditLog();
         return null;
       }
-      }.call();
+    };
+  }
+
+  /**
+   * Trigger the active node to roll its logs.
+   */
+  @VisibleForTesting
+  void triggerActiveLogRoll() {
+    LOG.info("Triggering log roll on remote NameNode");
+    Future<Void> future = null;
+    try {
+      future = rollEditsRpcExecutor.submit(getNameNodeProxy());
+      future.get(rollEditsTimeoutMs, TimeUnit.MILLISECONDS);
       lastRollTriggerTxId = lastLoadedTxnId;
-    } catch (IOException ioe) {
-      if (ioe instanceof RemoteException) {
-        ioe = ((RemoteException)ioe).unwrapRemoteException();
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof RemoteException) {
+        IOException ioe = ((RemoteException) cause).unwrapRemoteException();
         if (ioe instanceof StandbyException) {
           LOG.info("Skipping log roll. Remote node is not in Active state: " +
               ioe.getMessage().split("\n")[0]);
           return;
         }
       }
-      LOG.warn("Unable to trigger a roll of the active NN", ioe);
+      LOG.warn("Unable to trigger a roll of the active NN", e);
+    } catch (TimeoutException e) {
+      if (future != null) {
+        future.cancel(true);
+      }
+      LOG.warn(String.format(
+          "Unable to finish rolling edits in %d ms", rollEditsTimeoutMs));
+    } catch (InterruptedException e) {
+      LOG.warn("Unable to trigger a roll of the active NN", e);
     }
   }
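
The heart of the change is the pattern above: submit the rollEditLog call to the executor, wait on the Future with a bounded get, and cancel (interrupt) the task if the wait times out. A self-contained sketch of that pattern outside any NameNode code; the sleeping Callable is a stand-in for a hung RPC, not code from the patch.

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedCallSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Callable<Void> slowCall = () -> {
      Thread.sleep(10_000);   // stand-in for a rollEditLog RPC that hangs
      return null;
    };
    Future<Void> future = executor.submit(slowCall);
    try {
      future.get(1, TimeUnit.SECONDS);   // bounded wait, like rollEditsTimeoutMs
    } catch (TimeoutException e) {
      future.cancel(true);               // interrupts the worker thread
      System.out.println("roll timed out; cancelled");
    } finally {
      executor.shutdownNow();
    }
  }
}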

hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -1495,6 +1495,13 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.ha.tail-edits.rolledits.timeout</name>
+  <value>60</value>
+  <description>The timeout in seconds of calling rollEdits RPC on Active NN.
+  </description>
+</property>
+
 <property>
   <name>dfs.ha.automatic-failover.enabled</name>
   <value>false</value>

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java

@@ -17,15 +17,19 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.ha;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.concurrent.Callable;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -50,6 +54,7 @@ import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.base.Supplier;
+import org.mockito.Mockito;
 
 @RunWith(Parameterized.class)
 public class TestEditLogTailer {
@@ -249,4 +254,45 @@ public class TestEditLogTailer {
         }
       }, 100, 10000);
   }
+
+  @Test(timeout=20000)
+  public void testRollEditTimeoutForActiveNN() throws IOException {
+    Configuration conf = getConf();
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_KEY, 5); // 5s
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY, 100);
+    HAUtil.setAllowStandbyReads(conf, true);
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology())
+        .numDataNodes(0)
+        .build();
+    cluster.waitActive();
+    cluster.transitionToActive(0);
+
+    try {
+      EditLogTailer tailer = Mockito.spy(
+          cluster.getNamesystem(1).getEditLogTailer());
+
+      AtomicInteger flag = new AtomicInteger(0);
+      // Return a slow roll edit process.
+      when(tailer.getNameNodeProxy()).thenReturn(
+          new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+              Thread.sleep(30000);  // sleep for 30 seconds.
+              assertTrue(Thread.currentThread().isInterrupted());
+              flag.addAndGet(1);
+              return null;
+            }
+          }
+      );
+      tailer.triggerActiveLogRoll();
+      assertEquals(0, flag.get());
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
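
One note on the Mockito stubbing in the new test (an observation, not part of the commit): on a spy, when(spy.method()) invokes the real method while the stub is being installed. That is harmless here because getNameNodeProxy() only builds a Callable, but the doReturn form avoids the real call entirely. A sketch of the alternative, using the same tailer spy and slow Callable as the test:

// Inside the same test method; 'tailer' is the spy created above.
Mockito.doReturn(new Callable<Void>() {
  @Override
  public Void call() throws Exception {
    Thread.sleep(30000);
    return null;
  }
}).when(tailer).getNameNodeProxy();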