HDFS-12496. Make QuorumJournalManager timeout properties configurable. Contributed by Ajay Kumar.
This commit is contained in:
parent
cf2615961a
commit
18e5f2068b
@ -667,6 +667,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||
public static final String DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY = "dfs.image.transfer.chunksize";
|
||||
public static final int DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT = 64 * 1024;
|
||||
|
||||
public static final String DFS_QJM_OPERATIONS_TIMEOUT =
|
||||
"dfs.qjm.operations.timeout";
|
||||
public static final long DFS_QJM_OPERATIONS_TIMEOUT_DEFAULT = 60000;
|
||||
|
||||
// Datanode File IO Stats
|
||||
public static final String DFS_DATANODE_ENABLE_FILEIO_FAULT_INJECTION_KEY =
|
||||
"dfs.datanode.enable.fileio.fault.injection";
|
||||
|
@ -28,6 +28,7 @@
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.PriorityQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
@ -78,18 +79,10 @@ public class QuorumJournalManager implements JournalManager {
|
||||
private final int newEpochTimeoutMs;
|
||||
private final int writeTxnsTimeoutMs;
|
||||
|
||||
// Since these don't occur during normal operation, we can
|
||||
// use rather lengthy timeouts, and don't need to make them
|
||||
// configurable.
|
||||
private static final int FORMAT_TIMEOUT_MS = 60000;
|
||||
private static final int HASDATA_TIMEOUT_MS = 60000;
|
||||
private static final int CAN_ROLL_BACK_TIMEOUT_MS = 60000;
|
||||
private static final int FINALIZE_TIMEOUT_MS = 60000;
|
||||
private static final int PRE_UPGRADE_TIMEOUT_MS = 60000;
|
||||
private static final int ROLL_BACK_TIMEOUT_MS = 60000;
|
||||
private static final int UPGRADE_TIMEOUT_MS = 60000;
|
||||
private static final int GET_JOURNAL_CTIME_TIMEOUT_MS = 60000;
|
||||
private static final int DISCARD_SEGMENTS_TIMEOUT_MS = 60000;
|
||||
// This timeout is used for calls that don't occur during normal operation,
|
||||
// e.g. format, upgrade operations and a few others, so a rather lengthy
|
||||
// timeout is used by default (see dfs.qjm.operations.timeout).
|
||||
private final int timeoutMs;
|
||||
|
||||
private final Configuration conf;
|
||||
private final URI uri;
|
||||
@ -143,6 +136,10 @@ public QuorumJournalManager(Configuration conf,
|
||||
this.writeTxnsTimeoutMs = conf.getInt(
|
||||
DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_KEY,
|
||||
DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_DEFAULT);
|
||||
this.timeoutMs = (int) conf.getTimeDuration(DFSConfigKeys
|
||||
.DFS_QJM_OPERATIONS_TIMEOUT,
|
||||
DFSConfigKeys.DFS_QJM_OPERATIONS_TIMEOUT_DEFAULT, TimeUnit
|
||||
.MILLISECONDS);
|
||||
}
|
||||
|
||||
protected List<AsyncLogger> createLoggers(
|
||||
@ -203,7 +200,7 @@ Map<AsyncLogger, NewEpochResponseProto> createNewUniqueEpoch()
|
||||
public void format(NamespaceInfo nsInfo) throws IOException {
|
||||
QuorumCall<AsyncLogger,Void> call = loggers.format(nsInfo);
|
||||
try {
|
||||
call.waitFor(loggers.size(), loggers.size(), 0, FORMAT_TIMEOUT_MS,
|
||||
call.waitFor(loggers.size(), loggers.size(), 0, timeoutMs,
|
||||
"format");
|
||||
} catch (InterruptedException e) {
|
||||
throw new IOException("Interrupted waiting for format() response");
|
||||
@ -222,7 +219,7 @@ public boolean hasSomeData() throws IOException {
|
||||
loggers.isFormatted();
|
||||
|
||||
try {
|
||||
call.waitFor(loggers.size(), 0, 0, HASDATA_TIMEOUT_MS, "hasSomeData");
|
||||
call.waitFor(loggers.size(), 0, 0, timeoutMs, "hasSomeData");
|
||||
} catch (InterruptedException e) {
|
||||
throw new IOException("Interrupted while determining if JNs have data");
|
||||
} catch (TimeoutException e) {
|
||||
@ -513,7 +510,7 @@ AsyncLoggerSet getLoggerSetForTests() {
|
||||
public void doPreUpgrade() throws IOException {
|
||||
QuorumCall<AsyncLogger, Void> call = loggers.doPreUpgrade();
|
||||
try {
|
||||
call.waitFor(loggers.size(), loggers.size(), 0, PRE_UPGRADE_TIMEOUT_MS,
|
||||
call.waitFor(loggers.size(), loggers.size(), 0, timeoutMs,
|
||||
"doPreUpgrade");
|
||||
|
||||
if (call.countExceptions() > 0) {
|
||||
@ -530,7 +527,7 @@ public void doPreUpgrade() throws IOException {
|
||||
public void doUpgrade(Storage storage) throws IOException {
|
||||
QuorumCall<AsyncLogger, Void> call = loggers.doUpgrade(storage);
|
||||
try {
|
||||
call.waitFor(loggers.size(), loggers.size(), 0, UPGRADE_TIMEOUT_MS,
|
||||
call.waitFor(loggers.size(), loggers.size(), 0, timeoutMs,
|
||||
"doUpgrade");
|
||||
|
||||
if (call.countExceptions() > 0) {
|
||||
@ -547,7 +544,7 @@ public void doUpgrade(Storage storage) throws IOException {
|
||||
public void doFinalize() throws IOException {
|
||||
QuorumCall<AsyncLogger, Void> call = loggers.doFinalize();
|
||||
try {
|
||||
call.waitFor(loggers.size(), loggers.size(), 0, FINALIZE_TIMEOUT_MS,
|
||||
call.waitFor(loggers.size(), loggers.size(), 0, timeoutMs,
|
||||
"doFinalize");
|
||||
|
||||
if (call.countExceptions() > 0) {
|
||||
@ -566,7 +563,7 @@ public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
|
||||
QuorumCall<AsyncLogger, Boolean> call = loggers.canRollBack(storage,
|
||||
prevStorage, targetLayoutVersion);
|
||||
try {
|
||||
call.waitFor(loggers.size(), loggers.size(), 0, CAN_ROLL_BACK_TIMEOUT_MS,
|
||||
call.waitFor(loggers.size(), loggers.size(), 0, timeoutMs,
|
||||
"lockSharedStorage");
|
||||
|
||||
if (call.countExceptions() > 0) {
|
||||
@ -599,7 +596,7 @@ public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
|
||||
public void doRollback() throws IOException {
|
||||
QuorumCall<AsyncLogger, Void> call = loggers.doRollback();
|
||||
try {
|
||||
call.waitFor(loggers.size(), loggers.size(), 0, ROLL_BACK_TIMEOUT_MS,
|
||||
call.waitFor(loggers.size(), loggers.size(), 0, timeoutMs,
|
||||
"doRollback");
|
||||
|
||||
if (call.countExceptions() > 0) {
|
||||
@ -617,7 +614,7 @@ public long getJournalCTime() throws IOException {
|
||||
QuorumCall<AsyncLogger, Long> call = loggers.getJournalCTime();
|
||||
try {
|
||||
call.waitFor(loggers.size(), loggers.size(), 0,
|
||||
GET_JOURNAL_CTIME_TIMEOUT_MS, "getJournalCTime");
|
||||
timeoutMs, "getJournalCTime");
|
||||
|
||||
if (call.countExceptions() > 0) {
|
||||
call.rethrowException("Could not journal CTime for one "
|
||||
@ -650,7 +647,7 @@ public void discardSegments(long startTxId) throws IOException {
|
||||
QuorumCall<AsyncLogger, Void> call = loggers.discardSegments(startTxId);
|
||||
try {
|
||||
call.waitFor(loggers.size(), loggers.size(), 0,
|
||||
DISCARD_SEGMENTS_TIMEOUT_MS, "discardSegments");
|
||||
timeoutMs, "discardSegments");
|
||||
if (call.countExceptions() > 0) {
|
||||
call.rethrowException(
|
||||
"Could not perform discardSegments of one or more JournalNodes");
|
||||
|
@ -4332,4 +4332,15 @@
|
||||
Enables DFSNetworkTopology to choose nodes for placing replicas.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.qjm.operations.timeout</name>
|
||||
<value>60s</value>
|
||||
<description>
|
||||
Common timeout for QuorumJournalManager operations that do not occur during
|
||||
normal operation (e.g. format, upgrade, rollback, finalize). This setting supports multiple time unit suffixes
|
||||
as described in dfs.heartbeat.interval.
|
||||
If no suffix is specified then milliseconds is assumed.
|
||||
</description>
|
||||
</property>
|
||||
</configuration>
|
||||
|
Loading…
x
Reference in New Issue
Block a user