From c947c326e8e7a649161fd26672870f5494b01b33 Mon Sep 17 00:00:00 2001 From: ZanderXu <15040255127@163.com> Date: Wed, 7 Sep 2022 01:12:55 +0800 Subject: [PATCH] HDFS-16659. JournalNode should throw NewerTxnIdException when SinceTxId is bigger than HighestWrittenTxId (#4560) Co-authored-by: Zander Xu Signed-off-by: Erik Krogen --- .../qjournal/client/QuorumJournalManager.java | 4 ++ .../hadoop/hdfs/qjournal/server/Journal.java | 9 ++- .../qjournal/server/NewerTxnIdException.java | 31 ++++++++++ .../client/TestQuorumJournalManager.java | 57 ++++++++++++++++++- 4 files changed, 97 insertions(+), 4 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/NewerTxnIdException.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java index faf71a7b545..0e3a8dd0915 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java @@ -31,6 +31,7 @@ import java.util.PriorityQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.hadoop.hdfs.qjournal.server.NewerTxnIdException; import org.apache.hadoop.util.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -523,6 +524,9 @@ public class QuorumJournalManager implements JournalManager { selectRpcInputStreams(rpcStreams, fromTxnId, onlyDurableTxns); streams.addAll(rpcStreams); return; + } catch (NewerTxnIdException ntie) { + // normal situation, we requested newer IDs than any journal has. no new streams + return; } catch (IOException ioe) { LOG.warn("Encountered exception while tailing edits >= " + fromTxnId + " via RPC; falling back to streaming.", ioe); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java index 6b9b4087181..7726377538a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java @@ -750,10 +750,13 @@ public class Journal implements Closeable { "is a requirement to fetch journaled edits via RPC. Please enable " + "it via " + DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY); } - if (sinceTxId > getHighestWrittenTxId()) { - // Requested edits that don't exist yet; short-circuit the cache here + long highestTxId = getHighestWrittenTxId(); + if (sinceTxId > highestTxId) { + // Requested edits that don't exist yet and is newer than highestTxId. metrics.rpcEmptyResponses.incr(); - return GetJournaledEditsResponseProto.newBuilder().setTxnCount(0).build(); + throw new NewerTxnIdException( + "Highest txn ID available in the journal is %d, but requested txns starting at %d.", + highestTxId, sinceTxId); } try { List buffers = new ArrayList<>(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/NewerTxnIdException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/NewerTxnIdException.java new file mode 100644 index 00000000000..ec691402719 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/NewerTxnIdException.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.qjournal.server; + +import java.io.IOException; + +/** + * Exception when no edits are available. + */ +public class NewerTxnIdException extends IOException { + private static final long serialVersionUID = 0L; + + public NewerTxnIdException(String msgFormat, Object... msgArgs) { + super(String.format(msgFormat, msgArgs)); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java index 5666ae58b0f..84ce7c25724 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java @@ -28,7 +28,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; - +import static org.mockito.ArgumentMatchers.eq; import java.io.Closeable; import java.io.File; import java.io.IOException; @@ -40,11 +40,15 @@ import java.net.UnknownHostException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeoutException; import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.net.MockDomainNameResolver; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.SettableFuture; import org.apache.hadoop.util.Lists; +import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -53,6 +57,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster; import org.apache.hadoop.hdfs.qjournal.QJMTestUtil; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto; import org.apache.hadoop.hdfs.qjournal.server.JournalFaultInjector; import org.apache.hadoop.hdfs.qjournal.server.JournalNode; import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream; @@ -1101,6 +1106,56 @@ public class TestQuorumJournalManager { } } + /** + * Test selecting EditLogInputStream after some journalNode jitter. + * Suppose there are 3 journalNodes, JN0 ~ JN2. + * 1. JN0 has some abnormal cases when Active Namenode is syncing 10 Edits with first txid 11. + * 2. NameNode just ignore the abnormal JN0 and continue to sync Edits to Journal 1 and 2. + * 3. JN0 backed to health. + * 4. NameNode continue sync 10 Edits with first txid 21. + * 5. At this point, there are no Edits 11 ~ 30 in the cache of JN0. + * 6. Observer NameNode try to select EditLogInputStream through + * getJournaledEdits with since txId 21. + * 7. JN2 has some abnormal cases and caused a slow response. + */ + @Test + public void testSelectViaRPCAfterJNJitter() throws Exception { + EditLogOutputStream stm = qjm.startLogSegment( + 1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); + SettableFuture slowLog = SettableFuture.create(); + Mockito.doReturn(slowLog).when(spies.get(0)) + .sendEdits(eq(1L), eq(11L), eq(10), Mockito.any()); + // Successfully write these edits to JN0 ~ JN2 + writeTxns(stm, 1, 10); + // Failed write these edits to JN0, but successfully write them to JN1 ~ JN2 + writeTxns(stm, 11, 10); + // Successfully write these edits to JN1 ~ JN2 + writeTxns(stm, 21, 20); + + Semaphore semaphore = new Semaphore(0); + spyGetJournaledEdits(0, 21, () -> semaphore.release(1)); + spyGetJournaledEdits(1, 21, () -> semaphore.release(1)); + spyGetJournaledEdits(2, 21, () -> semaphore.acquireUninterruptibly(2)); + + List streams = new ArrayList<>(); + qjm.selectInputStreams(streams, 21, true, true); + + assertEquals(1, streams.size()); + assertEquals(21, streams.get(0).getFirstTxId()); + assertEquals(40, streams.get(0).getLastTxId()); + } + + private void spyGetJournaledEdits(int jnSpyIdx, long fromTxId, Runnable preHook) { + Mockito.doAnswer((Answer>) invocation -> { + preHook.run(); + @SuppressWarnings("unchecked") + ListenableFuture result = + (ListenableFuture) invocation.callRealMethod(); + return result; + }).when(spies.get(jnSpyIdx)).getJournaledEdits(fromTxId, + QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT); + } + @Test public void testSelectViaRpcAfterJNRestart() throws Exception { EditLogOutputStream stm =