diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java index b26652b0bee..c2f0350bc3e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java @@ -664,6 +664,7 @@ public class ShortCircuitCache implements Closeable { unref(replica); } + static final int FETCH_OR_CREATE_RETRY_TIMES = 3; /** * Fetch or create a replica. * @@ -678,11 +679,11 @@ public class ShortCircuitCache implements Closeable { */ public ShortCircuitReplicaInfo fetchOrCreate(ExtendedBlockId key, ShortCircuitReplicaCreator creator) { - Waitable newWaitable = null; + Waitable newWaitable; lock.lock(); try { ShortCircuitReplicaInfo info = null; - do { + for (int i = 0; i < FETCH_OR_CREATE_RETRY_TIMES; i++){ if (closed) { LOG.trace("{}: can't fethchOrCreate {} because the cache is closed.", this, key); @@ -692,11 +693,12 @@ public class ShortCircuitCache implements Closeable { if (waitable != null) { try { info = fetch(key, waitable); + break; } catch (RetriableException e) { LOG.debug("{}: retrying {}", this, e.getMessage()); } } - } while (false); + } if (info != null) return info; // We need to load the replica ourselves. newWaitable = new Waitable<>(lock.newCondition()); @@ -717,7 +719,8 @@ public class ShortCircuitCache implements Closeable { * * @throws RetriableException If the caller needs to retry. */ - private ShortCircuitReplicaInfo fetch(ExtendedBlockId key, + @VisibleForTesting // ONLY for testing + protected ShortCircuitReplicaInfo fetch(ExtendedBlockId key, Waitable waitable) throws RetriableException { // Another thread is already in the process of loading this // ShortCircuitReplica. So we simply wait for it to complete. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java index 7ba0edcecc6..5da6a250555 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java @@ -65,6 +65,7 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplica import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.TemporarySocketDirectory; import org.apache.hadoop.security.token.SecretManager.InvalidToken; @@ -793,4 +794,29 @@ public class TestShortCircuitCache { cluster.shutdown(); sockDir.close(); } + + @Test + public void testFetchOrCreateRetries() throws Exception { + try(ShortCircuitCache cache = Mockito + .spy(new ShortCircuitCache(10, 10000000, 10, 10000000, 1, 10000, 0))) { + final TestFileDescriptorPair pair = new TestFileDescriptorPair(); + ExtendedBlockId extendedBlockId = new ExtendedBlockId(123, "test_bp1"); + SimpleReplicaCreator sRC = new SimpleReplicaCreator(123, cache, pair); + + // Arrange that fetch will throw RetriableException for any call + Mockito.doThrow(new RetriableException("Retry")).when(cache) + .fetch(Mockito.eq(extendedBlockId), Mockito.any()); + + // Act: calling fetchOrCreate two times + // first call: it will create and put entry to replicaInfoMap + // second call: it will call fetch to get info for entry, and should + // retry 3 times because RetriableException thrown + cache.fetchOrCreate(extendedBlockId, sRC); + cache.fetchOrCreate(extendedBlockId, sRC); + + // Assert that fetchOrCreate retried to fetch at least 3 times + Mockito.verify(cache, Mockito.atLeast(3)) + .fetch(Mockito.eq(extendedBlockId), Mockito.any()); + } + } }