HDFS-12496. Make QuorumJournalManager timeout properties configurable. Contributed by Ajay Kumar.

(cherry picked from commit b9e423fa8d30ea89244f6ec018a8064cc87d94a9)
This commit is contained in:
Arpit Agarwal 2017-09-21 08:44:43 -07:00 committed by Andrew Wang
parent f8df655f31
commit 14a05ee4c1
3 changed files with 33 additions and 21 deletions

View File

@ -727,6 +727,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.edit.log.transfer.bandwidthPerSec"; "dfs.edit.log.transfer.bandwidthPerSec";
public static final long DFS_EDIT_LOG_TRANSFER_RATE_DEFAULT = 0; //no throttling public static final long DFS_EDIT_LOG_TRANSFER_RATE_DEFAULT = 0; //no throttling
public static final String DFS_QJM_OPERATIONS_TIMEOUT =
"dfs.qjm.operations.timeout";
public static final long DFS_QJM_OPERATIONS_TIMEOUT_DEFAULT = 60000;
// Datanode File IO Stats // Datanode File IO Stats
public static final String DFS_DATANODE_ENABLE_FILEIO_FAULT_INJECTION_KEY = public static final String DFS_DATANODE_ENABLE_FILEIO_FAULT_INJECTION_KEY =
"dfs.datanode.enable.fileio.fault.injection"; "dfs.datanode.enable.fileio.fault.injection";

View File

