HDFS-16661. Improve Code With Lambda in AsyncLoggerSet class

This commit is contained in:
zengqiang.xu 2022-07-14 14:54:14 +08:00
parent f1bd4e117e
commit 8f38839e61
2 changed files with 45 additions and 140 deletions

View File

@ -23,8 +23,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
@ -47,8 +45,6 @@ import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Listenable
* {@link QuorumCall} instances.
*/
class AsyncLoggerSet {
static final Logger LOG = LoggerFactory.getLogger(AsyncLoggerSet.class);
private final List<AsyncLogger> loggers;
private static final long INVALID_EPOCH = -1;
@ -62,9 +58,7 @@ class AsyncLoggerSet {
Preconditions.checkState(!isEpochEstablished(),
"Epoch already established: epoch=%s", myEpoch);
myEpoch = e;
for (AsyncLogger l : loggers) {
l.setEpoch(e);
}
loggers.forEach(l -> l.setEpoch(e));
}
/**
@ -73,9 +67,7 @@ class AsyncLoggerSet {
* for extra sanity checks against the protocol. See HDFS-3863.
*/
public void setCommittedTxId(long txid) {
for (AsyncLogger logger : loggers) {
logger.setCommittedTxId(txid);
}
loggers.forEach(l -> l.setCommittedTxId(txid));
}
/**
@ -87,7 +79,7 @@ class AsyncLoggerSet {
/**
* @return the epoch number for this writer. This may only be called after
* a successful call to {@link #createNewUniqueEpoch(NamespaceInfo)}.
* a successful call to {@link QuorumJournalManager#createNewUniqueEpoch()}.
*/
long getEpoch() {
Preconditions.checkState(myEpoch != INVALID_EPOCH,
@ -96,18 +88,14 @@ class AsyncLoggerSet {
}
/**
* Close all of the underlying loggers.
* Close all the underlying loggers.
*/
void close() {
for (AsyncLogger logger : loggers) {
logger.close();
}
loggers.forEach(AsyncLogger::close);
}
void purgeLogsOlderThan(long minTxIdToKeep) {
for (AsyncLogger logger : loggers) {
logger.purgeLogsOlderThan(minTxIdToKeep);
}
loggers.forEach(l -> l.purgeLogsOlderThan(minTxIdToKeep));
}
@ -127,8 +115,8 @@ class AsyncLoggerSet {
try {
q.waitFor(
loggers.size(), // either all respond
majority, // or we get a majority successes
majority, // or we get a majority failures,
majority, // or we get majority successes
majority, // or we get majority failures,
timeoutMs, operationName);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@ -206,197 +194,114 @@ class AsyncLoggerSet {
///////////////////////////////////////////////////////////////////////////
public QuorumCall<AsyncLogger, GetJournalStateResponseProto> getJournalState() {
Map<AsyncLogger, ListenableFuture<GetJournalStateResponseProto>> calls =
Maps.newHashMap();
for (AsyncLogger logger : loggers) {
calls.put(logger, logger.getJournalState());
}
Map<AsyncLogger, ListenableFuture<GetJournalStateResponseProto>> calls = Maps.newHashMap();
loggers.forEach(l -> calls.put(l, l.getJournalState()));
return QuorumCall.create(calls);
}
public QuorumCall<AsyncLogger, Boolean> isFormatted() {
Map<AsyncLogger, ListenableFuture<Boolean>> calls = Maps.newHashMap();
for (AsyncLogger logger : loggers) {
calls.put(logger, logger.isFormatted());
}
loggers.forEach(l -> calls.put(l, l.isFormatted()));
return QuorumCall.create(calls);
}
public QuorumCall<AsyncLogger,NewEpochResponseProto> newEpoch(
NamespaceInfo nsInfo,
long epoch) {
Map<AsyncLogger, ListenableFuture<NewEpochResponseProto>> calls =
Maps.newHashMap();
for (AsyncLogger logger : loggers) {
calls.put(logger, logger.newEpoch(epoch));
}
public QuorumCall<AsyncLogger,NewEpochResponseProto> newEpoch(long epoch) {
Map<AsyncLogger, ListenableFuture<NewEpochResponseProto>> calls = Maps.newHashMap();
loggers.forEach(l -> calls.put(l, l.newEpoch(epoch)));
return QuorumCall.create(calls);
}
public QuorumCall<AsyncLogger, Void> startLogSegment(
long txid, int layoutVersion) {
public QuorumCall<AsyncLogger, Void> startLogSegment(long txid, int layoutVersion) {
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
for (AsyncLogger logger : loggers) {
calls.put(logger, logger.startLogSegment(txid, layoutVersion));
}
loggers.forEach(l -> calls.put(l, l.startLogSegment(txid, layoutVersion)));
return QuorumCall.create(calls);
}
public QuorumCall<AsyncLogger, Void> finalizeLogSegment(long firstTxId,
long lastTxId) {
public QuorumCall<AsyncLogger, Void> finalizeLogSegment(long firstTxId, long lastTxId) {
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
for (AsyncLogger logger : loggers) {
calls.put(logger, logger.finalizeLogSegment(firstTxId, lastTxId));
}
loggers.forEach(l -> calls.put(l, l.finalizeLogSegment(firstTxId, lastTxId)));
return QuorumCall.create(calls);
}
public QuorumCall<AsyncLogger, Void> sendEdits(
long segmentTxId, long firstTxnId, int numTxns, byte[] data) {
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
for (AsyncLogger logger : loggers) {
ListenableFuture<Void> future =
logger.sendEdits(segmentTxId, firstTxnId, numTxns, data);
calls.put(logger, future);
}
loggers.forEach(l -> calls.put(l, l.sendEdits(segmentTxId, firstTxnId, numTxns, data)));
return QuorumCall.create(calls);
}
public QuorumCall<AsyncLogger, GetJournaledEditsResponseProto>
getJournaledEdits(long fromTxnId, int maxTransactions) {
Map<AsyncLogger,
ListenableFuture<GetJournaledEditsResponseProto>> calls
= Maps.newHashMap();
for (AsyncLogger logger : loggers) {
ListenableFuture<GetJournaledEditsResponseProto> future =
logger.getJournaledEdits(fromTxnId, maxTransactions);
calls.put(logger, future);
}
Map<AsyncLogger, ListenableFuture<GetJournaledEditsResponseProto>> calls = Maps.newHashMap();
loggers.forEach(l -> calls.put(l, l.getJournaledEdits(fromTxnId, maxTransactions)));
return QuorumCall.create(calls);
}
public QuorumCall<AsyncLogger, RemoteEditLogManifest> getEditLogManifest(
long fromTxnId, boolean inProgressOk) {
Map<AsyncLogger,
ListenableFuture<RemoteEditLogManifest>> calls
= Maps.newHashMap();
for (AsyncLogger logger : loggers) {
ListenableFuture<RemoteEditLogManifest> future =
logger.getEditLogManifest(fromTxnId, inProgressOk);
calls.put(logger, future);
}
Map<AsyncLogger, ListenableFuture<RemoteEditLogManifest>> calls = Maps.newHashMap();
loggers.forEach(l -> calls.put(l, l.getEditLogManifest(fromTxnId, inProgressOk)));
return QuorumCall.create(calls);
}
QuorumCall<AsyncLogger, PrepareRecoveryResponseProto>
prepareRecovery(long segmentTxId) {
Map<AsyncLogger,
ListenableFuture<PrepareRecoveryResponseProto>> calls
= Maps.newHashMap();
for (AsyncLogger logger : loggers) {
ListenableFuture<PrepareRecoveryResponseProto> future =
logger.prepareRecovery(segmentTxId);
calls.put(logger, future);
}
QuorumCall<AsyncLogger, PrepareRecoveryResponseProto> prepareRecovery(long segmentTxId) {
Map<AsyncLogger, ListenableFuture<PrepareRecoveryResponseProto>> calls = Maps.newHashMap();
loggers.forEach(l -> calls.put(l, l.prepareRecovery(segmentTxId)));
return QuorumCall.create(calls);
}
QuorumCall<AsyncLogger,Void>
acceptRecovery(SegmentStateProto log, URL fromURL) {
Map<AsyncLogger, ListenableFuture<Void>> calls
= Maps.newHashMap();
for (AsyncLogger logger : loggers) {
ListenableFuture<Void> future =
logger.acceptRecovery(log, fromURL);
calls.put(logger, future);
}
QuorumCall<AsyncLogger,Void> acceptRecovery(SegmentStateProto log, URL fromURL) {
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
loggers.forEach(l -> calls.put(l, l.acceptRecovery(log, fromURL)));
return QuorumCall.create(calls);
}
QuorumCall<AsyncLogger, Void> format(NamespaceInfo nsInfo, boolean force) {
Map<AsyncLogger, ListenableFuture<Void>> calls =
Maps.newHashMap();
for (AsyncLogger logger : loggers) {
ListenableFuture<Void> future =
logger.format(nsInfo, force);
calls.put(logger, future);
}
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
loggers.forEach(l -> calls.put(l, l.format(nsInfo, force)));
return QuorumCall.create(calls);
}
QuorumCall<AsyncLogger, Void> doPreUpgrade() {
Map<AsyncLogger, ListenableFuture<Void>> calls =
Maps.newHashMap();
for (AsyncLogger logger : loggers) {
ListenableFuture<Void> future =
logger.doPreUpgrade();
calls.put(logger, future);
}
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
loggers.forEach(l -> calls.put(l, l.doPreUpgrade()));
return QuorumCall.create(calls);
}
public QuorumCall<AsyncLogger, Void> doUpgrade(StorageInfo sInfo) {
Map<AsyncLogger, ListenableFuture<Void>> calls =
Maps.newHashMap();
for (AsyncLogger logger : loggers) {
ListenableFuture<Void> future =
logger.doUpgrade(sInfo);
calls.put(logger, future);
}
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
loggers.forEach(l -> calls.put(l, l.doUpgrade(sInfo)));
return QuorumCall.create(calls);
}
public QuorumCall<AsyncLogger, Void> doFinalize() {
Map<AsyncLogger, ListenableFuture<Void>> calls =
Maps.newHashMap();
for (AsyncLogger logger : loggers) {
ListenableFuture<Void> future =
logger.doFinalize();
calls.put(logger, future);
}
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
loggers.forEach(l -> calls.put(l, l.doFinalize()));
return QuorumCall.create(calls);
}
public QuorumCall<AsyncLogger, Boolean> canRollBack(StorageInfo storage,
StorageInfo prevStorage, int targetLayoutVersion) {
Map<AsyncLogger, ListenableFuture<Boolean>> calls =
Maps.newHashMap();
for (AsyncLogger logger : loggers) {
ListenableFuture<Boolean> future =
logger.canRollBack(storage, prevStorage, targetLayoutVersion);
calls.put(logger, future);
}
Map<AsyncLogger, ListenableFuture<Boolean>> calls = Maps.newHashMap();
loggers.forEach(l -> calls.put(l, l.canRollBack(storage, prevStorage, targetLayoutVersion)));
return QuorumCall.create(calls);
}
public QuorumCall<AsyncLogger, Void> doRollback() {
Map<AsyncLogger, ListenableFuture<Void>> calls =
Maps.newHashMap();
for (AsyncLogger logger : loggers) {
ListenableFuture<Void> future =
logger.doRollback();
calls.put(logger, future);
}
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
loggers.forEach(l -> calls.put(l, l.doRollback()));
return QuorumCall.create(calls);
}
public QuorumCall<AsyncLogger, Void> discardSegments(long startTxId) {
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
for (AsyncLogger logger : loggers) {
ListenableFuture<Void> future = logger.discardSegments(startTxId);
calls.put(logger, future);
}
loggers.forEach(l -> calls.put(l, l.discardSegments(startTxId)));
return QuorumCall.create(calls);
}
public QuorumCall<AsyncLogger, Long> getJournalCTime() {
Map<AsyncLogger, ListenableFuture<Long>> calls =
Maps.newHashMap();
for (AsyncLogger logger : loggers) {
ListenableFuture<Long> future = logger.getJournalCTime();
calls.put(logger, future);
}
Map<AsyncLogger, ListenableFuture<Long>> calls = Maps.newHashMap();
loggers.forEach(l -> calls.put(l, l.getJournalCTime()));
return QuorumCall.create(calls);
}
}

View File

@ -241,7 +241,7 @@ public class QuorumJournalManager implements JournalManager {
long myEpoch = maxPromised + 1;
Map<AsyncLogger, NewEpochResponseProto> resps =
loggers.waitForWriteQuorum(loggers.newEpoch(nsInfo, myEpoch),
loggers.waitForWriteQuorum(loggers.newEpoch(myEpoch),
newEpochTimeoutMs, "newEpoch(" + myEpoch + ")");
loggers.setEpoch(myEpoch);