Fail fast incase a lookup load fails (#12397)

Currently while loading a lookup for the first time, loading threads blocks
for `waitForFirstRunMs` incase the lookup failed to load. If the `waitForFirstRunMs`
is long (like 10 minutes), such blocking can slow down the loading of other lookups.

This commit allows the thread to progress as soon as the loading of the lookup fails.
This commit is contained in:
Rohan Garg 2022-04-18 13:14:02 +05:30 committed by GitHub
parent 1201c9b2e5
commit de9f12b5c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 88 additions and 5 deletions

View File

@ -37,10 +37,12 @@ import javax.annotation.Nullable;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -151,6 +153,7 @@ public final class CacheScheduler
private final CacheGenerator<T> cacheGenerator;
private final ConcurrentAwaitableCounter updateCounter = new ConcurrentAwaitableCounter();
private final CountDownLatch startLatch = new CountDownLatch(1);
private final CompletableFuture<Boolean> firstLoadFinishedSuccessfully = new CompletableFuture<>();
private EntryImpl(final T namespace, final Entry<T> entry, final CacheGenerator<T> cacheGenerator)
{
@ -185,13 +188,14 @@ public final class CacheScheduler
private void updateCache()
{
boolean updatedCacheSuccessfully = false;
try {
// Ensures visibility of the whole EntryImpl's state (fields and their state).
startLatch.await();
CacheState currentCacheState = cacheStateHolder.get();
if (!Thread.currentThread().isInterrupted() && currentCacheState != NoCache.ENTRY_CLOSED) {
final String currentVersion = currentVersionOrNull(currentCacheState);
tryUpdateCache(currentVersion);
updatedCacheSuccessfully = tryUpdateCache(currentVersion);
}
}
catch (Throwable t) {
@ -205,9 +209,14 @@ public final class CacheScheduler
throw new RuntimeException(t);
}
}
finally {
if (!firstLoadFinishedSuccessfully.isDone()) {
firstLoadFinishedSuccessfully.complete(updatedCacheSuccessfully);
}
}
}
private void tryUpdateCache(String currentVersion) throws Exception
private boolean tryUpdateCache(String currentVersion) throws Exception
{
boolean updatedCacheSuccessfully = false;
CacheHandler newCache = null;
@ -253,6 +262,7 @@ public final class CacheScheduler
throw t;
}
}
return updatedCacheSuccessfully;
}
private String currentVersionOrNull(CacheState currentCacheState)
@ -467,22 +477,31 @@ public final class CacheScheduler
@Nullable
public Entry scheduleAndWait(ExtractionNamespace namespace, long waitForFirstRunMs) throws InterruptedException
{
Exception loadException = null;
final Entry entry = schedule(namespace);
log.debug("Scheduled new %s", entry);
boolean success = false;
try {
success = entry.impl.updateCounter.awaitFirstIncrement(waitForFirstRunMs, TimeUnit.MILLISECONDS);
success = (boolean) entry.impl.firstLoadFinishedSuccessfully.get(waitForFirstRunMs, TimeUnit.MILLISECONDS);
if (success) {
return entry;
} else {
return null;
}
}
catch (ExecutionException | TimeoutException e) {
loadException = e;
return null;
}
finally {
if (!success) {
// ExecutionException's cause is logged in entry.close()
entry.close();
log.error("CacheScheduler[%s] - problem during start or waiting for the first run", entry);
if (loadException != null) {
log.error(loadException, "CacheScheduler[%s] - problem during start or waiting for the first run", entry);
} else {
log.error("CacheScheduler[%s] - problem during start or waiting for the first run", entry);
}
}
}
}

View File

@ -22,15 +22,20 @@ package org.apache.druid.server.lookup.namespace.cache;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.query.lookup.namespace.CacheGenerator;
import org.apache.druid.query.lookup.namespace.JdbcExtractionNamespace;
import org.apache.druid.query.lookup.namespace.UriExtractionNamespace;
import org.apache.druid.query.lookup.namespace.UriExtractionNamespaceTest;
import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
import org.apache.druid.server.lookup.namespace.JdbcCacheGenerator;
import org.apache.druid.server.lookup.namespace.NamespaceExtractionConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.joda.time.Period;
@ -54,6 +59,7 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@ -144,7 +150,9 @@ public class CacheSchedulerTest
new NoopServiceEmitter(),
ImmutableMap.of(
UriExtractionNamespace.class,
cacheGenerator
cacheGenerator,
JdbcExtractionNamespace.class,
new JdbcCacheGenerator()
),
cacheManager
);
@ -412,6 +420,62 @@ public class CacheSchedulerTest
Assert.assertEquals(0, scheduler.getActiveEntries());
}
@Test(timeout = 60_000L)
public void testSimpleSubmissionSuccessWithWait() throws InterruptedException
{
UriExtractionNamespace namespace = new UriExtractionNamespace(
tmpFile.toURI(),
null, null,
new UriExtractionNamespace.ObjectMapperFlatDataParser(
UriExtractionNamespaceTest.registerTypes(new ObjectMapper())
),
new Period(0),
null,
null
);
CacheScheduler.Entry entry = scheduler.scheduleAndWait(namespace, 10_000L);
waitFor(entry);
Assert.assertEquals(VALUE, entry.getCache().get(KEY));
}
@Test(timeout = 20_000L)
public void testSimpleSubmissionFailureWithWait() throws InterruptedException
{
JdbcExtractionNamespace namespace = new JdbcExtractionNamespace(
new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:mysql://dummy:3306/db";
}
},
"foo",
"k",
"val",
"time",
"some filter",
new Period(10_000),
null,
new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
{
return ImmutableSet.of("valid_key1", "valid_key2");
}
@Override
public boolean isEnforceAllowedProperties()
{
return true;
}
}
);
scheduler.scheduleAndWait(namespace, 40_000L);
}
private void scheduleDanglingEntry() throws InterruptedException
{
CacheScheduler.Entry entry = scheduler.schedule(getUriExtractionNamespace(5));