@ -27,6 +27,7 @@
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.PriorityQueue; import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -76,18 +77,10 @@ public class QuorumJournalManager implements JournalManager {
private final int newEpochTimeoutMs; private final int newEpochTimeoutMs;
private final int writeTxnsTimeoutMs; private final int writeTxnsTimeoutMs;
// Since these don't occur during normal operation, we can // This timeout is used for calls that don't occur during normal operation
// use rather lengthy timeouts, and don't need to make them // e.g. format, upgrade operations and a few others. So we can use rather
// configurable. // lengthy timeouts by default.
private static final int FORMAT_TIMEOUT_MS = 60000; private final int timeoutMs;
private static final int HASDATA_TIMEOUT_MS = 60000;
private static final int CAN_ROLL_BACK_TIMEOUT_MS = 60000;
private static final int FINALIZE_TIMEOUT_MS = 60000;
private static final int PRE_UPGRADE_TIMEOUT_MS = 60000;
private static final int ROLL_BACK_TIMEOUT_MS = 60000;
private static final int DISCARD_SEGMENTS_TIMEOUT_MS = 60000;
private static final int UPGRADE_TIMEOUT_MS = 60000;
private static final int GET_JOURNAL_CTIME_TIMEOUT_MS = 60000;
private final Configuration conf; private final Configuration conf;
private final URI uri; private final URI uri;
@ -141,6 +134,10 @@ public QuorumJournalManager(Configuration conf,
this.writeTxnsTimeoutMs = conf.getInt( this.writeTxnsTimeoutMs = conf.getInt(
DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_KEY, DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_KEY,
DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_DEFAULT); DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_DEFAULT);
this.timeoutMs = (int) conf.getTimeDuration(DFSConfigKeys
.DFS_QJM_OPERATIONS_TIMEOUT,
DFSConfigKeys.DFS_QJM_OPERATIONS_TIMEOUT_DEFAULT, TimeUnit
.MILLISECONDS);
} }
protected List<AsyncLogger> createLoggers( protected List<AsyncLogger> createLoggers(
@ -201,7 +198,7 @@ Map<AsyncLogger, NewEpochResponseProto> createNewUniqueEpoch()
public void format(NamespaceInfo nsInfo) throws IOException { public void format(NamespaceInfo nsInfo) throws IOException {
QuorumCall<AsyncLogger,Void> call = loggers.format(nsInfo); QuorumCall<AsyncLogger,Void> call = loggers.format(nsInfo);
try { try {
call.waitFor(loggers.size(), loggers.size(), 0, FORMAT_TIMEOUT_MS, call.waitFor(loggers.size(), loggers.size(), 0, timeoutMs,
"format"); "format");
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new IOException("Interrupted waiting for format() response"); throw new IOException("Interrupted waiting for format() response");
@ -220,7 +217,7 @@ public boolean hasSomeData() throws IOException {
loggers.isFormatted(); loggers.isFormatted();
try { try {
call.waitFor(loggers.size(), 0, 0, HASDATA_TIMEOUT_MS, "hasSomeData"); call.waitFor(loggers.size(), 0, 0, timeoutMs, "hasSomeData");
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new IOException("Interrupted while determining if JNs have data"); throw new IOException("Interrupted while determining if JNs have data");
} catch (TimeoutException e) { } catch (TimeoutException e) {
@ -505,7 +502,7 @@ AsyncLoggerSet getLoggerSetForTests() {
public void doPreUpgrade() throws IOException { public void doPreUpgrade() throws IOException {
QuorumCall<AsyncLogger, Void> call = loggers.doPreUpgrade(); QuorumCall<AsyncLogger, Void> call = loggers.doPreUpgrade();
try { try {
call.waitFor(loggers.size(), loggers.size(), 0, PRE_UPGRADE_TIMEOUT_MS, call.waitFor(loggers.size(), loggers.size(), 0, timeoutMs,
"doPreUpgrade"); "doPreUpgrade");
if (call.countExceptions() > 0) { if (call.countExceptions() > 0) {
@ -522,7 +519,7 @@ public void doPreUpgrade() throws IOException {
public void doUpgrade(Storage storage) throws IOException { public void doUpgrade(Storage storage) throws IOException {
QuorumCall<AsyncLogger, Void> call = loggers.doUpgrade(storage); QuorumCall<AsyncLogger, Void> call = loggers.doUpgrade(storage);
try { try {
call.waitFor(loggers.size(), loggers.size(), 0, UPGRADE_TIMEOUT_MS, call.waitFor(loggers.size(), loggers.size(), 0, timeoutMs,
"doUpgrade"); "doUpgrade");
if (call.countExceptions() > 0) { if (call.countExceptions() > 0) {
@ -539,7 +536,7 @@ public void doUpgrade(Storage storage) throws IOException {
public void doFinalize() throws IOException { public void doFinalize() throws IOException {
QuorumCall<AsyncLogger, Void> call = loggers.doFinalize(); QuorumCall<AsyncLogger, Void> call = loggers.doFinalize();
try { try {
call.waitFor(loggers.size(), loggers.size(), 0, FINALIZE_TIMEOUT_MS, call.waitFor(loggers.size(), loggers.size(), 0, timeoutMs,
"doFinalize"); "doFinalize");
if (call.countExceptions() > 0) { if (call.countExceptions() > 0) {
@ -558,7 +555,7 @@ public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
QuorumCall<AsyncLogger, Boolean> call = loggers.canRollBack(storage, QuorumCall<AsyncLogger, Boolean> call = loggers.canRollBack(storage,
prevStorage, targetLayoutVersion); prevStorage, targetLayoutVersion);
try { try {
call.waitFor(loggers.size(), loggers.size(), 0, CAN_ROLL_BACK_TIMEOUT_MS, call.waitFor(loggers.size(), loggers.size(), 0, timeoutMs,
"lockSharedStorage"); "lockSharedStorage");
if (call.countExceptions() > 0) { if (call.countExceptions() > 0) {
@ -591,7 +588,7 @@ public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
public void doRollback() throws IOException { public void doRollback() throws IOException {
QuorumCall<AsyncLogger, Void> call = loggers.doRollback(); QuorumCall<AsyncLogger, Void> call = loggers.doRollback();
try { try {
call.waitFor(loggers.size(), loggers.size(), 0, ROLL_BACK_TIMEOUT_MS, call.waitFor(loggers.size(), loggers.size(), 0, timeoutMs,
"doRollback"); "doRollback");
if (call.countExceptions() > 0) { if (call.countExceptions() > 0) {
@ -609,7 +606,7 @@ public void discardSegments(long startTxId) throws IOException {
QuorumCall<AsyncLogger, Void> call = loggers.discardSegments(startTxId); QuorumCall<AsyncLogger, Void> call = loggers.discardSegments(startTxId);
try { try {
call.waitFor(loggers.size(), loggers.size(), 0, call.waitFor(loggers.size(), loggers.size(), 0,
DISCARD_SEGMENTS_TIMEOUT_MS, "discardSegments"); timeoutMs, "discardSegments");
if (call.countExceptions() > 0) { if (call.countExceptions() > 0) {
call.rethrowException( call.rethrowException(
"Could not perform discardSegments of one or more JournalNodes"); "Could not perform discardSegments of one or more JournalNodes");
@ -628,7 +625,7 @@ public long getJournalCTime() throws IOException {
QuorumCall<AsyncLogger, Long> call = loggers.getJournalCTime(); QuorumCall<AsyncLogger, Long> call = loggers.getJournalCTime();
try { try {
call.waitFor(loggers.size(), loggers.size(), 0, call.waitFor(loggers.size(), loggers.size(), 0,
GET_JOURNAL_CTIME_TIMEOUT_MS, "getJournalCTime"); timeoutMs, "getJournalCTime");
if (call.countExceptions() > 0) { if (call.countExceptions() > 0) {
call.rethrowException("Could not journal CTime for one " call.rethrowException("Could not journal CTime for one "

View File

@ -4638,4 +4638,15 @@
</description> </description>
</property> </property>
<property>
<name>dfs.qjm.operations.timeout</name>
<value>60s</value>
<description>
Common key to set timeout for related operations in
QuorumJournalManager. This setting supports multiple time unit suffixes
as described in dfs.heartbeat.interval.
If no suffix is specified then milliseconds is assumed.
</description>
</property>
</configuration> </configuration>