From 43c1ebae168eb64571d1a1533623dcec21620566 Mon Sep 17 00:00:00 2001
From: ZanderXu <15040255127@163.com>
Date: Fri, 16 Sep 2022 03:44:36 +0800
Subject: [PATCH] HDFS-16771. Follow-up for HDFS-16659: JN should tersely print
 logs about NewerTxnIdException (#4882)

Signed-off-by: Erik Krogen
Co-authored-by: zengqiang.xu
---
 .../qjournal/client/QuorumJournalManager.java |  4 --
 .../hadoop/hdfs/qjournal/server/Journal.java  | 14 ++++++-
 .../qjournal/server/JournalNodeRpcServer.java |  2 +
 .../client/TestQuorumJournalManager.java      | 37 +++++++++++++++++++
 4 files changed, 51 insertions(+), 6 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
index 0e3a8dd0915..faf71a7b545 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
@@ -31,7 +31,6 @@ import java.util.PriorityQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.hadoop.hdfs.qjournal.server.NewerTxnIdException;
 import org.apache.hadoop.util.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -524,9 +523,6 @@ public class QuorumJournalManager implements JournalManager {
       selectRpcInputStreams(rpcStreams, fromTxnId, onlyDurableTxns);
       streams.addAll(rpcStreams);
       return;
-    } catch (NewerTxnIdException ntie) {
-      // normal situation, we requested newer IDs than any journal has. no new streams
-      return;
     } catch (IOException ioe) {
       LOG.warn("Encountered exception while tailing edits >= " + fromTxnId
           + " via RPC; falling back to streaming.", ioe);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
index 7726377538a..ffa613018c6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
@@ -751,8 +751,18 @@ public class Journal implements Closeable {
           "it via " + DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY);
     }
     long highestTxId = getHighestWrittenTxId();
-    if (sinceTxId > highestTxId) {
-      // Requested edits that don't exist yet and is newer than highestTxId.
+    if (sinceTxId == highestTxId + 1) {
+      // Requested edits that don't exist yet, but this is expected,
+      // because the namenode always requests journaled edits with sinceTxId
+      // equal to image.getLastAppliedTxId() + 1. Short-circuit the cache here
+      // and return a response with a count of 0.
+      metrics.rpcEmptyResponses.incr();
+      return GetJournaledEditsResponseProto.newBuilder().setTxnCount(0).build();
+    } else if (sinceTxId > highestTxId + 1) {
+      // Requested edits that don't exist yet, and this is unexpected: it means
+      // this journal is lagging and is missing edits that should already exist.
+      // Throw a NewerTxnIdException so the namenode treats this response as an
+      // exception. For more details, please refer to HDFS-16659 and HDFS-16771.
       metrics.rpcEmptyResponses.incr();
       throw new NewerTxnIdException(
           "Highest txn ID available in the journal is %d, but requested txns starting at %d.",
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
index ad67cf481ae..ab909aef2ec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
@@ -114,6 +114,8 @@ public class JournalNodeRpcServer implements QJournalProtocol,
         .setVerbose(false)
         .build();
 
+    this.server.addTerseExceptions(NewerTxnIdException.class);
+    this.server.addTerseExceptions(JournaledEditsCache.CacheMissException.class);
 
     //Adding InterQJournalProtocolPB to server
     InterQJournalProtocolServerSideTranslatorPB
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
index 84ce7c25724..e2ee2e365d9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
@@ -42,8 +42,10 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.hdfs.server.common.Util;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.net.MockDomainNameResolver;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.SettableFuture;
@@ -1263,4 +1265,39 @@ public class TestQuorumJournalManager {
           segmentTxId);
     }
   }
+
+  @Test
+  public void testSelectLatestEditsWithoutStreaming() throws Exception {
+    EditLogOutputStream stm = qjm.startLogSegment(
+        1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    // Successfully write these edits to JN0 ~ JN2
+    writeTxns(stm, 1, 10);
+
+    AtomicInteger atomicInteger = new AtomicInteger(0);
+    spyGetEditLogManifest(0, 11, true, atomicInteger::incrementAndGet);
+    spyGetEditLogManifest(1, 11, true, atomicInteger::incrementAndGet);
+    spyGetEditLogManifest(2, 11, true, atomicInteger::incrementAndGet);
+
+    List<EditLogInputStream> streams = new ArrayList<>();
+    qjm.selectInputStreams(streams, 1, true, true);
+    assertEquals(1, streams.size());
+    assertEquals(1, streams.get(0).getFirstTxId());
+    assertEquals(10, streams.get(0).getLastTxId());
+
+    streams.clear();
+    qjm.selectInputStreams(streams, 11, true, true);
+    assertEquals(0, streams.size());
+    assertEquals(0, atomicInteger.get());
+  }
+
+  private void spyGetEditLogManifest(int jnSpyIdx, long fromTxId,
+      boolean inProgressOk, Runnable preHook) {
+    Mockito.doAnswer((Answer<ListenableFuture<RemoteEditLogManifest>>) invocation -> {
+      preHook.run();
+      @SuppressWarnings("unchecked")
+      ListenableFuture<RemoteEditLogManifest> result =
+          (ListenableFuture<RemoteEditLogManifest>) invocation.callRealMethod();
+      return result;
+    }).when(spies.get(jnSpyIdx)).getEditLogManifest(fromTxId, inProgressOk);
+  }
 }
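For illustration only, here is a minimal standalone sketch (not part of the patch) of the sinceTxId branching that the Journal.java hunk above introduces: a caught-up reader (sinceTxId == highestTxId + 1) gets an empty response, while a request beyond that indicates a lagging journal and is answered with NewerTxnIdException. The class JournaledEditsSketch, the helper checkSinceTxId, and the nested NewerTxnIdException stub are hypothetical stand-ins; the real method returns a GetJournaledEditsResponseProto and updates JournalMetrics.

```java
// Simplified sketch of the sinceTxId handling added in Journal#getJournaledEdits.
// Names here are illustrative only and do not exist in Hadoop.
public class JournaledEditsSketch {

  /** Illustrative stand-in for the JournalNode-side NewerTxnIdException. */
  static class NewerTxnIdException extends Exception {
    NewerTxnIdException(String msg) {
      super(msg);
    }
  }

  /**
   * Returns the number of transactions this journal could serve, 0 if the
   * caller is already caught up, or throws if the caller requests txns
   * beyond highestTxId + 1 (i.e. this journal is lagging).
   */
  static long checkSinceTxId(long sinceTxId, long highestTxId)
      throws NewerTxnIdException {
    if (sinceTxId == highestTxId + 1) {
      // Expected: the NameNode asks for lastAppliedTxId + 1 and this journal
      // has nothing newer yet, so answer with an empty (txnCount = 0) response.
      return 0;
    } else if (sinceTxId > highestTxId + 1) {
      // Unexpected: this journal is missing edits that should already exist;
      // surface it as an exception so the caller can fall back to journals
      // that do have the edits.
      throw new NewerTxnIdException(String.format(
          "Highest txn ID available in the journal is %d, "
              + "but requested txns starting at %d.", highestTxId, sinceTxId));
    }
    // Otherwise the requested range overlaps what this journal has written.
    return highestTxId - sinceTxId + 1;
  }

  public static void main(String[] args) throws Exception {
    System.out.println(checkSinceTxId(11, 10)); // 0: caught up, empty response
    System.out.println(checkSinceTxId(5, 10));  // 6: txns 5..10 can be served
    try {
      checkSinceTxId(12, 10);                   // lagging journal -> exception
    } catch (NewerTxnIdException e) {
      System.out.println("threw: " + e.getMessage());
    }
  }
}
```

This mirrors why the QuorumJournalManager hunk above can drop its NewerTxnIdException catch: a fully caught-up NameNode now receives an empty response rather than an exception, and the exception path is reserved for genuinely lagging journals, which the JournalNodeRpcServer hunk additionally logs tersely.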