mirror of
https://github.com/apache/druid.git
synced 2025-02-22 18:30:13 +00:00
Refactor lookups behavior while loading/dropping the containers (#14806)
This commit is contained in:
parent
54fa3425c3
commit
e2fde8c516
@ -353,6 +353,7 @@ The JDBC lookups will poll a database to populate its local cache. If the `tsCol
|
||||
|`tsColumn`| The column in `table` which contains when the key was updated|No|Not used|
|
||||
|`pollPeriod`|How often to poll the DB|No|0 (only once)|
|
||||
|`jitterSeconds`| How much jitter to add (in seconds) up to maximum as a delay (actual value will be used as random from 0 to `jitterSeconds`), used to distribute db load more evenly|No|0|
|
||||
|`loadTimeoutSeconds`| How much time (in seconds) it can take to query and populate lookup values. It will be helpful in lookup updates. On lookup update, it will wait maximum of `loadTimeoutSeconds` for new lookup to come up and continue serving from old lookup until new lookup successfully loads. |No|0|
|
||||
|`maxHeapPercentage`|The maximum percentage of heap size that the lookup should consume. If the lookup grows beyond this size, warning messages will be logged in the respective service logs.|No|10% of JVM heap size|
|
||||
|
||||
```json
|
||||
|
@ -297,6 +297,18 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
|
||||
return new KafkaLookupExtractorIntrospectionHandler(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void awaitInitialization()
|
||||
{
|
||||
// Kafka lookup do not need await on initialization as it is realtime kafka lookups.
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isInitialized()
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LookupExtractor get()
|
||||
{
|
||||
|
@ -83,11 +83,13 @@ public class KafkaLookupExtractorFactoryTest
|
||||
mapper.writeValueAsString(expected),
|
||||
KafkaLookupExtractorFactory.class
|
||||
);
|
||||
result.awaitInitialization();
|
||||
Assert.assertEquals(expected.getKafkaTopic(), result.getKafkaTopic());
|
||||
Assert.assertEquals(expected.getKafkaProperties(), result.getKafkaProperties());
|
||||
Assert.assertEquals(cacheManager, result.getCacheManager());
|
||||
Assert.assertEquals(0, expected.getCompletedEventCount());
|
||||
Assert.assertEquals(0, result.getCompletedEventCount());
|
||||
Assert.assertTrue(result.isInitialized());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -36,6 +36,7 @@ import javax.annotation.Nullable;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
@ -169,6 +170,22 @@ public class NamespaceLookupExtractorFactory implements LookupExtractorFactory
|
||||
return lookupIntrospectHandler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void awaitInitialization() throws InterruptedException, TimeoutException
|
||||
{
|
||||
long timeout = extractionNamespace.getLoadTimeoutMills();
|
||||
if (entry.getCacheState() == CacheScheduler.NoCache.CACHE_NOT_INITIALIZED) {
|
||||
LOG.info("Cache not initialized yet for namespace %s waiting for %s mills", extractionNamespace, timeout);
|
||||
entry.awaitTotalUpdatesWithTimeout(1, timeout);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isInitialized()
|
||||
{
|
||||
return entry.getCacheState() instanceof CacheScheduler.VersionedCache;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public ExtractionNamespace getExtractionNamespace()
|
||||
{
|
||||
|
@ -49,4 +49,9 @@ public interface ExtractionNamespace
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
default long getLoadTimeoutMills()
|
||||
{
|
||||
return 60 * 1000;
|
||||
}
|
||||
}
|
||||
|
@ -45,6 +45,7 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
|
||||
private static final Logger LOG = new Logger(JdbcExtractionNamespace.class);
|
||||
|
||||
long DEFAULT_MAX_HEAP_PERCENTAGE = 10L;
|
||||
long DEFAULT_LOOKUP_LOAD_TIME_SECONDS = 120;
|
||||
|
||||
@JsonProperty
|
||||
private final MetadataStorageConnectorConfig connectorConfig;
|
||||
@ -63,6 +64,8 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
|
||||
@JsonProperty
|
||||
private final long maxHeapPercentage;
|
||||
@JsonProperty
|
||||
private final long loadTimeoutSeconds;
|
||||
@JsonProperty
|
||||
private final int jitterSeconds;
|
||||
|
||||
@JsonCreator
|
||||
@ -77,6 +80,7 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
|
||||
@Min(0) @JsonProperty(value = "pollPeriod") @Nullable final Period pollPeriod,
|
||||
@JsonProperty(value = "maxHeapPercentage") @Nullable final Long maxHeapPercentage,
|
||||
@JsonProperty(value = "jitterSeconds") @Nullable Integer jitterSeconds,
|
||||
@JsonProperty(value = "loadTimeoutSeconds") @Nullable final Long loadTimeoutSeconds,
|
||||
@JacksonInject JdbcAccessSecurityConfig securityConfig
|
||||
)
|
||||
{
|
||||
@ -101,6 +105,7 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
|
||||
}
|
||||
this.jitterSeconds = jitterSeconds == null ? 0 : jitterSeconds;
|
||||
this.maxHeapPercentage = maxHeapPercentage == null ? DEFAULT_MAX_HEAP_PERCENTAGE : maxHeapPercentage;
|
||||
this.loadTimeoutSeconds = loadTimeoutSeconds == null ? DEFAULT_LOOKUP_LOAD_TIME_SECONDS : loadTimeoutSeconds;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -176,6 +181,12 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
|
||||
return 1000L * ThreadLocalRandom.current().nextInt(jitterSeconds + 1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLoadTimeoutMills()
|
||||
{
|
||||
return 1000L * loadTimeoutSeconds;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
@ -187,6 +198,8 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
|
||||
", tsColumn='" + tsColumn + '\'' +
|
||||
", filter='" + filter + '\'' +
|
||||
", pollPeriod=" + pollPeriod +
|
||||
", jitterSeconds=" + jitterSeconds +
|
||||
", loadTimeoutSeconds=" + loadTimeoutSeconds +
|
||||
", maxHeapPercentage=" + maxHeapPercentage +
|
||||
'}';
|
||||
}
|
||||
|
@ -115,6 +115,12 @@ public final class CacheScheduler
|
||||
impl.updateCounter.awaitCount(totalUpdates);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void awaitTotalUpdatesWithTimeout(int totalUpdates, long timeoutMills)
|
||||
throws InterruptedException, TimeoutException
|
||||
{
|
||||
impl.updateCounter.awaitCount(totalUpdates, timeoutMills, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
@VisibleForTesting
|
||||
void awaitNextUpdates(int nextUpdates) throws InterruptedException
|
||||
{
|
||||
|
@ -59,6 +59,7 @@ import static org.mockito.Mockito.atLeastOnce;
|
||||
import static org.mockito.Mockito.atMostOnce;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.reset;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoInteractions;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
@ -227,7 +228,6 @@ public class NamespaceLookupExtractorFactoryTest
|
||||
);
|
||||
Assert.assertTrue(namespaceLookupExtractorFactory.start());
|
||||
Assert.assertTrue(namespaceLookupExtractorFactory.start());
|
||||
|
||||
verify(scheduler).scheduleAndWait(extractionNamespace, 60000L);
|
||||
verifyNoMoreInteractions(scheduler, entry, versionedCache);
|
||||
}
|
||||
@ -287,6 +287,40 @@ public class NamespaceLookupExtractorFactoryTest
|
||||
verifyNoMoreInteractions(scheduler, entry, versionedCache);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAwaitInitializationOnCacheNotInitialized() throws Exception
|
||||
{
|
||||
final ExtractionNamespace extractionNamespace = new ExtractionNamespace()
|
||||
{
|
||||
@Override
|
||||
public long getPollMs()
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLoadTimeoutMills()
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
};
|
||||
expectScheduleAndWaitOnce(extractionNamespace);
|
||||
when(entry.getCacheState()).thenReturn(CacheScheduler.NoCache.CACHE_NOT_INITIALIZED);
|
||||
|
||||
final NamespaceLookupExtractorFactory namespaceLookupExtractorFactory = new NamespaceLookupExtractorFactory(
|
||||
extractionNamespace,
|
||||
scheduler
|
||||
);
|
||||
Assert.assertTrue(namespaceLookupExtractorFactory.start());
|
||||
namespaceLookupExtractorFactory.awaitInitialization();
|
||||
Assert.assertThrows(ISE.class, () -> namespaceLookupExtractorFactory.get());
|
||||
verify(scheduler).scheduleAndWait(extractionNamespace, 60000L);
|
||||
verify(entry, times(2)).getCacheState();
|
||||
verify(entry).awaitTotalUpdatesWithTimeout(1, 1);
|
||||
Thread.sleep(10);
|
||||
verifyNoMoreInteractions(scheduler, entry, versionedCache);
|
||||
}
|
||||
|
||||
private void expectScheduleAndWaitOnce(ExtractionNamespace extractionNamespace)
|
||||
{
|
||||
try {
|
||||
|
@ -63,7 +63,9 @@ public class JdbcExtractionNamespaceUrlCheckTest
|
||||
"some filter",
|
||||
new Period(10),
|
||||
null,
|
||||
0, new JdbcAccessSecurityConfig()
|
||||
0,
|
||||
1000L,
|
||||
new JdbcAccessSecurityConfig()
|
||||
{
|
||||
@Override
|
||||
public Set<String> getAllowedProperties()
|
||||
@ -102,6 +104,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
|
||||
new Period(10),
|
||||
null,
|
||||
0,
|
||||
null,
|
||||
new JdbcAccessSecurityConfig()
|
||||
{
|
||||
@Override
|
||||
@ -139,6 +142,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
|
||||
new Period(10),
|
||||
null,
|
||||
0,
|
||||
null,
|
||||
new JdbcAccessSecurityConfig()
|
||||
{
|
||||
@Override
|
||||
@ -178,6 +182,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
|
||||
new Period(10),
|
||||
null,
|
||||
0,
|
||||
null,
|
||||
new JdbcAccessSecurityConfig()
|
||||
{
|
||||
@Override
|
||||
@ -221,6 +226,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
|
||||
new Period(10),
|
||||
null,
|
||||
0,
|
||||
null,
|
||||
new JdbcAccessSecurityConfig()
|
||||
{
|
||||
@Override
|
||||
@ -260,6 +266,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
|
||||
new Period(10),
|
||||
10L,
|
||||
0,
|
||||
null,
|
||||
new JdbcAccessSecurityConfig()
|
||||
{
|
||||
@Override
|
||||
@ -296,7 +303,9 @@ public class JdbcExtractionNamespaceUrlCheckTest
|
||||
"some filter",
|
||||
new Period(10),
|
||||
null,
|
||||
0, new JdbcAccessSecurityConfig()
|
||||
0,
|
||||
null,
|
||||
new JdbcAccessSecurityConfig()
|
||||
{
|
||||
@Override
|
||||
public Set<String> getAllowedProperties()
|
||||
@ -335,6 +344,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
|
||||
new Period(10),
|
||||
null,
|
||||
0,
|
||||
null,
|
||||
new JdbcAccessSecurityConfig()
|
||||
{
|
||||
@Override
|
||||
@ -380,6 +390,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
|
||||
new Period(10),
|
||||
null,
|
||||
0,
|
||||
null,
|
||||
new JdbcAccessSecurityConfig()
|
||||
{
|
||||
@Override
|
||||
@ -423,6 +434,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
|
||||
new Period(10),
|
||||
null,
|
||||
0,
|
||||
null,
|
||||
new JdbcAccessSecurityConfig()
|
||||
{
|
||||
@Override
|
||||
|
@ -138,6 +138,7 @@ public class JdbcCacheGeneratorTest
|
||||
Period.ZERO,
|
||||
null,
|
||||
0,
|
||||
null,
|
||||
new JdbcAccessSecurityConfig()
|
||||
);
|
||||
}
|
||||
|
@ -191,6 +191,24 @@ public class CacheSchedulerTest
|
||||
Assert.assertEquals(VALUE, entry.getCache().get(KEY));
|
||||
}
|
||||
|
||||
@Test(timeout = 60_000L)
|
||||
public void testInitialization() throws InterruptedException, TimeoutException
|
||||
{
|
||||
UriExtractionNamespace namespace = new UriExtractionNamespace(
|
||||
tmpFile.toURI(),
|
||||
null, null,
|
||||
new UriExtractionNamespace.ObjectMapperFlatDataParser(
|
||||
UriExtractionNamespaceTest.registerTypes(new ObjectMapper())
|
||||
),
|
||||
new Period(0),
|
||||
null,
|
||||
null
|
||||
);
|
||||
CacheScheduler.Entry entry = scheduler.schedule(namespace);
|
||||
entry.awaitTotalUpdatesWithTimeout(1, 2000);
|
||||
Assert.assertEquals(VALUE, entry.getCache().get(KEY));
|
||||
}
|
||||
|
||||
@Test(timeout = 60_000L)
|
||||
public void testPeriodicUpdatesScheduled() throws InterruptedException
|
||||
{
|
||||
@ -459,6 +477,7 @@ public class CacheSchedulerTest
|
||||
new Period(10_000),
|
||||
null,
|
||||
0,
|
||||
null,
|
||||
new JdbcAccessSecurityConfig()
|
||||
{
|
||||
@Override
|
||||
|
@ -329,6 +329,7 @@ public class JdbcExtractionNamespaceTest
|
||||
new Period(0),
|
||||
null,
|
||||
0,
|
||||
null,
|
||||
new JdbcAccessSecurityConfig()
|
||||
);
|
||||
try (CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace)) {
|
||||
@ -363,6 +364,7 @@ public class JdbcExtractionNamespaceTest
|
||||
new Period(0),
|
||||
null,
|
||||
0,
|
||||
null,
|
||||
new JdbcAccessSecurityConfig()
|
||||
);
|
||||
try (CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace)) {
|
||||
@ -414,6 +416,7 @@ public class JdbcExtractionNamespaceTest
|
||||
new Period(0),
|
||||
null,
|
||||
120,
|
||||
null,
|
||||
new JdbcAccessSecurityConfig()
|
||||
);
|
||||
long jitter = extractionNamespace.getJitterMills();
|
||||
@ -433,6 +436,7 @@ public class JdbcExtractionNamespaceTest
|
||||
FILTER_COLUMN + "='1'",
|
||||
new Period(0),
|
||||
null,
|
||||
0,
|
||||
null,
|
||||
new JdbcAccessSecurityConfig()
|
||||
);
|
||||
@ -478,6 +482,7 @@ public class JdbcExtractionNamespaceTest
|
||||
new Period(10),
|
||||
null,
|
||||
0,
|
||||
null,
|
||||
securityConfig
|
||||
);
|
||||
final ObjectMapper mapper = new DefaultObjectMapper();
|
||||
@ -504,6 +509,7 @@ public class JdbcExtractionNamespaceTest
|
||||
new Period(10),
|
||||
null,
|
||||
0,
|
||||
null,
|
||||
new JdbcAccessSecurityConfig()
|
||||
);
|
||||
CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace);
|
||||
|
@ -111,6 +111,17 @@ public class LoadingLookupFactory implements LookupExtractorFactory
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void awaitInitialization()
|
||||
{
|
||||
// LoadingLookupFactory does not have any initialization period as it fetches the key from loadingCache and DataFetcher as necessary.
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isInitialized()
|
||||
{
|
||||
return true;
|
||||
}
|
||||
@Override
|
||||
public LoadingLookup get()
|
||||
{
|
||||
|
@ -128,6 +128,17 @@ public class PollingLookupFactory implements LookupExtractorFactory
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void awaitInitialization()
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isInitialized()
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PollingLookup get()
|
||||
{
|
||||
|
@ -57,6 +57,8 @@ public class LoadingLookupFactoryTest
|
||||
EasyMock.expectLastCall().once();
|
||||
EasyMock.replay(loadingLookup);
|
||||
Assert.assertTrue(loadingLookupFactory.start());
|
||||
loadingLookupFactory.awaitInitialization();
|
||||
Assert.assertTrue(loadingLookupFactory.isInitialized());
|
||||
Assert.assertTrue(loadingLookupFactory.close());
|
||||
EasyMock.verify(loadingLookup);
|
||||
|
||||
|
@ -36,6 +36,8 @@ public class PollingLookupFactoryTest
|
||||
EasyMock.expect(pollingLookup.isOpen()).andReturn(true).once();
|
||||
EasyMock.replay(pollingLookup);
|
||||
Assert.assertTrue(pollingLookupFactory.start());
|
||||
pollingLookupFactory.awaitInitialization();
|
||||
Assert.assertTrue(pollingLookupFactory.isInitialized());
|
||||
EasyMock.verify(pollingLookup);
|
||||
}
|
||||
|
||||
|
@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import com.google.common.base.Supplier;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/**
|
||||
* Users of Lookup Extraction need to implement a {@link LookupExtractorFactory} supplier of type {@link LookupExtractor}.
|
||||
@ -79,4 +80,14 @@ public interface LookupExtractorFactory extends Supplier<LookupExtractor>
|
||||
*/
|
||||
@Nullable
|
||||
LookupIntrospectHandler getIntrospectHandler();
|
||||
|
||||
/**
|
||||
* awaitToInitialise blocks and wait for the cache to initialize fully.
|
||||
*/
|
||||
void awaitInitialization() throws InterruptedException, TimeoutException;
|
||||
|
||||
/**
|
||||
* @return true if cache is loaded and lookup is queryable else returns false
|
||||
*/
|
||||
boolean isInitialized();
|
||||
}
|
||||
|
@ -104,6 +104,17 @@ public class LookupExtractorFactoryContainerTest
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void awaitInitialization()
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isInitialized()
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LookupExtractor get()
|
||||
{
|
||||
|
@ -82,6 +82,17 @@ public class LookupSegmentTest
|
||||
throw new UnsupportedOperationException("not needed for this test");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void awaitInitialization()
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isInitialized()
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LookupExtractor get()
|
||||
{
|
||||
|
@ -94,7 +94,9 @@ class LookupListeningResource extends ListenerResource
|
||||
|
||||
try {
|
||||
state.getToLoad().forEach(manager::add);
|
||||
state.getToDrop().forEach(manager::remove);
|
||||
state.getToDrop().forEach(lookName -> {
|
||||
manager.remove(lookName, state.getToLoad().getOrDefault(lookName, null));
|
||||
});
|
||||
|
||||
return Response.status(Response.Status.ACCEPTED).entity(manager.getAllLookupsState()).build();
|
||||
}
|
||||
@ -135,7 +137,7 @@ class LookupListeningResource extends ListenerResource
|
||||
@Override
|
||||
public Object delete(String id)
|
||||
{
|
||||
manager.remove(id);
|
||||
manager.remove(id, null);
|
||||
return id;
|
||||
}
|
||||
}
|
||||
|
@ -66,6 +66,7 @@ import java.util.concurrent.ExecutorCompletionService;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.LockSupport;
|
||||
import java.util.function.Function;
|
||||
@ -117,6 +118,8 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP
|
||||
|
||||
private final LookupConfig lookupConfig;
|
||||
|
||||
private ExecutorService lookupUpdateExecutorService;
|
||||
|
||||
@Inject
|
||||
public LookupReferencesManager(
|
||||
LookupConfig lookupConfig,
|
||||
@ -147,6 +150,10 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP
|
||||
this.lookupListeningAnnouncerConfig = lookupListeningAnnouncerConfig;
|
||||
this.lookupConfig = lookupConfig;
|
||||
this.testMode = testMode;
|
||||
this.lookupUpdateExecutorService = Execs.multiThreaded(
|
||||
lookupConfig.getNumLookupLoadingThreads(),
|
||||
"LookupExtractorFactoryContainerProvider-Update-%s"
|
||||
);
|
||||
}
|
||||
|
||||
@LifecycleStart
|
||||
@ -217,7 +224,7 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP
|
||||
Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>(swappedState.lookupMap);
|
||||
for (Notice notice : swappedState.noticesBeingHandled) {
|
||||
try {
|
||||
notice.handle(lookupMap);
|
||||
notice.handle(lookupMap, this);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
LOG.error(ex, "Exception occurred while handling lookup notice [%s].", notice);
|
||||
@ -266,7 +273,7 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP
|
||||
LOG.error(ex, "Failed to close lookup [%s].", e.getKey());
|
||||
}
|
||||
}
|
||||
|
||||
lookupUpdateExecutorService.shutdown();
|
||||
LOG.debug("LookupExtractorFactoryContainerProvider is stopped.");
|
||||
}
|
||||
|
||||
@ -277,10 +284,10 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP
|
||||
addNotice(new LoadNotice(lookupName, lookupExtractorFactoryContainer, lookupConfig.getLookupStartRetries()));
|
||||
}
|
||||
|
||||
public void remove(String lookupName)
|
||||
public void remove(String lookupName, LookupExtractorFactoryContainer loadedContainer)
|
||||
{
|
||||
Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
|
||||
addNotice(new DropNotice(lookupName));
|
||||
addNotice(new DropNotice(lookupName, loadedContainer));
|
||||
}
|
||||
|
||||
private void addNotice(Notice notice)
|
||||
@ -301,6 +308,11 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP
|
||||
LockSupport.unpark(mainThread);
|
||||
}
|
||||
|
||||
public void submitAsyncLookupTask(Runnable task)
|
||||
{
|
||||
lookupUpdateExecutorService.submit(task);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<LookupExtractorFactoryContainer> get(String lookupName)
|
||||
{
|
||||
@ -595,11 +607,24 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP
|
||||
)
|
||||
);
|
||||
}
|
||||
private void dropContainer(LookupExtractorFactoryContainer container, String lookupName)
|
||||
{
|
||||
if (container != null) {
|
||||
LOG.debug("Removed lookup [%s] with spec [%s].", lookupName, container);
|
||||
|
||||
if (!container.getLookupExtractorFactory().destroy()) {
|
||||
throw new ISE(
|
||||
"destroy method returned false for lookup [%s]:[%s]",
|
||||
lookupName,
|
||||
container
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@VisibleForTesting
|
||||
interface Notice
|
||||
{
|
||||
void handle(Map<String, LookupExtractorFactoryContainer> lookupMap) throws Exception;
|
||||
void handle(Map<String, LookupExtractorFactoryContainer> lookupMap, LookupReferencesManager manager) throws Exception;
|
||||
}
|
||||
|
||||
private static class LoadNotice implements Notice
|
||||
@ -616,7 +641,8 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(Map<String, LookupExtractorFactoryContainer> lookupMap) throws Exception
|
||||
public void handle(Map<String, LookupExtractorFactoryContainer> lookupMap, LookupReferencesManager manager)
|
||||
throws Exception
|
||||
{
|
||||
LookupExtractorFactoryContainer old = lookupMap.get(lookupName);
|
||||
if (old != null && !lookupExtractorFactoryContainer.replaces(old)) {
|
||||
@ -642,18 +668,45 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP
|
||||
e -> true,
|
||||
startRetries
|
||||
);
|
||||
|
||||
old = lookupMap.put(lookupName, lookupExtractorFactoryContainer);
|
||||
|
||||
LOG.debug("Loaded lookup [%s] with spec [%s].", lookupName, lookupExtractorFactoryContainer);
|
||||
|
||||
if (old != null) {
|
||||
if (!old.getLookupExtractorFactory().destroy()) {
|
||||
throw new ISE("destroy method returned false for lookup [%s]:[%s]", lookupName, old);
|
||||
}
|
||||
if (lookupExtractorFactoryContainer.getLookupExtractorFactory().isInitialized()) {
|
||||
old = lookupMap.put(lookupName, lookupExtractorFactoryContainer);
|
||||
LOG.debug("Loaded lookup [%s] with spec [%s].", lookupName, lookupExtractorFactoryContainer);
|
||||
manager.dropContainer(old, lookupName);
|
||||
return;
|
||||
}
|
||||
manager.submitAsyncLookupTask(() -> {
|
||||
try {
|
||||
/*
|
||||
Retry startRetries times and wait for first cache to load for new container,
|
||||
if loaded then kill old container and start serving from new one.
|
||||
If new lookupExtractorFactoryContainer has errors in loading, kill the new container and do not remove the old container
|
||||
*/
|
||||
RetryUtils.retry(
|
||||
() -> {
|
||||
lookupExtractorFactoryContainer.getLookupExtractorFactory().awaitInitialization();
|
||||
return null;
|
||||
}, e -> true,
|
||||
startRetries
|
||||
);
|
||||
if (lookupExtractorFactoryContainer.getLookupExtractorFactory().isInitialized()) {
|
||||
// send load notice with cache loaded container
|
||||
manager.add(lookupName, lookupExtractorFactoryContainer);
|
||||
} else {
|
||||
// skip loading new container as it is failed after 3 attempts
|
||||
manager.dropContainer(lookupExtractorFactoryContainer, lookupName);
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
// drop new failed container and continue serving old one
|
||||
LOG.error(
|
||||
e,
|
||||
"Exception in updating the namespace %s, continue serving from old container and killing new container ",
|
||||
lookupExtractorFactoryContainer
|
||||
);
|
||||
manager.dropContainer(lookupExtractorFactoryContainer, lookupName);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
@ -667,28 +720,36 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP
|
||||
private static class DropNotice implements Notice
|
||||
{
|
||||
private final String lookupName;
|
||||
private final LookupExtractorFactoryContainer loadedContainer;
|
||||
|
||||
DropNotice(String lookupName)
|
||||
/**
|
||||
* @param lookupName Name of the lookup to drop
|
||||
* @param loadedContainer Container ref to newly loaded container, this is mandatory in the update lookup call, it can be null in purely drop call.
|
||||
*/
|
||||
DropNotice(String lookupName, @Nullable LookupExtractorFactoryContainer loadedContainer)
|
||||
{
|
||||
this.lookupName = lookupName;
|
||||
this.loadedContainer = loadedContainer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(Map<String, LookupExtractorFactoryContainer> lookupMap)
|
||||
public void handle(Map<String, LookupExtractorFactoryContainer> lookupMap, LookupReferencesManager manager)
|
||||
{
|
||||
final LookupExtractorFactoryContainer lookupExtractorFactoryContainer = lookupMap.remove(lookupName);
|
||||
|
||||
if (lookupExtractorFactoryContainer != null) {
|
||||
LOG.debug("Removed lookup [%s] with spec [%s].", lookupName, lookupExtractorFactoryContainer);
|
||||
|
||||
if (!lookupExtractorFactoryContainer.getLookupExtractorFactory().destroy()) {
|
||||
throw new ISE(
|
||||
"destroy method returned false for lookup [%s]:[%s]",
|
||||
lookupName,
|
||||
lookupExtractorFactoryContainer
|
||||
);
|
||||
}
|
||||
if (loadedContainer != null && !loadedContainer.getLookupExtractorFactory().isInitialized()) {
|
||||
final LookupExtractorFactoryContainer containterToDrop = lookupMap.get(lookupName);
|
||||
manager.submitAsyncLookupTask(() -> {
|
||||
try {
|
||||
loadedContainer.getLookupExtractorFactory().awaitInitialization();
|
||||
manager.dropContainer(containterToDrop, lookupName);
|
||||
}
|
||||
catch (InterruptedException | TimeoutException e) {
|
||||
// do nothing as loadedContainer is dropped by LoadNotice handler eventually if cache is not loaded
|
||||
}
|
||||
});
|
||||
return;
|
||||
}
|
||||
final LookupExtractorFactoryContainer lookupExtractorFactoryContainer = lookupMap.remove(lookupName);
|
||||
manager.dropContainer(lookupExtractorFactoryContainer, lookupName);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -87,6 +87,17 @@ public class MapLookupExtractorFactory implements LookupExtractorFactory
|
||||
return lookupIntrospectHandler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void awaitInitialization()
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isInitialized()
|
||||
{
|
||||
return true;
|
||||
}
|
||||
@Override
|
||||
public LookupExtractor get()
|
||||
{
|
||||
|
@ -98,6 +98,17 @@ public class LookupEnabledTestExprMacroTable extends ExprMacroTable
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void awaitInitialization()
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isInitialized()
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LookupExtractor get()
|
||||
{
|
||||
|
@ -46,6 +46,7 @@ import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
public class LookupReferencesManagerTest
|
||||
{
|
||||
@ -149,7 +150,7 @@ public class LookupReferencesManagerTest
|
||||
@Test(expected = IllegalStateException.class)
|
||||
public void testRemoveExceptionWhenClosed()
|
||||
{
|
||||
lookupReferencesManager.remove("test");
|
||||
lookupReferencesManager.remove("test", null);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalStateException.class)
|
||||
@ -164,6 +165,7 @@ public class LookupReferencesManagerTest
|
||||
LookupExtractorFactory lookupExtractorFactory = EasyMock.createMock(LookupExtractorFactory.class);
|
||||
EasyMock.expect(lookupExtractorFactory.start()).andReturn(true).once();
|
||||
EasyMock.expect(lookupExtractorFactory.destroy()).andReturn(true).once();
|
||||
EasyMock.expect(lookupExtractorFactory.isInitialized()).andReturn(true).anyTimes();
|
||||
EasyMock.replay(lookupExtractorFactory);
|
||||
|
||||
Map<String, Object> lookupMap = new HashMap<>();
|
||||
@ -193,18 +195,132 @@ public class LookupReferencesManagerTest
|
||||
|
||||
Assert.assertEquals(Optional.of(testContainer), lookupReferencesManager.get("test"));
|
||||
|
||||
lookupReferencesManager.remove("test");
|
||||
lookupReferencesManager.remove("test", testContainer);
|
||||
lookupReferencesManager.handlePendingNotices();
|
||||
|
||||
Assert.assertEquals(Optional.empty(), lookupReferencesManager.get("test"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLoadBadContaineAfterOldGoodContainer() throws Exception
|
||||
{
|
||||
// Test the scenario of not loading the new container until it get intialized
|
||||
LookupExtractorFactory lookupExtractorFactory = EasyMock.createMock(LookupExtractorFactory.class);
|
||||
EasyMock.expect(lookupExtractorFactory.start()).andReturn(true).once();
|
||||
EasyMock.expect(lookupExtractorFactory.destroy()).andReturn(true).once();
|
||||
EasyMock.expect(lookupExtractorFactory.isInitialized()).andReturn(true).anyTimes();
|
||||
EasyMock.replay(lookupExtractorFactory);
|
||||
|
||||
Map<String, Object> lookupMap = new HashMap<>();
|
||||
lookupMap.put("testMockForAddGetRemove", container);
|
||||
String strResult = mapper.writeValueAsString(lookupMap);
|
||||
Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
|
||||
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
|
||||
EasyMock.replay(config);
|
||||
EasyMock.expect(druidLeaderClient.makeRequest(
|
||||
HttpMethod.GET,
|
||||
"/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
|
||||
))
|
||||
.andReturn(request);
|
||||
StringFullResponseHolder responseHolder = new StringFullResponseHolder(
|
||||
newEmptyResponse(HttpResponseStatus.OK),
|
||||
StandardCharsets.UTF_8
|
||||
).addChunk(strResult);
|
||||
EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
|
||||
EasyMock.replay(druidLeaderClient);
|
||||
lookupReferencesManager.start();
|
||||
Assert.assertEquals(Optional.empty(), lookupReferencesManager.get("test"));
|
||||
|
||||
LookupExtractorFactoryContainer testContainer = new LookupExtractorFactoryContainer("0", lookupExtractorFactory);
|
||||
|
||||
lookupReferencesManager.add("test", testContainer);
|
||||
lookupReferencesManager.handlePendingNotices();
|
||||
|
||||
Assert.assertEquals(Optional.of(testContainer), lookupReferencesManager.get("test"));
|
||||
|
||||
LookupExtractorFactory badLookupExtractorFactory = EasyMock.createMock(LookupExtractorFactory.class);
|
||||
EasyMock.expect(badLookupExtractorFactory.start()).andReturn(false).anyTimes();
|
||||
badLookupExtractorFactory.awaitInitialization();
|
||||
EasyMock.expectLastCall().andThrow(new TimeoutException());
|
||||
EasyMock.expect(badLookupExtractorFactory.destroy()).andReturn(true).once();
|
||||
EasyMock.expect(badLookupExtractorFactory.isInitialized()).andReturn(false).anyTimes();
|
||||
EasyMock.replay(badLookupExtractorFactory);
|
||||
LookupExtractorFactoryContainer badContainer = new LookupExtractorFactoryContainer("0", badLookupExtractorFactory);
|
||||
lookupReferencesManager.add("test", badContainer);
|
||||
|
||||
lookupReferencesManager.handlePendingNotices();
|
||||
|
||||
Assert.assertEquals(Optional.of(testContainer), lookupReferencesManager.get("test"));
|
||||
|
||||
lookupReferencesManager.remove("test", testContainer);
|
||||
lookupReferencesManager.handlePendingNotices();
|
||||
|
||||
Assert.assertEquals(Optional.empty(), lookupReferencesManager.get("test"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDropOldContainerAfterNewLoadGoodContainer() throws Exception
|
||||
{
|
||||
// Test the scenario of dropping the current container only when new container gets initialized
|
||||
LookupExtractorFactory lookupExtractorFactory = EasyMock.createMock(LookupExtractorFactory.class);
|
||||
EasyMock.expect(lookupExtractorFactory.start()).andReturn(true).once();
|
||||
EasyMock.expect(lookupExtractorFactory.destroy()).andReturn(true).once();
|
||||
EasyMock.expect(lookupExtractorFactory.isInitialized()).andReturn(true).anyTimes();
|
||||
EasyMock.replay(lookupExtractorFactory);
|
||||
|
||||
Map<String, Object> lookupMap = new HashMap<>();
|
||||
lookupMap.put("testMockForAddGetRemove", container);
|
||||
String strResult = mapper.writeValueAsString(lookupMap);
|
||||
Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
|
||||
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
|
||||
EasyMock.replay(config);
|
||||
EasyMock.expect(druidLeaderClient.makeRequest(
|
||||
HttpMethod.GET,
|
||||
"/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
|
||||
))
|
||||
.andReturn(request);
|
||||
StringFullResponseHolder responseHolder = new StringFullResponseHolder(
|
||||
newEmptyResponse(HttpResponseStatus.OK),
|
||||
StandardCharsets.UTF_8
|
||||
).addChunk(strResult);
|
||||
EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
|
||||
EasyMock.replay(druidLeaderClient);
|
||||
lookupReferencesManager.start();
|
||||
Assert.assertEquals(Optional.empty(), lookupReferencesManager.get("test"));
|
||||
|
||||
LookupExtractorFactoryContainer testContainer = new LookupExtractorFactoryContainer("0", lookupExtractorFactory);
|
||||
|
||||
lookupReferencesManager.add("test", testContainer);
|
||||
lookupReferencesManager.handlePendingNotices();
|
||||
|
||||
Assert.assertEquals(Optional.of(testContainer), lookupReferencesManager.get("test"));
|
||||
|
||||
LookupExtractorFactory badLookupExtractorFactory = EasyMock.createMock(LookupExtractorFactory.class);
|
||||
EasyMock.expect(badLookupExtractorFactory.start()).andReturn(false).anyTimes();
|
||||
badLookupExtractorFactory.awaitInitialization();
|
||||
EasyMock.expectLastCall().andThrow(new TimeoutException());
|
||||
EasyMock.expect(badLookupExtractorFactory.destroy()).andReturn(true).once();
|
||||
EasyMock.expect(badLookupExtractorFactory.isInitialized()).andReturn(false).anyTimes();
|
||||
EasyMock.replay(badLookupExtractorFactory);
|
||||
LookupExtractorFactoryContainer badContainer = new LookupExtractorFactoryContainer("0", badLookupExtractorFactory);
|
||||
lookupReferencesManager.remove("test", badContainer); // new container to load is badContainer here
|
||||
|
||||
lookupReferencesManager.handlePendingNotices();
|
||||
|
||||
Assert.assertEquals(Optional.of(testContainer), lookupReferencesManager.get("test"));
|
||||
|
||||
lookupReferencesManager.remove("test", testContainer);
|
||||
lookupReferencesManager.handlePendingNotices();
|
||||
|
||||
Assert.assertEquals(Optional.empty(), lookupReferencesManager.get("test"));
|
||||
}
|
||||
@Test
|
||||
public void testCloseIsCalledAfterStopping() throws Exception
|
||||
{
|
||||
LookupExtractorFactory lookupExtractorFactory = EasyMock.createStrictMock(LookupExtractorFactory.class);
|
||||
LookupExtractorFactory lookupExtractorFactory = EasyMock.createMock(LookupExtractorFactory.class);
|
||||
EasyMock.expect(lookupExtractorFactory.start()).andReturn(true).once();
|
||||
EasyMock.expect(lookupExtractorFactory.close()).andReturn(true).once();
|
||||
EasyMock.expect(lookupExtractorFactory.isInitialized()).andReturn(true).anyTimes();
|
||||
EasyMock.replay(lookupExtractorFactory);
|
||||
Map<String, Object> lookupMap = new HashMap<>();
|
||||
lookupMap.put("testMockForCloseIsCalledAfterStopping", container);
|
||||
@ -234,7 +350,8 @@ public class LookupReferencesManagerTest
|
||||
@Test
|
||||
public void testDestroyIsCalledAfterRemove() throws Exception
|
||||
{
|
||||
LookupExtractorFactory lookupExtractorFactory = EasyMock.createStrictMock(LookupExtractorFactory.class);
|
||||
LookupExtractorFactory lookupExtractorFactory = EasyMock.createMock(LookupExtractorFactory.class);
|
||||
EasyMock.expect(lookupExtractorFactory.isInitialized()).andReturn(true).anyTimes();
|
||||
EasyMock.expect(lookupExtractorFactory.start()).andReturn(true).once();
|
||||
EasyMock.expect(lookupExtractorFactory.destroy()).andReturn(true).once();
|
||||
EasyMock.replay(lookupExtractorFactory);
|
||||
@ -256,11 +373,12 @@ public class LookupReferencesManagerTest
|
||||
).addChunk(strResult);
|
||||
EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
|
||||
EasyMock.replay(druidLeaderClient);
|
||||
LookupExtractorFactoryContainer container = new LookupExtractorFactoryContainer("0", lookupExtractorFactory);
|
||||
lookupReferencesManager.start();
|
||||
lookupReferencesManager.add("testMock", new LookupExtractorFactoryContainer("0", lookupExtractorFactory));
|
||||
lookupReferencesManager.add("testMock", container);
|
||||
lookupReferencesManager.handlePendingNotices();
|
||||
|
||||
lookupReferencesManager.remove("testMock");
|
||||
lookupReferencesManager.remove("testMock", container);
|
||||
lookupReferencesManager.handlePendingNotices();
|
||||
|
||||
EasyMock.verify(lookupExtractorFactory);
|
||||
@ -385,7 +503,7 @@ public class LookupReferencesManagerTest
|
||||
EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
|
||||
EasyMock.replay(druidLeaderClient);
|
||||
lookupReferencesManager.start();
|
||||
lookupReferencesManager.remove("test");
|
||||
lookupReferencesManager.remove("test", null);
|
||||
lookupReferencesManager.handlePendingNotices();
|
||||
}
|
||||
|
||||
@ -480,7 +598,7 @@ public class LookupReferencesManagerTest
|
||||
lookupReferencesManager.add("one", container1);
|
||||
lookupReferencesManager.add("two", container2);
|
||||
lookupReferencesManager.handlePendingNotices();
|
||||
lookupReferencesManager.remove("one");
|
||||
lookupReferencesManager.remove("one", container1);
|
||||
lookupReferencesManager.add("three", container3);
|
||||
|
||||
LookupsState state = lookupReferencesManager.getAllLookupsState();
|
||||
@ -526,6 +644,7 @@ public class LookupReferencesManagerTest
|
||||
LookupExtractorFactory lookupExtractorFactory = EasyMock.createMock(LookupExtractorFactory.class);
|
||||
EasyMock.expect(lookupExtractorFactory.start()).andReturn(true).once();
|
||||
EasyMock.expect(lookupExtractorFactory.destroy()).andReturn(true).once();
|
||||
EasyMock.expect(lookupExtractorFactory.isInitialized()).andReturn(true).anyTimes();
|
||||
EasyMock.replay(lookupExtractorFactory);
|
||||
Assert.assertEquals(Optional.empty(), lookupReferencesManager.get("test"));
|
||||
|
||||
@ -541,7 +660,7 @@ public class LookupReferencesManagerTest
|
||||
lookupReferencesManager.getAllLookupNames()
|
||||
);
|
||||
|
||||
lookupReferencesManager.remove("test");
|
||||
lookupReferencesManager.remove("test", null);
|
||||
|
||||
while (lookupReferencesManager.get("test").isPresent()) {
|
||||
Thread.sleep(100);
|
||||
|
@ -282,6 +282,16 @@ public class RegisteredLookupExtractionFnTest
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void awaitInitialization()
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isInitialized()
|
||||
{
|
||||
return true;
|
||||
}
|
||||
@Override
|
||||
public LookupExtractor get()
|
||||
{
|
||||
|
Loading…
x
Reference in New Issue
Block a user