HDFS-16660. Improve Code With Lambda in IPCLoggerChannel class (#4561)

This commit is contained in:
xuzq 2022-07-28 09:53:05 +08:00 committed by GitHub
parent c92ff0b4f1
commit 24560f2eb5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 130 additions and 242 deletions

View File

@ -23,7 +23,6 @@ import java.net.MalformedURLException;
import java.net.URI; import java.net.URI;
import java.net.URL; import java.net.URL;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -67,6 +66,7 @@ import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningE
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.UncaughtExceptionHandlers; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.UncaughtExceptionHandlers;
import org.apache.hadoop.util.Time;
/** /**
* Channel to a remote JournalNode using Hadoop IPC. * Channel to a remote JournalNode using Hadoop IPC.
@ -154,26 +154,15 @@ public class IPCLoggerChannel implements AsyncLogger {
private static final long WARN_JOURNAL_MILLIS_THRESHOLD = 1000; private static final long WARN_JOURNAL_MILLIS_THRESHOLD = 1000;
static final Factory FACTORY = new AsyncLogger.Factory() { static final Factory FACTORY = IPCLoggerChannel::new;
@Override
public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
String journalId, String nameServiceId, InetSocketAddress addr) {
return new IPCLoggerChannel(conf, nsInfo, journalId, nameServiceId, addr);
}
};
public IPCLoggerChannel(Configuration conf, public IPCLoggerChannel(Configuration conf, NamespaceInfo nsInfo,
NamespaceInfo nsInfo, String journalId, InetSocketAddress addr) {
String journalId,
InetSocketAddress addr) {
this(conf, nsInfo, journalId, null, addr); this(conf, nsInfo, journalId, null, addr);
} }
public IPCLoggerChannel(Configuration conf, public IPCLoggerChannel(Configuration conf, NamespaceInfo nsInfo,
NamespaceInfo nsInfo, String journalId, String nameServiceId, InetSocketAddress addr) {
String journalId,
String nameServiceId,
InetSocketAddress addr) {
this.conf = conf; this.conf = conf;
this.nsInfo = nsInfo; this.nsInfo = nsInfo;
this.journalId = journalId; this.journalId = journalId;
@ -202,7 +191,7 @@ public class IPCLoggerChannel implements AsyncLogger {
"Trying to move committed txid backwards in client " + "Trying to move committed txid backwards in client " +
"old: %s new: %s", committedTxId, txid); "old: %s new: %s", committedTxId, txid);
this.committedTxId = txid; this.committedTxId = txid;
this.lastCommitNanos = System.nanoTime(); this.lastCommitNanos = Time.monotonicNowNanos();
} }
@Override @Override
@ -229,25 +218,19 @@ public class IPCLoggerChannel implements AsyncLogger {
final Configuration confCopy = new Configuration(conf); final Configuration confCopy = new Configuration(conf);
// Need to set NODELAY or else batches larger than MTU can trigger // Need to set NODELAY or else batches larger than MTU can trigger
// 40ms nagling delays. // 40ms nailing delays.
confCopy.setBoolean( confCopy.setBoolean(CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY, true);
CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY,
true);
RPC.setProtocolEngine(confCopy, RPC.setProtocolEngine(confCopy,
QJournalProtocolPB.class, ProtobufRpcEngine2.class); QJournalProtocolPB.class, ProtobufRpcEngine2.class);
return SecurityUtil.doAsLoginUser( return SecurityUtil.doAsLoginUser(
new PrivilegedExceptionAction<QJournalProtocol>() { (PrivilegedExceptionAction<QJournalProtocol>) () -> {
@Override RPC.setProtocolEngine(confCopy,
public QJournalProtocol run() throws IOException { QJournalProtocolPB.class, ProtobufRpcEngine2.class);
RPC.setProtocolEngine(confCopy, QJournalProtocolPB pbproxy = RPC.getProxy(
QJournalProtocolPB.class, ProtobufRpcEngine2.class); QJournalProtocolPB.class,
QJournalProtocolPB pbproxy = RPC.getProxy( RPC.getProtocolVersion(QJournalProtocolPB.class),
QJournalProtocolPB.class, addr, confCopy);
RPC.getProtocolVersion(QJournalProtocolPB.class), return new QJournalProtocolTranslatorPB(pbproxy);
addr, confCopy);
return new QJournalProtocolTranslatorPB(pbproxy);
}
}); });
} }
@ -260,10 +243,8 @@ public class IPCLoggerChannel implements AsyncLogger {
return Executors.newSingleThreadExecutor( return Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder() new ThreadFactoryBuilder()
.setDaemon(true) .setDaemon(true)
.setNameFormat("Logger channel (from single-thread executor) to " + .setNameFormat("Logger channel (from single-thread executor) to " + addr)
addr) .setUncaughtExceptionHandler(UncaughtExceptionHandlers.systemExit())
.setUncaughtExceptionHandler(
UncaughtExceptionHandlers.systemExit())
.build()); .build());
} }
@ -308,11 +289,6 @@ public class IPCLoggerChannel implements AsyncLogger {
epoch, ipcSerial++, committedTxId); epoch, ipcSerial++, committedTxId);
} }
@VisibleForTesting
synchronized long getNextIpcSerial() {
return ipcSerial;
}
public synchronized int getQueuedEditsSize() { public synchronized int getQueuedEditsSize() {
return queuedEditsSizeBytes; return queuedEditsSizeBytes;
} }
@ -333,11 +309,7 @@ public class IPCLoggerChannel implements AsyncLogger {
@VisibleForTesting @VisibleForTesting
void waitForAllPendingCalls() throws InterruptedException { void waitForAllPendingCalls() throws InterruptedException {
try { try {
singleThreadExecutor.submit(new Runnable() { singleThreadExecutor.submit(() -> {}).get();
@Override
public void run() {
}
}).get();
} catch (ExecutionException e) { } catch (ExecutionException e) {
// This can't happen! // This can't happen!
throw new AssertionError(e); throw new AssertionError(e);
@ -346,36 +318,23 @@ public class IPCLoggerChannel implements AsyncLogger {
@Override @Override
public ListenableFuture<Boolean> isFormatted() { public ListenableFuture<Boolean> isFormatted() {
return singleThreadExecutor.submit(new Callable<Boolean>() { return singleThreadExecutor.submit(() -> getProxy().isFormatted(journalId, nameServiceId));
@Override
public Boolean call() throws IOException {
return getProxy().isFormatted(journalId, nameServiceId);
}
});
} }
@Override @Override
public ListenableFuture<GetJournalStateResponseProto> getJournalState() { public ListenableFuture<GetJournalStateResponseProto> getJournalState() {
return singleThreadExecutor.submit(new Callable<GetJournalStateResponseProto>() { return singleThreadExecutor.submit(() -> {
@Override GetJournalStateResponseProto ret = getProxy().getJournalState(journalId, nameServiceId);
public GetJournalStateResponseProto call() throws IOException { constructHttpServerURI(ret);
GetJournalStateResponseProto ret = return ret;
getProxy().getJournalState(journalId, nameServiceId);
constructHttpServerURI(ret);
return ret;
}
}); });
} }
@Override @Override
public ListenableFuture<NewEpochResponseProto> newEpoch( public ListenableFuture<NewEpochResponseProto> newEpoch(
final long epoch) { final long epoch) {
return singleThreadExecutor.submit(new Callable<NewEpochResponseProto>() { return singleThreadExecutor.submit(
@Override () -> getProxy().newEpoch(journalId, nameServiceId, nsInfo, epoch));
public NewEpochResponseProto call() throws IOException {
return getProxy().newEpoch(journalId, nameServiceId, nsInfo, epoch);
}
});
} }
@Override @Override
@ -390,50 +349,43 @@ public class IPCLoggerChannel implements AsyncLogger {
// When this batch is acked, we use its submission time in order // When this batch is acked, we use its submission time in order
// to calculate how far we are lagging. // to calculate how far we are lagging.
final long submitNanos = System.nanoTime(); final long submitNanos = Time.monotonicNowNanos();
ListenableFuture<Void> ret = null; ListenableFuture<Void> ret = null;
try { try {
ret = singleThreadExecutor.submit(new Callable<Void>() { ret = singleThreadExecutor.submit(() -> {
@Override throwIfOutOfSync();
public Void call() throws IOException {
throwIfOutOfSync();
long rpcSendTimeNanos = System.nanoTime(); final long rpcSendTimeNanos = Time.monotonicNowNanos();
try { try {
getProxy().journal(createReqInfo(), getProxy().journal(createReqInfo(), segmentTxId, firstTxnId, numTxns, data);
segmentTxId, firstTxnId, numTxns, data); } catch (IOException e) {
} catch (IOException e) { QuorumJournalManager.LOG.warn("Remote journal {} failed to write txns {}-{}."
QuorumJournalManager.LOG.warn( + " Will try to write to this JN again after the next log roll.",
"Remote journal " + IPCLoggerChannel.this + " failed to " + IPCLoggerChannel.this, firstTxnId, (firstTxnId + numTxns - 1), e);
"write txns " + firstTxnId + "-" + (firstTxnId + numTxns - 1) +
". Will try to write to this JN again after the next " +
"log roll.", e);
synchronized (IPCLoggerChannel.this) {
outOfSync = true;
}
throw e;
} finally {
long now = System.nanoTime();
long rpcTime = TimeUnit.MICROSECONDS.convert(
now - rpcSendTimeNanos, TimeUnit.NANOSECONDS);
long endToEndTime = TimeUnit.MICROSECONDS.convert(
now - submitNanos, TimeUnit.NANOSECONDS);
metrics.addWriteEndToEndLatency(endToEndTime);
metrics.addWriteRpcLatency(rpcTime);
if (rpcTime / 1000 > WARN_JOURNAL_MILLIS_THRESHOLD) {
QuorumJournalManager.LOG.warn(
"Took " + (rpcTime / 1000) + "ms to send a batch of " +
numTxns + " edits (" + data.length + " bytes) to " +
"remote journal " + IPCLoggerChannel.this);
}
}
synchronized (IPCLoggerChannel.this) { synchronized (IPCLoggerChannel.this) {
highestAckedTxId = firstTxnId + numTxns - 1; outOfSync = true;
lastAckNanos = submitNanos; }
throw e;
} finally {
final long nowNanos = Time.monotonicNowNanos();
final long rpcTimeMicros = TimeUnit.MICROSECONDS.convert(
(nowNanos - rpcSendTimeNanos), TimeUnit.NANOSECONDS);
final long endToEndTimeMicros = TimeUnit.MICROSECONDS.convert(
(nowNanos - submitNanos), TimeUnit.NANOSECONDS);
metrics.addWriteEndToEndLatency(endToEndTimeMicros);
metrics.addWriteRpcLatency(rpcTimeMicros);
if (rpcTimeMicros / 1000 > WARN_JOURNAL_MILLIS_THRESHOLD) {
QuorumJournalManager.LOG.warn(
"Took {}ms to send a batch of {} edits ({} bytes) to remote journal {}.",
rpcTimeMicros / 1000, numTxns, data.length, IPCLoggerChannel.this);
} }
return null;
} }
synchronized (IPCLoggerChannel.this) {
highestAckedTxId = firstTxnId + numTxns - 1;
lastAckNanos = submitNanos;
}
return null;
}); });
} finally { } finally {
if (ret == null) { if (ret == null) {
@ -460,14 +412,12 @@ public class IPCLoggerChannel implements AsyncLogger {
return ret; return ret;
} }
private void throwIfOutOfSync() private void throwIfOutOfSync() throws IOException {
throws JournalOutOfSyncException, IOException {
if (isOutOfSync()) { if (isOutOfSync()) {
// Even if we're out of sync, it's useful to send an RPC // Even if we're out of sync, it's useful to send an RPC
// to the remote node in order to update its lag metrics, etc. // to the remote node in order to update its lag metrics, etc.
heartbeatIfNecessary(); heartbeatIfNecessary();
throw new JournalOutOfSyncException( throw new JournalOutOfSyncException("Journal disabled until next roll");
"Journal disabled until next roll");
} }
} }
@ -497,12 +447,10 @@ public class IPCLoggerChannel implements AsyncLogger {
private synchronized void reserveQueueSpace(int size) private synchronized void reserveQueueSpace(int size)
throws LoggerTooFarBehindException { throws LoggerTooFarBehindException {
Preconditions.checkArgument(size >= 0); Preconditions.checkArgument(size >= 0);
if (queuedEditsSizeBytes + size > queueSizeLimitBytes && if (queuedEditsSizeBytes + size > queueSizeLimitBytes && queuedEditsSizeBytes > 0) {
queuedEditsSizeBytes > 0) { QuorumJournalManager.LOG.warn("Pending edits to {} is going to exceed limit size: {}"
QuorumJournalManager.LOG.warn("Pending edits to " + IPCLoggerChannel.this + ", current queued edits size: {}, will silently drop {} bytes of edits!",
+ " is going to exceed limit size: " + queueSizeLimitBytes IPCLoggerChannel.class, queueSizeLimitBytes, queuedEditsSizeBytes, size);
+ ", current queued edits size: " + queuedEditsSizeBytes
+ ", will silently drop " + size + " bytes of edits!");
throw new LoggerTooFarBehindException(); throw new LoggerTooFarBehindException();
} }
queuedEditsSizeBytes += size; queuedEditsSizeBytes += size;
@ -514,203 +462,144 @@ public class IPCLoggerChannel implements AsyncLogger {
} }
@Override @Override
public ListenableFuture<Void> format(final NamespaceInfo nsInfo, public ListenableFuture<Void> format(final NamespaceInfo nsInfo, final boolean force) {
final boolean force) { return singleThreadExecutor.submit(() -> {
return singleThreadExecutor.submit(new Callable<Void>() { getProxy().format(journalId, nameServiceId, nsInfo, force);
@Override return null;
public Void call() throws Exception {
getProxy().format(journalId, nameServiceId, nsInfo, force);
return null;
}
}); });
} }
@Override @Override
public ListenableFuture<Void> startLogSegment(final long txid, public ListenableFuture<Void> startLogSegment(final long txid, final int layoutVersion) {
final int layoutVersion) { return singleThreadExecutor.submit(() -> {
return singleThreadExecutor.submit(new Callable<Void>() { getProxy().startLogSegment(createReqInfo(), txid, layoutVersion);
@Override synchronized (IPCLoggerChannel.this) {
public Void call() throws IOException { if (outOfSync) {
getProxy().startLogSegment(createReqInfo(), txid, layoutVersion); outOfSync = false;
synchronized (IPCLoggerChannel.this) { QuorumJournalManager.LOG.info(
if (outOfSync) { "Restarting previously-stopped writes to {} in segment starting at txid {}.",
outOfSync = false; IPCLoggerChannel.class, txid);
QuorumJournalManager.LOG.info(
"Restarting previously-stopped writes to " +
IPCLoggerChannel.this + " in segment starting at txid " +
txid);
}
} }
return null;
} }
return null;
}); });
} }
@Override @Override
public ListenableFuture<Void> finalizeLogSegment( public ListenableFuture<Void> finalizeLogSegment(final long startTxId, final long endTxId) {
final long startTxId, final long endTxId) { return singleThreadExecutor.submit(() -> {
return singleThreadExecutor.submit(new Callable<Void>() { throwIfOutOfSync();
@Override getProxy().finalizeLogSegment(createReqInfo(), startTxId, endTxId);
public Void call() throws IOException { return null;
throwIfOutOfSync();
getProxy().finalizeLogSegment(createReqInfo(), startTxId, endTxId);
return null;
}
}); });
} }
@Override @Override
public ListenableFuture<Void> purgeLogsOlderThan(final long minTxIdToKeep) { public ListenableFuture<Void> purgeLogsOlderThan(final long minTxIdToKeep) {
return singleThreadExecutor.submit(new Callable<Void>() { return singleThreadExecutor.submit(() -> {
@Override getProxy().purgeLogsOlderThan(createReqInfo(), minTxIdToKeep);
public Void call() throws Exception { return null;
getProxy().purgeLogsOlderThan(createReqInfo(), minTxIdToKeep);
return null;
}
}); });
} }
@Override @Override
public ListenableFuture<GetJournaledEditsResponseProto> getJournaledEdits( public ListenableFuture<GetJournaledEditsResponseProto> getJournaledEdits(
long fromTxnId, int maxTransactions) { long fromTxnId, int maxTransactions) {
return parallelExecutor.submit( return parallelExecutor.submit(() -> getProxy().getJournaledEdits(
new Callable<GetJournaledEditsResponseProto>() { journalId, nameServiceId, fromTxnId, maxTransactions));
@Override
public GetJournaledEditsResponseProto call() throws IOException {
return getProxy().getJournaledEdits(journalId, nameServiceId,
fromTxnId, maxTransactions);
}
});
} }
@Override @Override
public ListenableFuture<RemoteEditLogManifest> getEditLogManifest( public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
final long fromTxnId, final boolean inProgressOk) { final long fromTxnId, final boolean inProgressOk) {
return parallelExecutor.submit(new Callable<RemoteEditLogManifest>() { return parallelExecutor.submit(() -> {
@Override GetEditLogManifestResponseProto ret = getProxy().getEditLogManifest(
public RemoteEditLogManifest call() throws IOException { journalId, nameServiceId, fromTxnId, inProgressOk);
GetEditLogManifestResponseProto ret = getProxy().getEditLogManifest( // Update the http port, since we need this to build URLs to any of the
journalId, nameServiceId, fromTxnId, inProgressOk); // returned logs.
// Update the http port, since we need this to build URLs to any of the constructHttpServerURI(ret);
// returned logs. return PBHelper.convert(ret.getManifest());
});
}
@Override
public ListenableFuture<PrepareRecoveryResponseProto> prepareRecovery(final long segmentTxId) {
return singleThreadExecutor.submit(() -> {
if (!hasHttpServerEndPoint()) {
// force an RPC call, so we know what the HTTP port should be if it
// hasn't done so.
GetJournalStateResponseProto ret = getProxy().getJournalState(
journalId, nameServiceId);
constructHttpServerURI(ret); constructHttpServerURI(ret);
return PBHelper.convert(ret.getManifest());
} }
return getProxy().prepareRecovery(createReqInfo(), segmentTxId);
}); });
} }
@Override @Override
public ListenableFuture<PrepareRecoveryResponseProto> prepareRecovery( public ListenableFuture<Void> acceptRecovery(final SegmentStateProto log, final URL url) {
final long segmentTxId) { return singleThreadExecutor.submit(() -> {
return singleThreadExecutor.submit(new Callable<PrepareRecoveryResponseProto>() { getProxy().acceptRecovery(createReqInfo(), log, url);
@Override return null;
public PrepareRecoveryResponseProto call() throws IOException {
if (!hasHttpServerEndPoint()) {
// force an RPC call so we know what the HTTP port should be if it
// haven't done so.
GetJournalStateResponseProto ret = getProxy().getJournalState(
journalId, nameServiceId);
constructHttpServerURI(ret);
}
return getProxy().prepareRecovery(createReqInfo(), segmentTxId);
}
});
}
@Override
public ListenableFuture<Void> acceptRecovery(
final SegmentStateProto log, final URL url) {
return singleThreadExecutor.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
getProxy().acceptRecovery(createReqInfo(), log, url);
return null;
}
}); });
} }
@Override @Override
public ListenableFuture<Void> doPreUpgrade() { public ListenableFuture<Void> doPreUpgrade() {
return singleThreadExecutor.submit(new Callable<Void>() { return singleThreadExecutor.submit(() -> {
@Override getProxy().doPreUpgrade(journalId);
public Void call() throws IOException { return null;
getProxy().doPreUpgrade(journalId);
return null;
}
}); });
} }
@Override @Override
public ListenableFuture<Void> doUpgrade(final StorageInfo sInfo) { public ListenableFuture<Void> doUpgrade(final StorageInfo sInfo) {
return singleThreadExecutor.submit(new Callable<Void>() { return singleThreadExecutor.submit(() -> {
@Override getProxy().doUpgrade(journalId, sInfo);
public Void call() throws IOException { return null;
getProxy().doUpgrade(journalId, sInfo);
return null;
}
}); });
} }
@Override @Override
public ListenableFuture<Void> doFinalize() { public ListenableFuture<Void> doFinalize() {
return singleThreadExecutor.submit(new Callable<Void>() { return singleThreadExecutor.submit(() -> {
@Override getProxy().doFinalize(journalId, nameServiceId);
public Void call() throws IOException { return null;
getProxy().doFinalize(journalId, nameServiceId);
return null;
}
}); });
} }
@Override @Override
public ListenableFuture<Boolean> canRollBack(final StorageInfo storage, public ListenableFuture<Boolean> canRollBack(final StorageInfo storage,
final StorageInfo prevStorage, final int targetLayoutVersion) { final StorageInfo prevStorage, final int targetLayoutVersion) {
return singleThreadExecutor.submit(new Callable<Boolean>() { return singleThreadExecutor.submit(
@Override () -> getProxy().canRollBack(journalId, nameServiceId,
public Boolean call() throws IOException { storage, prevStorage, targetLayoutVersion));
return getProxy().canRollBack(journalId, nameServiceId,
storage, prevStorage, targetLayoutVersion);
}
});
} }
@Override @Override
public ListenableFuture<Void> doRollback() { public ListenableFuture<Void> doRollback() {
return singleThreadExecutor.submit(new Callable<Void>() { return singleThreadExecutor.submit(() -> {
@Override getProxy().doRollback(journalId, nameServiceId);
public Void call() throws IOException { return null;
getProxy().doRollback(journalId, nameServiceId);
return null;
}
}); });
} }
@Override @Override
public ListenableFuture<Void> discardSegments(final long startTxId) { public ListenableFuture<Void> discardSegments(final long startTxId) {
return singleThreadExecutor.submit(new Callable<Void>() { return singleThreadExecutor.submit(() -> {
@Override getProxy().discardSegments(journalId, nameServiceId, startTxId);
public Void call() throws IOException { return null;
getProxy().discardSegments(journalId, nameServiceId, startTxId);
return null;
}
}); });
} }
@Override @Override
public ListenableFuture<Long> getJournalCTime() { public ListenableFuture<Long> getJournalCTime() {
return singleThreadExecutor.submit(new Callable<Long>() { return singleThreadExecutor.submit(() -> getProxy().getJournalCTime(journalId, nameServiceId));
@Override
public Long call() throws IOException {
return getProxy().getJournalCTime(journalId, nameServiceId);
}
});
} }
@Override @Override
public String toString() { public String toString() {
return InetAddresses.toAddrString(addr.getAddress()) + ':' + return InetAddresses.toAddrString(addr.getAddress()) + ':' + addr.getPort();
addr.getPort();
} }
@Override @Override
@ -778,5 +667,4 @@ public class IPCLoggerChannel implements AsyncLogger {
private boolean hasHttpServerEndPoint() { private boolean hasHttpServerEndPoint() {
return httpServerURL != null; return httpServerURL != null;
} }
} }