HDFS-14655. [SBN Read] Namenode crashes if one of The JN is down. Contributed by Ayush Saxena.
parent f16cf877e5
commit eb96a3093e
DFSConfigKeys.java

@@ -1171,6 +1171,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_KEY = "dfs.qjournal.write-txns.timeout.ms";
   public static final String DFS_QJOURNAL_HTTP_OPEN_TIMEOUT_KEY = "dfs.qjournal.http.open.timeout.ms";
   public static final String DFS_QJOURNAL_HTTP_READ_TIMEOUT_KEY = "dfs.qjournal.http.read.timeout.ms";
+  public static final String DFS_QJOURNAL_PARALLEL_READ_NUM_THREADS_KEY =
+      "dfs.qjournal.parallel-read.num-threads";
   public static final int DFS_QJOURNAL_START_SEGMENT_TIMEOUT_DEFAULT = 20000;
   public static final int DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_DEFAULT = 120000;
   public static final int DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_DEFAULT = 120000;

@@ -1181,6 +1183,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_DEFAULT = 20000;
   public static final int DFS_QJOURNAL_HTTP_OPEN_TIMEOUT_DEFAULT = URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT;
   public static final int DFS_QJOURNAL_HTTP_READ_TIMEOUT_DEFAULT = URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT;
+  public static final int DFS_QJOURNAL_PARALLEL_READ_NUM_THREADS_DEFAULT = 5;

   public static final String DFS_MAX_NUM_BLOCKS_TO_LOG_KEY = "dfs.namenode.max-num-blocks-to-log";
   public static final long DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT = 1000l;
IPCLoggerChannel.java

@@ -27,6 +27,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;

 import org.apache.hadoop.classification.InterfaceAudience;

@@ -54,6 +55,7 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.util.StopWatch;
+import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;

 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;

@@ -270,12 +272,14 @@ public class IPCLoggerChannel implements AsyncLogger {
    */
   @VisibleForTesting
   protected ExecutorService createParallelExecutor() {
-    return Executors.newCachedThreadPool(
-        new ThreadFactoryBuilder()
-            .setDaemon(true)
+    int numThreads =
+        conf.getInt(DFSConfigKeys.DFS_QJOURNAL_PARALLEL_READ_NUM_THREADS_KEY,
+            DFSConfigKeys.DFS_QJOURNAL_PARALLEL_READ_NUM_THREADS_DEFAULT);
+    return new HadoopThreadPoolExecutor(1, numThreads, 60L,
+        TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
+        new ThreadFactoryBuilder().setDaemon(true)
             .setNameFormat("Logger channel (from parallel executor) to " + addr)
-            .setUncaughtExceptionHandler(
-                UncaughtExceptionHandlers.systemExit())
+            .setUncaughtExceptionHandler(UncaughtExceptionHandlers.systemExit())
             .build());
   }
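This hunk is the core of the fix. Executors.newCachedThreadPool() starts a new thread for every task that cannot be handed to an idle thread, so repeated edit-tailing calls against a JournalNode that never answers accumulate threads without bound until the NameNode crashes. The bounded pool caps that at dfs.qjournal.parallel-read.num-threads per logger channel. A self-contained sketch of the difference (not from the patch; the class name and tasks are invented, and plain ThreadPoolExecutor stands in for Hadoop's HadoopThreadPoolExecutor subclass):

import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedPoolSketch {
  public static void main(String[] args) throws Exception {
    // A task that blocks, like an RPC to a JournalNode that is down.
    Runnable blockedCall = () -> {
      try {
        Thread.sleep(5_000);
      } catch (InterruptedException ignored) {
        // cancelled
      }
    };

    // Old behavior: one new thread per concurrently blocked task, unbounded.
    // (The cast is safe: newCachedThreadPool returns a ThreadPoolExecutor.)
    ThreadPoolExecutor cached =
        (ThreadPoolExecutor) Executors.newCachedThreadPool();
    // New behavior: hard cap of 5 threads; surplus tasks wait in the queue.
    // With an unbounded LinkedBlockingQueue, threads beyond the core are
    // only created when the queue fills, so the pool stays at its core size
    // here - the point is that it can never exceed the maximum.
    ThreadPoolExecutor bounded = new ThreadPoolExecutor(1, 5, 60L,
        TimeUnit.SECONDS, new LinkedBlockingQueue<>());

    for (int i = 0; i < 100; i++) {
      cached.execute(blockedCall);
      bounded.execute(blockedCall);
    }
    Thread.sleep(500);
    System.out.println("cached pool threads:  " + cached.getPoolSize());  // 100
    System.out.println("bounded pool threads: " + bounded.getPoolSize()); // 1
    cached.shutdownNow();
    bounded.shutdownNow();
  }
}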
QuorumCall.java

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.qjournal.client;

+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.TimeoutException;

@@ -64,6 +66,7 @@ class QuorumCall<KEY, RESULT> {
   private static final float WAIT_PROGRESS_WARN_THRESHOLD = 0.7f;
   private final StopWatch quorumStopWatch;
   private final Timer timer;
+  private final List<ListenableFuture<RESULT>> allCalls;

   static <KEY, RESULT> QuorumCall<KEY, RESULT> create(
       Map<KEY, ? extends ListenableFuture<RESULT>> calls, Timer timer) {

@@ -71,6 +74,7 @@ class QuorumCall<KEY, RESULT> {
     for (final Entry<KEY, ? extends ListenableFuture<RESULT>> e : calls.entrySet()) {
       Preconditions.checkArgument(e.getValue() != null,
           "null future for key: " + e.getKey());
+      qr.addCall(e.getValue());
       Futures.addCallback(e.getValue(), new FutureCallback<RESULT>() {
         @Override
         public void onFailure(Throwable t) {

@@ -102,6 +106,11 @@ class QuorumCall<KEY, RESULT> {
     // Only instantiated from factory method above
     this.timer = timer;
     this.quorumStopWatch = new StopWatch(timer);
+    this.allCalls = new ArrayList<>();
+  }
+
+  private void addCall(ListenableFuture<RESULT> call) {
+    allCalls.add(call);
   }

   /**

@@ -211,6 +220,15 @@ class QuorumCall<KEY, RESULT> {
     }
   }

+  /**
+   * Cancel any outstanding calls.
+   */
+  void cancelCalls() {
+    for (ListenableFuture<RESULT> call : allCalls) {
+      call.cancel(true);
+    }
+  }
+
   /**
    * Check if any of the responses came back with an AssertionError.
    * If so, it re-throws it, even if there was a quorum of responses.
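QuorumCall now remembers every future it was built from so a caller can abort the stragglers in one shot. The same bookkeeping in miniature, using plain java.util.concurrent futures instead of Guava's ListenableFuture (class and names invented for illustration):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CancelCallsSketch {
  // Every submitted call is remembered so the batch can be cancelled later.
  private final List<Future<?>> allCalls = new ArrayList<>();

  void addCall(Future<?> call) {
    allCalls.add(call);
  }

  /** Cancel any outstanding calls, interrupting those already running. */
  void cancelCalls() {
    for (Future<?> call : allCalls) {
      call.cancel(true);
    }
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(3);
    CancelCallsSketch q = new CancelCallsSketch();
    for (int i = 0; i < 3; i++) {
      q.addCall(pool.submit(() -> {
        Thread.sleep(60_000); // stand-in for an RPC to a dead JournalNode
        return null;
      }));
    }
    q.cancelCalls(); // all three futures end immediately
    pool.shutdown();
  }
}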
QuorumJournalManager.java

@@ -579,6 +579,8 @@ public class QuorumJournalManager implements JournalManager {
       LOG.debug(msg.toString());
       }
     }
+    // Cancel any outstanding calls to JN's.
+    q.cancelCalls();

     int maxAllowedTxns = !onlyDurableTxns ? highestTxnCount :
         responseCounts.get(responseCounts.size() - loggers.getMajoritySize());
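Once a quorum of responses has decided the outcome, a call still pending against the dead JournalNode can only end in a timeout, so it is cancelled instead of being left to occupy an executor thread. A runnable sketch of that wait-for-majority-then-cancel pattern (hypothetical names, plain java.util.concurrent):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class QuorumThenCancel {
  public static void main(String[] args) throws Exception {
    final int nodes = 3;
    final int majority = 2;
    CountDownLatch quorum = new CountDownLatch(majority);
    ExecutorService pool = Executors.newFixedThreadPool(nodes);
    List<Future<?>> calls = new ArrayList<>();

    for (int i = 0; i < nodes; i++) {
      final boolean dead = (i == 0); // one "JournalNode" never answers
      calls.add(pool.submit(() -> {
        if (dead) {
          Thread.sleep(60_000); // stand-in for an RPC that only times out
        }
        quorum.countDown();
        return null;
      }));
    }

    quorum.await(); // returns as soon as 2 of 3 respond
    for (Future<?> call : calls) {
      call.cancel(true); // interrupts the straggler
    }
    pool.shutdown();
    System.out.println("quorum reached, stragglers cancelled");
  }
}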
hdfs-default.xml

@@ -4999,6 +4999,14 @@
   </description>
 </property>

+<property>
+  <name>dfs.qjournal.parallel-read.num-threads</name>
+  <value>5</value>
+  <description>
+    Number of threads per JN to be used for tailing edits.
+  </description>
+</property>
+
 <property>
   <name>dfs.quota.by.storage.type.enabled</name>
   <value>true</value>
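Operators who need a different cap per JournalNode can override the default in hdfs-site.xml; for example (the value 10 is illustrative, not a recommendation):

<property>
  <name>dfs.qjournal.parallel-read.num-threads</name>
  <value>10</value>
</property>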
MiniJournalCluster.java

@@ -197,6 +197,10 @@ public class MiniJournalCluster {
     return nodes[i].node;
   }

+  public String getJournalNodeIpcAddress(int i) {
+    return nodes[i].ipcAddr.toString();
+  }
+
   public void restartJournalNode(int i) throws InterruptedException, IOException {
     JNInfo info = nodes[i];
     JournalNode jn = info.node;
TestQuorumJournalManager.java

@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
 import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
 import org.apache.hadoop.hdfs.qjournal.server.JournalFaultInjector;
+import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
 import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
 import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;

@@ -62,7 +63,9 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestName;
 import org.mockito.Mockito;
 import org.mockito.stubbing.Stubber;

@@ -87,11 +90,17 @@ public class TestQuorumJournalManager {
     GenericTestUtils.setLogLevel(ProtobufRpcEngine.LOG, Level.ALL);
   }

+  @Rule
+  public TestName name = new TestName();
+
   @Before
   public void setup() throws Exception {
     conf = new Configuration();
-    // Don't retry connections - it just slows down the tests.
-    conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+    if (!name.getMethodName().equals("testSelectThreadCounts")) {
+      // Don't retry connections - it just slows down the tests.
+      conf.setInt(
+          CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+    }
     // Turn off IPC client caching to handle daemon restarts.
     conf.setInt(
         CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 0);

@@ -1039,6 +1048,27 @@
     }
   }

+  @Test
+  public void testSelectThreadCounts() throws Exception {
+    EditLogOutputStream stm =
+        qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    writeTxns(stm, 1, 10);
+    JournalNode jn0 = cluster.getJournalNode(0);
+    String ipcAddr = cluster.getJournalNodeIpcAddress(0);
+    jn0.stopAndJoin(0);
+    for (int i = 0; i < 1000; i++) {
+      qjm.selectInputStreams(new ArrayList<>(), 1, true, false);
+    }
+    String expectedName =
+        "Logger channel (from parallel executor) to " + ipcAddr;
+    long num = Thread.getAllStackTraces().keySet().stream()
+        .filter((t) -> t.getName().contains(expectedName)).count();
+    // The number of threads for the stopped jn shouldn't be more than the
+    // configured value.
+    assertTrue("Number of threads are : " + num,
+        num <= DFSConfigKeys.DFS_QJOURNAL_PARALLEL_READ_NUM_THREADS_DEFAULT);
+  }
+
   @Test
   public void testSelectViaRpcTwoJNsError() throws Exception {
     EditLogOutputStream stm =
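Note that setup() keeps connection retries enabled for testSelectThreadCounts (via the TestName rule), presumably so calls to the stopped JournalNode linger long enough for threads to accumulate under the old unbounded pool. To run only this test, a standard Maven/Surefire invocation should work (module path assumed from the usual Hadoop source layout):

mvn test -Dtest=TestQuorumJournalManager#testSelectThreadCounts -pl hadoop-hdfs-project/hadoop-hdfs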