HDFS-15323. StandbyNode fails transition to active due to insufficient transaction tailing. Contributed by Konstantin V Shvachko.
(cherry picked from commit ebb878bab9
)
This commit is contained in:
parent
e25ac17e2b
commit
8358d7c175
|
@ -73,9 +73,9 @@ public class QuorumJournalManager implements JournalManager {
|
||||||
static final Log LOG = LogFactory.getLog(QuorumJournalManager.class);
|
static final Log LOG = LogFactory.getLog(QuorumJournalManager.class);
|
||||||
|
|
||||||
// This config is not publicly exposed
|
// This config is not publicly exposed
|
||||||
static final String QJM_RPC_MAX_TXNS_KEY =
|
public static final String QJM_RPC_MAX_TXNS_KEY =
|
||||||
"dfs.ha.tail-edits.qjm.rpc.max-txns";
|
"dfs.ha.tail-edits.qjm.rpc.max-txns";
|
||||||
static final int QJM_RPC_MAX_TXNS_DEFAULT = 5000;
|
public static final int QJM_RPC_MAX_TXNS_DEFAULT = 5000;
|
||||||
|
|
||||||
// Maximum number of transactions to fetch at a time when using the
|
// Maximum number of transactions to fetch at a time when using the
|
||||||
// RPC edit fetch mechanism
|
// RPC edit fetch mechanism
|
||||||
|
|
|
@ -299,13 +299,23 @@ public class EditLogTailer {
|
||||||
SecurityUtil.doAsLoginUser(new PrivilegedExceptionAction<Void>() {
|
SecurityUtil.doAsLoginUser(new PrivilegedExceptionAction<Void>() {
|
||||||
@Override
|
@Override
|
||||||
public Void run() throws Exception {
|
public Void run() throws Exception {
|
||||||
try {
|
long editsTailed = 0;
|
||||||
// It is already under the full name system lock and the checkpointer
|
// Fully tail the journal to the end
|
||||||
// thread is already stopped. No need to acqure any other lock.
|
do {
|
||||||
doTailEdits();
|
long startTime = Time.monotonicNow();
|
||||||
} catch (InterruptedException e) {
|
try {
|
||||||
throw new IOException(e);
|
NameNode.getNameNodeMetrics().addEditLogTailInterval(
|
||||||
}
|
startTime - lastLoadTimeMs);
|
||||||
|
// It is already under the name system lock and the checkpointer
|
||||||
|
// thread is already stopped. No need to acquire any other lock.
|
||||||
|
editsTailed = doTailEdits();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
} finally {
|
||||||
|
NameNode.getNameNodeMetrics().addEditLogTailTime(
|
||||||
|
Time.monotonicNow() - startTime);
|
||||||
|
}
|
||||||
|
} while(editsTailed > 0);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.server.namenode.NNStorage;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import static org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getFileInfo;
|
import static org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getFileInfo;
|
||||||
|
import static org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager.QJM_RPC_MAX_TXNS_KEY;
|
||||||
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -72,6 +73,8 @@ public class TestStandbyInProgressTail {
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
|
conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
|
||||||
conf.setInt(DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY,
|
conf.setInt(DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY,
|
||||||
500);
|
500);
|
||||||
|
// Set very samll limit of transactions per a journal rpc call
|
||||||
|
conf.setInt(QJM_RPC_MAX_TXNS_KEY, 3);
|
||||||
HAUtil.setAllowStandbyReads(conf, true);
|
HAUtil.setAllowStandbyReads(conf, true);
|
||||||
qjmhaCluster = new MiniQJMHACluster.Builder(conf).build();
|
qjmhaCluster = new MiniQJMHACluster.Builder(conf).build();
|
||||||
cluster = qjmhaCluster.getDfsCluster();
|
cluster = qjmhaCluster.getDfsCluster();
|
||||||
|
@ -300,6 +303,31 @@ public class TestStandbyInProgressTail {
|
||||||
waitForFileInfo(nn1, "/test", "/test2", "/test3");
|
waitForFileInfo(nn1, "/test", "/test2", "/test3");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that Standby Node tails multiple segments while catching up
|
||||||
|
* during the transition to Active.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testUndertailingWhileFailover() throws Exception {
|
||||||
|
cluster.transitionToActive(0);
|
||||||
|
cluster.waitActive(0);
|
||||||
|
|
||||||
|
String p = "/testFailoverWhileTailingWithoutCache/";
|
||||||
|
mkdirs(nn0, p + 0, p + 1, p + 2, p + 3, p + 4);
|
||||||
|
nn0.getRpcServer().rollEditLog(); // create segment 1
|
||||||
|
|
||||||
|
mkdirs(nn0, p + 5, p + 6, p + 7, p + 8, p + 9);
|
||||||
|
nn0.getRpcServer().rollEditLog(); // create segment 2
|
||||||
|
|
||||||
|
mkdirs(nn0, p + 10, p + 11, p + 12, p + 13, p + 14);
|
||||||
|
nn0.getRpcServer().rollEditLog(); // create segment 3
|
||||||
|
|
||||||
|
cluster.transitionToStandby(0);
|
||||||
|
cluster.transitionToActive(1);
|
||||||
|
cluster.waitActive(1);
|
||||||
|
waitForFileInfo(nn1, p + 0, p + 1, p + 14);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNonUniformConfig() throws Exception {
|
public void testNonUniformConfig() throws Exception {
|
||||||
// Test case where some NNs (in this case the active NN) in the cluster
|
// Test case where some NNs (in this case the active NN) in the cluster
|
||||||
|
|
Loading…
Reference in New Issue