HDFS-16661. Modify this patch based on comment

This commit is contained in:
zengqiang.xu 2022-07-18 10:43:23 +08:00
parent 8f38839e61
commit 07c4454627
1 changed files with 34 additions and 26 deletions

View File

@ -46,12 +46,14 @@ import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Listenable
*/
class AsyncLoggerSet {
private final List<AsyncLogger> loggers;
private final int loggerSize;
private static final long INVALID_EPOCH = -1;
private long myEpoch = INVALID_EPOCH;
public AsyncLoggerSet(List<AsyncLogger> loggers) {
this.loggers = ImmutableList.copyOf(loggers);
this.loggerSize = loggers.size();
}
void setEpoch(long e) {
@ -114,7 +116,7 @@ class AsyncLoggerSet {
int majority = getMajoritySize();
try {
q.waitFor(
loggers.size(), // either all respond
this.loggerSize, // either all respond
majority, // or we get majority successes
majority, // or we get majority failures,
timeoutMs, operationName);
@ -139,21 +141,21 @@ class AsyncLoggerSet {
* @return the number of nodes which are required to obtain a quorum.
*/
int getMajoritySize() {
return loggers.size() / 2 + 1;
return this.loggerSize / 2 + 1;
}
/**
* @return a textual description of the majority size (eg "2/3" or "3/5")
*/
String getMajorityString() {
return getMajoritySize() + "/" + loggers.size();
return getMajoritySize() + "/" + this.loggerSize;
}
/**
* @return the number of loggers behind this set
*/
int size() {
return loggers.size();
return this.loggerSize;
}
@Override
@ -194,113 +196,119 @@ class AsyncLoggerSet {
///////////////////////////////////////////////////////////////////////////
public QuorumCall<AsyncLogger, GetJournalStateResponseProto> getJournalState() {
Map<AsyncLogger, ListenableFuture<GetJournalStateResponseProto>> calls = Maps.newHashMap();
Map<AsyncLogger, ListenableFuture<GetJournalStateResponseProto>> calls =
Maps.newHashMapWithExpectedSize(loggerSize);
loggers.forEach(l -> calls.put(l, l.getJournalState()));
return QuorumCall.create(calls);
}
public QuorumCall<AsyncLogger, Boolean> isFormatted() {
Map<AsyncLogger, ListenableFuture<Boolean>> calls = Maps.newHashMap();
Map<AsyncLogger, ListenableFuture<Boolean>> calls =
Maps.newHashMapWithExpectedSize(loggerSize);
loggers.forEach(l -> calls.put(l, l.isFormatted()));
return QuorumCall.create(calls);
}
public QuorumCall<AsyncLogger,NewEpochResponseProto> newEpoch(long epoch) {
Map<AsyncLogger, ListenableFuture<NewEpochResponseProto>> calls = Maps.newHashMap();
public QuorumCall<AsyncLogger, NewEpochResponseProto> newEpoch(long epoch) {
Map<AsyncLogger, ListenableFuture<NewEpochResponseProto>> calls =
Maps.newHashMapWithExpectedSize(loggerSize);
loggers.forEach(l -> calls.put(l, l.newEpoch(epoch)));
return QuorumCall.create(calls);
}
public QuorumCall<AsyncLogger, Void> startLogSegment(long txid, int layoutVersion) {
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMapWithExpectedSize(loggerSize);
loggers.forEach(l -> calls.put(l, l.startLogSegment(txid, layoutVersion)));
return QuorumCall.create(calls);
}
public QuorumCall<AsyncLogger, Void> finalizeLogSegment(long firstTxId, long lastTxId) {
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMapWithExpectedSize(loggerSize);
loggers.forEach(l -> calls.put(l, l.finalizeLogSegment(firstTxId, lastTxId)));
return QuorumCall.create(calls);
}
public QuorumCall<AsyncLogger, Void> sendEdits(
long segmentTxId, long firstTxnId, int numTxns, byte[] data) {
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMapWithExpectedSize(loggerSize);
loggers.forEach(l -> calls.put(l, l.sendEdits(segmentTxId, firstTxnId, numTxns, data)));
return QuorumCall.create(calls);
}
public QuorumCall<AsyncLogger, GetJournaledEditsResponseProto>
getJournaledEdits(long fromTxnId, int maxTransactions) {
Map<AsyncLogger, ListenableFuture<GetJournaledEditsResponseProto>> calls = Maps.newHashMap();
public QuorumCall<AsyncLogger, GetJournaledEditsResponseProto> getJournaledEdits(
long fromTxnId, int maxTransactions) {
Map<AsyncLogger, ListenableFuture<GetJournaledEditsResponseProto>> calls =
Maps.newHashMapWithExpectedSize(loggerSize);
loggers.forEach(l -> calls.put(l, l.getJournaledEdits(fromTxnId, maxTransactions)));
return QuorumCall.create(calls);
}
public QuorumCall<AsyncLogger, RemoteEditLogManifest> getEditLogManifest(
long fromTxnId, boolean inProgressOk) {
Map<AsyncLogger, ListenableFuture<RemoteEditLogManifest>> calls = Maps.newHashMap();
Map<AsyncLogger, ListenableFuture<RemoteEditLogManifest>> calls =
Maps.newHashMapWithExpectedSize(loggerSize);
loggers.forEach(l -> calls.put(l, l.getEditLogManifest(fromTxnId, inProgressOk)));
return QuorumCall.create(calls);
}
QuorumCall<AsyncLogger, PrepareRecoveryResponseProto> prepareRecovery(long segmentTxId) {
Map<AsyncLogger, ListenableFuture<PrepareRecoveryResponseProto>> calls = Maps.newHashMap();
Map<AsyncLogger, ListenableFuture<PrepareRecoveryResponseProto>> calls =
Maps.newHashMapWithExpectedSize(loggerSize);
loggers.forEach(l -> calls.put(l, l.prepareRecovery(segmentTxId)));
return QuorumCall.create(calls);
}
QuorumCall<AsyncLogger,Void> acceptRecovery(SegmentStateProto log, URL fromURL) {
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
QuorumCall<AsyncLogger, Void> acceptRecovery(SegmentStateProto log, URL fromURL) {
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMapWithExpectedSize(loggerSize);
loggers.forEach(l -> calls.put(l, l.acceptRecovery(log, fromURL)));
return QuorumCall.create(calls);
}
QuorumCall<AsyncLogger, Void> format(NamespaceInfo nsInfo, boolean force) {
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMapWithExpectedSize(loggerSize);
loggers.forEach(l -> calls.put(l, l.format(nsInfo, force)));
return QuorumCall.create(calls);
}
QuorumCall<AsyncLogger, Void> doPreUpgrade() {
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMapWithExpectedSize(loggerSize);
loggers.forEach(l -> calls.put(l, l.doPreUpgrade()));
return QuorumCall.create(calls);
}
public QuorumCall<AsyncLogger, Void> doUpgrade(StorageInfo sInfo) {
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMapWithExpectedSize(loggerSize);
loggers.forEach(l -> calls.put(l, l.doUpgrade(sInfo)));
return QuorumCall.create(calls);
}
public QuorumCall<AsyncLogger, Void> doFinalize() {
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMapWithExpectedSize(loggerSize);
loggers.forEach(l -> calls.put(l, l.doFinalize()));
return QuorumCall.create(calls);
}
public QuorumCall<AsyncLogger, Boolean> canRollBack(StorageInfo storage,
StorageInfo prevStorage, int targetLayoutVersion) {
Map<AsyncLogger, ListenableFuture<Boolean>> calls = Maps.newHashMap();
Map<AsyncLogger, ListenableFuture<Boolean>> calls = Maps.newHashMapWithExpectedSize(loggerSize);
loggers.forEach(l -> calls.put(l, l.canRollBack(storage, prevStorage, targetLayoutVersion)));
return QuorumCall.create(calls);
}
public QuorumCall<AsyncLogger, Void> doRollback() {
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMapWithExpectedSize(loggerSize);
loggers.forEach(l -> calls.put(l, l.doRollback()));
return QuorumCall.create(calls);
}
public QuorumCall<AsyncLogger, Void> discardSegments(long startTxId) {
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMapWithExpectedSize(loggerSize);
loggers.forEach(l -> calls.put(l, l.discardSegments(startTxId)));
return QuorumCall.create(calls);
}
public QuorumCall<AsyncLogger, Long> getJournalCTime() {
Map<AsyncLogger, ListenableFuture<Long>> calls = Maps.newHashMap();
Map<AsyncLogger, ListenableFuture<Long>> calls = Maps.newHashMapWithExpectedSize(loggerSize);
loggers.forEach(l -> calls.put(l, l.getJournalCTime()));
return QuorumCall.create(calls);
}