HDFS-13330. ShortCircuitCache#fetchOrCreate never retries. Contributed by Gabor Bota.

(cherry picked from commit e66e287efe)
This commit is contained in:
Wei-Chiu Chuang 2018-04-13 09:17:34 -07:00
parent 994c7d66e0
commit ca8bb322be
2 changed files with 33 additions and 4 deletions

View File

@ -664,6 +664,7 @@ public class ShortCircuitCache implements Closeable {
unref(replica); unref(replica);
} }
static final int FETCH_OR_CREATE_RETRY_TIMES = 3;
/** /**
* Fetch or create a replica. * Fetch or create a replica.
* *
@ -678,11 +679,11 @@ public class ShortCircuitCache implements Closeable {
*/ */
public ShortCircuitReplicaInfo fetchOrCreate(ExtendedBlockId key, public ShortCircuitReplicaInfo fetchOrCreate(ExtendedBlockId key,
ShortCircuitReplicaCreator creator) { ShortCircuitReplicaCreator creator) {
Waitable<ShortCircuitReplicaInfo> newWaitable = null; Waitable<ShortCircuitReplicaInfo> newWaitable;
lock.lock(); lock.lock();
try { try {
ShortCircuitReplicaInfo info = null; ShortCircuitReplicaInfo info = null;
do { for (int i = 0; i < FETCH_OR_CREATE_RETRY_TIMES; i++){
if (closed) { if (closed) {
LOG.trace("{}: can't fethchOrCreate {} because the cache is closed.", LOG.trace("{}: can't fethchOrCreate {} because the cache is closed.",
this, key); this, key);
@ -692,11 +693,12 @@ public class ShortCircuitCache implements Closeable {
if (waitable != null) { if (waitable != null) {
try { try {
info = fetch(key, waitable); info = fetch(key, waitable);
break;
} catch (RetriableException e) { } catch (RetriableException e) {
LOG.debug("{}: retrying {}", this, e.getMessage()); LOG.debug("{}: retrying {}", this, e.getMessage());
} }
} }
} while (false); }
if (info != null) return info; if (info != null) return info;
// We need to load the replica ourselves. // We need to load the replica ourselves.
newWaitable = new Waitable<>(lock.newCondition()); newWaitable = new Waitable<>(lock.newCondition());
@ -717,7 +719,8 @@ public class ShortCircuitCache implements Closeable {
* *
* @throws RetriableException If the caller needs to retry. * @throws RetriableException If the caller needs to retry.
*/ */
private ShortCircuitReplicaInfo fetch(ExtendedBlockId key, @VisibleForTesting // ONLY for testing
protected ShortCircuitReplicaInfo fetch(ExtendedBlockId key,
Waitable<ShortCircuitReplicaInfo> waitable) throws RetriableException { Waitable<ShortCircuitReplicaInfo> waitable) throws RetriableException {
// Another thread is already in the process of loading this // Another thread is already in the process of loading this
// ShortCircuitReplica. So we simply wait for it to complete. // ShortCircuitReplica. So we simply wait for it to complete.

View File

@ -65,6 +65,7 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplica
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory; import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.SecretManager.InvalidToken;
@ -793,4 +794,29 @@ public class TestShortCircuitCache {
cluster.shutdown(); cluster.shutdown();
sockDir.close(); sockDir.close();
} }
@Test
public void testFetchOrCreateRetries() throws Exception {
try(ShortCircuitCache cache = Mockito
.spy(new ShortCircuitCache(10, 10000000, 10, 10000000, 1, 10000, 0))) {
final TestFileDescriptorPair pair = new TestFileDescriptorPair();
ExtendedBlockId extendedBlockId = new ExtendedBlockId(123, "test_bp1");
SimpleReplicaCreator sRC = new SimpleReplicaCreator(123, cache, pair);
// Arrange that fetch will throw RetriableException for any call
Mockito.doThrow(new RetriableException("Retry")).when(cache)
.fetch(Mockito.eq(extendedBlockId), Mockito.any());
// Act: calling fetchOrCreate two times
// first call: it will create and put entry to replicaInfoMap
// second call: it will call fetch to get info for entry, and should
// retry 3 times because RetriableException thrown
cache.fetchOrCreate(extendedBlockId, sRC);
cache.fetchOrCreate(extendedBlockId, sRC);
// Assert that fetchOrCreate retried to fetch at least 3 times
Mockito.verify(cache, Mockito.atLeast(3))
.fetch(Mockito.eq(extendedBlockId), Mockito.any());
}
}
} }