From e2fde8c516e418e4d9711c8490b54318cdd2ccf7 Mon Sep 17 00:00:00 2001 From: Pranav Date: Tue, 7 Nov 2023 10:07:28 -0800 Subject: [PATCH] Refactor lookups behavior while loading/dropping the containers (#14806) --- .../extensions-core/lookups-cached-global.md | 1 + .../lookup/KafkaLookupExtractorFactory.java | 12 ++ .../KafkaLookupExtractorFactoryTest.java | 2 + .../NamespaceLookupExtractorFactory.java | 17 +++ .../lookup/namespace/ExtractionNamespace.java | 5 + .../namespace/JdbcExtractionNamespace.java | 13 ++ .../namespace/cache/CacheScheduler.java | 6 + .../NamespaceLookupExtractorFactoryTest.java | 36 ++++- .../JdbcExtractionNamespaceUrlCheckTest.java | 16 +- .../namespace/JdbcCacheGeneratorTest.java | 1 + .../namespace/cache/CacheSchedulerTest.java | 19 +++ .../cache/JdbcExtractionNamespaceTest.java | 6 + .../server/lookup/LoadingLookupFactory.java | 11 ++ .../server/lookup/PollingLookupFactory.java | 11 ++ .../lookup/LoadingLookupFactoryTest.java | 2 + .../lookup/PollingLookupFactoryTest.java | 2 + .../query/lookup/LookupExtractorFactory.java | 11 ++ .../LookupExtractorFactoryContainerTest.java | 11 ++ .../druid/query/lookup/LookupSegmentTest.java | 11 ++ .../query/lookup/LookupListeningResource.java | 6 +- .../query/lookup/LookupReferencesManager.java | 121 ++++++++++++---- .../lookup/MapLookupExtractorFactory.java | 11 ++ .../LookupEnabledTestExprMacroTable.java | 11 ++ .../lookup/LookupReferencesManagerTest.java | 137 ++++++++++++++++-- .../RegisteredLookupExtractionFnTest.java | 10 ++ 25 files changed, 445 insertions(+), 44 deletions(-) diff --git a/docs/development/extensions-core/lookups-cached-global.md b/docs/development/extensions-core/lookups-cached-global.md index dc8827a5b36..13883c7aa79 100644 --- a/docs/development/extensions-core/lookups-cached-global.md +++ b/docs/development/extensions-core/lookups-cached-global.md @@ -353,6 +353,7 @@ The JDBC lookups will poll a database to populate its local cache. If the `tsCol |`tsColumn`| The column in `table` which contains when the key was updated|No|Not used| |`pollPeriod`|How often to poll the DB|No|0 (only once)| |`jitterSeconds`| How much jitter to add (in seconds) up to maximum as a delay (actual value will be used as random from 0 to `jitterSeconds`), used to distribute db load more evenly|No|0| +|`loadTimeoutSeconds`| How much time (in seconds) it can take to query and populate lookup values. It will be helpful in lookup updates. On lookup update, it will wait maximum of `loadTimeoutSeconds` for new lookup to come up and continue serving from old lookup until new lookup successfully loads. |No|0| |`maxHeapPercentage`|The maximum percentage of heap size that the lookup should consume. If the lookup grows beyond this size, warning messages will be logged in the respective service logs.|No|10% of JVM heap size| ```json diff --git a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java index 6d5d393a0bf..049f33f40c8 100644 --- a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java +++ b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java @@ -297,6 +297,18 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory return new KafkaLookupExtractorIntrospectionHandler(this); } + @Override + public void awaitInitialization() + { + // Kafka lookup do not need await on initialization as it is realtime kafka lookups. + } + + @Override + public boolean isInitialized() + { + return true; + } + @Override public LookupExtractor get() { diff --git a/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactoryTest.java b/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactoryTest.java index 6c21990bb52..24a7b481b22 100644 --- a/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactoryTest.java +++ b/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactoryTest.java @@ -83,11 +83,13 @@ public class KafkaLookupExtractorFactoryTest mapper.writeValueAsString(expected), KafkaLookupExtractorFactory.class ); + result.awaitInitialization(); Assert.assertEquals(expected.getKafkaTopic(), result.getKafkaTopic()); Assert.assertEquals(expected.getKafkaProperties(), result.getKafkaProperties()); Assert.assertEquals(cacheManager, result.getCacheManager()); Assert.assertEquals(0, expected.getCompletedEventCount()); Assert.assertEquals(0, result.getCompletedEventCount()); + Assert.assertTrue(result.isInitialized()); } @Test diff --git a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/NamespaceLookupExtractorFactory.java b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/NamespaceLookupExtractorFactory.java index a89727f643a..ce3121679ad 100644 --- a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/NamespaceLookupExtractorFactory.java +++ b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/NamespaceLookupExtractorFactory.java @@ -36,6 +36,7 @@ import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.util.Map; import java.util.UUID; +import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -169,6 +170,22 @@ public class NamespaceLookupExtractorFactory implements LookupExtractorFactory return lookupIntrospectHandler; } + @Override + public void awaitInitialization() throws InterruptedException, TimeoutException + { + long timeout = extractionNamespace.getLoadTimeoutMills(); + if (entry.getCacheState() == CacheScheduler.NoCache.CACHE_NOT_INITIALIZED) { + LOG.info("Cache not initialized yet for namespace %s waiting for %s mills", extractionNamespace, timeout); + entry.awaitTotalUpdatesWithTimeout(1, timeout); + } + } + + @Override + public boolean isInitialized() + { + return entry.getCacheState() instanceof CacheScheduler.VersionedCache; + } + @JsonProperty public ExtractionNamespace getExtractionNamespace() { diff --git a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/ExtractionNamespace.java b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/ExtractionNamespace.java index c52021bd18f..ba910d3755d 100644 --- a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/ExtractionNamespace.java +++ b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/ExtractionNamespace.java @@ -49,4 +49,9 @@ public interface ExtractionNamespace { return 0; } + + default long getLoadTimeoutMills() + { + return 60 * 1000; + } } diff --git a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/JdbcExtractionNamespace.java b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/JdbcExtractionNamespace.java index 32ceccd1a82..0c1cd427e54 100644 --- a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/JdbcExtractionNamespace.java +++ b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/JdbcExtractionNamespace.java @@ -45,6 +45,7 @@ public class JdbcExtractionNamespace implements ExtractionNamespace private static final Logger LOG = new Logger(JdbcExtractionNamespace.class); long DEFAULT_MAX_HEAP_PERCENTAGE = 10L; + long DEFAULT_LOOKUP_LOAD_TIME_SECONDS = 120; @JsonProperty private final MetadataStorageConnectorConfig connectorConfig; @@ -63,6 +64,8 @@ public class JdbcExtractionNamespace implements ExtractionNamespace @JsonProperty private final long maxHeapPercentage; @JsonProperty + private final long loadTimeoutSeconds; + @JsonProperty private final int jitterSeconds; @JsonCreator @@ -77,6 +80,7 @@ public class JdbcExtractionNamespace implements ExtractionNamespace @Min(0) @JsonProperty(value = "pollPeriod") @Nullable final Period pollPeriod, @JsonProperty(value = "maxHeapPercentage") @Nullable final Long maxHeapPercentage, @JsonProperty(value = "jitterSeconds") @Nullable Integer jitterSeconds, + @JsonProperty(value = "loadTimeoutSeconds") @Nullable final Long loadTimeoutSeconds, @JacksonInject JdbcAccessSecurityConfig securityConfig ) { @@ -101,6 +105,7 @@ public class JdbcExtractionNamespace implements ExtractionNamespace } this.jitterSeconds = jitterSeconds == null ? 0 : jitterSeconds; this.maxHeapPercentage = maxHeapPercentage == null ? DEFAULT_MAX_HEAP_PERCENTAGE : maxHeapPercentage; + this.loadTimeoutSeconds = loadTimeoutSeconds == null ? DEFAULT_LOOKUP_LOAD_TIME_SECONDS : loadTimeoutSeconds; } /** @@ -176,6 +181,12 @@ public class JdbcExtractionNamespace implements ExtractionNamespace return 1000L * ThreadLocalRandom.current().nextInt(jitterSeconds + 1); } + @Override + public long getLoadTimeoutMills() + { + return 1000L * loadTimeoutSeconds; + } + @Override public String toString() { @@ -187,6 +198,8 @@ public class JdbcExtractionNamespace implements ExtractionNamespace ", tsColumn='" + tsColumn + '\'' + ", filter='" + filter + '\'' + ", pollPeriod=" + pollPeriod + + ", jitterSeconds=" + jitterSeconds + + ", loadTimeoutSeconds=" + loadTimeoutSeconds + ", maxHeapPercentage=" + maxHeapPercentage + '}'; } diff --git a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheScheduler.java b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheScheduler.java index 61e580563f8..30fa710a4b5 100644 --- a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheScheduler.java +++ b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/server/lookup/namespace/cache/CacheScheduler.java @@ -115,6 +115,12 @@ public final class CacheScheduler impl.updateCounter.awaitCount(totalUpdates); } + @VisibleForTesting + public void awaitTotalUpdatesWithTimeout(int totalUpdates, long timeoutMills) + throws InterruptedException, TimeoutException + { + impl.updateCounter.awaitCount(totalUpdates, timeoutMills, TimeUnit.MILLISECONDS); + } @VisibleForTesting void awaitNextUpdates(int nextUpdates) throws InterruptedException { diff --git a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/query/lookup/NamespaceLookupExtractorFactoryTest.java b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/query/lookup/NamespaceLookupExtractorFactoryTest.java index 3515ebe0623..1aa4ef1dfe4 100644 --- a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/query/lookup/NamespaceLookupExtractorFactoryTest.java +++ b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/query/lookup/NamespaceLookupExtractorFactoryTest.java @@ -59,6 +59,7 @@ import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.atMostOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -227,7 +228,6 @@ public class NamespaceLookupExtractorFactoryTest ); Assert.assertTrue(namespaceLookupExtractorFactory.start()); Assert.assertTrue(namespaceLookupExtractorFactory.start()); - verify(scheduler).scheduleAndWait(extractionNamespace, 60000L); verifyNoMoreInteractions(scheduler, entry, versionedCache); } @@ -287,6 +287,40 @@ public class NamespaceLookupExtractorFactoryTest verifyNoMoreInteractions(scheduler, entry, versionedCache); } + @Test + public void testAwaitInitializationOnCacheNotInitialized() throws Exception + { + final ExtractionNamespace extractionNamespace = new ExtractionNamespace() + { + @Override + public long getPollMs() + { + return 0; + } + + @Override + public long getLoadTimeoutMills() + { + return 1; + } + }; + expectScheduleAndWaitOnce(extractionNamespace); + when(entry.getCacheState()).thenReturn(CacheScheduler.NoCache.CACHE_NOT_INITIALIZED); + + final NamespaceLookupExtractorFactory namespaceLookupExtractorFactory = new NamespaceLookupExtractorFactory( + extractionNamespace, + scheduler + ); + Assert.assertTrue(namespaceLookupExtractorFactory.start()); + namespaceLookupExtractorFactory.awaitInitialization(); + Assert.assertThrows(ISE.class, () -> namespaceLookupExtractorFactory.get()); + verify(scheduler).scheduleAndWait(extractionNamespace, 60000L); + verify(entry, times(2)).getCacheState(); + verify(entry).awaitTotalUpdatesWithTimeout(1, 1); + Thread.sleep(10); + verifyNoMoreInteractions(scheduler, entry, versionedCache); + } + private void expectScheduleAndWaitOnce(ExtractionNamespace extractionNamespace) { try { diff --git a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/query/lookup/namespace/JdbcExtractionNamespaceUrlCheckTest.java b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/query/lookup/namespace/JdbcExtractionNamespaceUrlCheckTest.java index f4fffef5fff..178abca9c49 100644 --- a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/query/lookup/namespace/JdbcExtractionNamespaceUrlCheckTest.java +++ b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/query/lookup/namespace/JdbcExtractionNamespaceUrlCheckTest.java @@ -63,7 +63,9 @@ public class JdbcExtractionNamespaceUrlCheckTest "some filter", new Period(10), null, - 0, new JdbcAccessSecurityConfig() + 0, + 1000L, + new JdbcAccessSecurityConfig() { @Override public Set getAllowedProperties() @@ -102,6 +104,7 @@ public class JdbcExtractionNamespaceUrlCheckTest new Period(10), null, 0, + null, new JdbcAccessSecurityConfig() { @Override @@ -139,6 +142,7 @@ public class JdbcExtractionNamespaceUrlCheckTest new Period(10), null, 0, + null, new JdbcAccessSecurityConfig() { @Override @@ -178,6 +182,7 @@ public class JdbcExtractionNamespaceUrlCheckTest new Period(10), null, 0, + null, new JdbcAccessSecurityConfig() { @Override @@ -221,6 +226,7 @@ public class JdbcExtractionNamespaceUrlCheckTest new Period(10), null, 0, + null, new JdbcAccessSecurityConfig() { @Override @@ -260,6 +266,7 @@ public class JdbcExtractionNamespaceUrlCheckTest new Period(10), 10L, 0, + null, new JdbcAccessSecurityConfig() { @Override @@ -296,7 +303,9 @@ public class JdbcExtractionNamespaceUrlCheckTest "some filter", new Period(10), null, - 0, new JdbcAccessSecurityConfig() + 0, + null, + new JdbcAccessSecurityConfig() { @Override public Set getAllowedProperties() @@ -335,6 +344,7 @@ public class JdbcExtractionNamespaceUrlCheckTest new Period(10), null, 0, + null, new JdbcAccessSecurityConfig() { @Override @@ -380,6 +390,7 @@ public class JdbcExtractionNamespaceUrlCheckTest new Period(10), null, 0, + null, new JdbcAccessSecurityConfig() { @Override @@ -423,6 +434,7 @@ public class JdbcExtractionNamespaceUrlCheckTest new Period(10), null, 0, + null, new JdbcAccessSecurityConfig() { @Override diff --git a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/JdbcCacheGeneratorTest.java b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/JdbcCacheGeneratorTest.java index 1eb74630fda..7162eac0a2d 100644 --- a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/JdbcCacheGeneratorTest.java +++ b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/JdbcCacheGeneratorTest.java @@ -138,6 +138,7 @@ public class JdbcCacheGeneratorTest Period.ZERO, null, 0, + null, new JdbcAccessSecurityConfig() ); } diff --git a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/CacheSchedulerTest.java b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/CacheSchedulerTest.java index fd96529ae99..fb4d6070e3e 100644 --- a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/CacheSchedulerTest.java +++ b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/CacheSchedulerTest.java @@ -191,6 +191,24 @@ public class CacheSchedulerTest Assert.assertEquals(VALUE, entry.getCache().get(KEY)); } + @Test(timeout = 60_000L) + public void testInitialization() throws InterruptedException, TimeoutException + { + UriExtractionNamespace namespace = new UriExtractionNamespace( + tmpFile.toURI(), + null, null, + new UriExtractionNamespace.ObjectMapperFlatDataParser( + UriExtractionNamespaceTest.registerTypes(new ObjectMapper()) + ), + new Period(0), + null, + null + ); + CacheScheduler.Entry entry = scheduler.schedule(namespace); + entry.awaitTotalUpdatesWithTimeout(1, 2000); + Assert.assertEquals(VALUE, entry.getCache().get(KEY)); + } + @Test(timeout = 60_000L) public void testPeriodicUpdatesScheduled() throws InterruptedException { @@ -459,6 +477,7 @@ public class CacheSchedulerTest new Period(10_000), null, 0, + null, new JdbcAccessSecurityConfig() { @Override diff --git a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/JdbcExtractionNamespaceTest.java b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/JdbcExtractionNamespaceTest.java index e0c651724d7..cf71a5da490 100644 --- a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/JdbcExtractionNamespaceTest.java +++ b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/server/lookup/namespace/cache/JdbcExtractionNamespaceTest.java @@ -329,6 +329,7 @@ public class JdbcExtractionNamespaceTest new Period(0), null, 0, + null, new JdbcAccessSecurityConfig() ); try (CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace)) { @@ -363,6 +364,7 @@ public class JdbcExtractionNamespaceTest new Period(0), null, 0, + null, new JdbcAccessSecurityConfig() ); try (CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace)) { @@ -414,6 +416,7 @@ public class JdbcExtractionNamespaceTest new Period(0), null, 120, + null, new JdbcAccessSecurityConfig() ); long jitter = extractionNamespace.getJitterMills(); @@ -433,6 +436,7 @@ public class JdbcExtractionNamespaceTest FILTER_COLUMN + "='1'", new Period(0), null, + 0, null, new JdbcAccessSecurityConfig() ); @@ -478,6 +482,7 @@ public class JdbcExtractionNamespaceTest new Period(10), null, 0, + null, securityConfig ); final ObjectMapper mapper = new DefaultObjectMapper(); @@ -504,6 +509,7 @@ public class JdbcExtractionNamespaceTest new Period(10), null, 0, + null, new JdbcAccessSecurityConfig() ); CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace); diff --git a/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/LoadingLookupFactory.java b/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/LoadingLookupFactory.java index 5746c695239..f7267c7dbfb 100644 --- a/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/LoadingLookupFactory.java +++ b/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/LoadingLookupFactory.java @@ -111,6 +111,17 @@ public class LoadingLookupFactory implements LookupExtractorFactory return null; } + @Override + public void awaitInitialization() + { + // LoadingLookupFactory does not have any initialization period as it fetches the key from loadingCache and DataFetcher as necessary. + } + + @Override + public boolean isInitialized() + { + return true; + } @Override public LoadingLookup get() { diff --git a/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/PollingLookupFactory.java b/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/PollingLookupFactory.java index 48cdc47f6cc..63ab7356be8 100644 --- a/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/PollingLookupFactory.java +++ b/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/PollingLookupFactory.java @@ -128,6 +128,17 @@ public class PollingLookupFactory implements LookupExtractorFactory return null; } + @Override + public void awaitInitialization() + { + } + + @Override + public boolean isInitialized() + { + return true; + } + @Override public PollingLookup get() { diff --git a/extensions-core/lookups-cached-single/src/test/java/org/apache/druid/server/lookup/LoadingLookupFactoryTest.java b/extensions-core/lookups-cached-single/src/test/java/org/apache/druid/server/lookup/LoadingLookupFactoryTest.java index be6acd56139..cf67e6e32ea 100644 --- a/extensions-core/lookups-cached-single/src/test/java/org/apache/druid/server/lookup/LoadingLookupFactoryTest.java +++ b/extensions-core/lookups-cached-single/src/test/java/org/apache/druid/server/lookup/LoadingLookupFactoryTest.java @@ -57,6 +57,8 @@ public class LoadingLookupFactoryTest EasyMock.expectLastCall().once(); EasyMock.replay(loadingLookup); Assert.assertTrue(loadingLookupFactory.start()); + loadingLookupFactory.awaitInitialization(); + Assert.assertTrue(loadingLookupFactory.isInitialized()); Assert.assertTrue(loadingLookupFactory.close()); EasyMock.verify(loadingLookup); diff --git a/extensions-core/lookups-cached-single/src/test/java/org/apache/druid/server/lookup/PollingLookupFactoryTest.java b/extensions-core/lookups-cached-single/src/test/java/org/apache/druid/server/lookup/PollingLookupFactoryTest.java index 39bfc41a619..510839ffc3f 100644 --- a/extensions-core/lookups-cached-single/src/test/java/org/apache/druid/server/lookup/PollingLookupFactoryTest.java +++ b/extensions-core/lookups-cached-single/src/test/java/org/apache/druid/server/lookup/PollingLookupFactoryTest.java @@ -36,6 +36,8 @@ public class PollingLookupFactoryTest EasyMock.expect(pollingLookup.isOpen()).andReturn(true).once(); EasyMock.replay(pollingLookup); Assert.assertTrue(pollingLookupFactory.start()); + pollingLookupFactory.awaitInitialization(); + Assert.assertTrue(pollingLookupFactory.isInitialized()); EasyMock.verify(pollingLookup); } diff --git a/processing/src/main/java/org/apache/druid/query/lookup/LookupExtractorFactory.java b/processing/src/main/java/org/apache/druid/query/lookup/LookupExtractorFactory.java index 8d4ee42c5bb..80e1189ae6a 100644 --- a/processing/src/main/java/org/apache/druid/query/lookup/LookupExtractorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/lookup/LookupExtractorFactory.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.google.common.base.Supplier; import javax.annotation.Nullable; +import java.util.concurrent.TimeoutException; /** * Users of Lookup Extraction need to implement a {@link LookupExtractorFactory} supplier of type {@link LookupExtractor}. @@ -79,4 +80,14 @@ public interface LookupExtractorFactory extends Supplier */ @Nullable LookupIntrospectHandler getIntrospectHandler(); + + /** + * awaitToInitialise blocks and wait for the cache to initialize fully. + */ + void awaitInitialization() throws InterruptedException, TimeoutException; + + /** + * @return true if cache is loaded and lookup is queryable else returns false + */ + boolean isInitialized(); } diff --git a/processing/src/test/java/org/apache/druid/query/lookup/LookupExtractorFactoryContainerTest.java b/processing/src/test/java/org/apache/druid/query/lookup/LookupExtractorFactoryContainerTest.java index 500093fbd36..727fdf52dd3 100644 --- a/processing/src/test/java/org/apache/druid/query/lookup/LookupExtractorFactoryContainerTest.java +++ b/processing/src/test/java/org/apache/druid/query/lookup/LookupExtractorFactoryContainerTest.java @@ -104,6 +104,17 @@ public class LookupExtractorFactoryContainerTest return null; } + @Override + public void awaitInitialization() + { + } + + @Override + public boolean isInitialized() + { + return true; + } + @Override public LookupExtractor get() { diff --git a/processing/src/test/java/org/apache/druid/query/lookup/LookupSegmentTest.java b/processing/src/test/java/org/apache/druid/query/lookup/LookupSegmentTest.java index cc5630b9331..2d36ffa1e63 100644 --- a/processing/src/test/java/org/apache/druid/query/lookup/LookupSegmentTest.java +++ b/processing/src/test/java/org/apache/druid/query/lookup/LookupSegmentTest.java @@ -82,6 +82,17 @@ public class LookupSegmentTest throw new UnsupportedOperationException("not needed for this test"); } + @Override + public void awaitInitialization() + { + } + + @Override + public boolean isInitialized() + { + return true; + } + @Override public LookupExtractor get() { diff --git a/server/src/main/java/org/apache/druid/query/lookup/LookupListeningResource.java b/server/src/main/java/org/apache/druid/query/lookup/LookupListeningResource.java index 01f7d684831..53072b96b10 100644 --- a/server/src/main/java/org/apache/druid/query/lookup/LookupListeningResource.java +++ b/server/src/main/java/org/apache/druid/query/lookup/LookupListeningResource.java @@ -94,7 +94,9 @@ class LookupListeningResource extends ListenerResource try { state.getToLoad().forEach(manager::add); - state.getToDrop().forEach(manager::remove); + state.getToDrop().forEach(lookName -> { + manager.remove(lookName, state.getToLoad().getOrDefault(lookName, null)); + }); return Response.status(Response.Status.ACCEPTED).entity(manager.getAllLookupsState()).build(); } @@ -135,7 +137,7 @@ class LookupListeningResource extends ListenerResource @Override public Object delete(String id) { - manager.remove(id); + manager.remove(id, null); return id; } } diff --git a/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java b/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java index 3cdaec0d4a3..03879b0eaea 100644 --- a/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java +++ b/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java @@ -66,6 +66,7 @@ import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; import java.util.function.Function; @@ -117,6 +118,8 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP private final LookupConfig lookupConfig; + private ExecutorService lookupUpdateExecutorService; + @Inject public LookupReferencesManager( LookupConfig lookupConfig, @@ -147,6 +150,10 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP this.lookupListeningAnnouncerConfig = lookupListeningAnnouncerConfig; this.lookupConfig = lookupConfig; this.testMode = testMode; + this.lookupUpdateExecutorService = Execs.multiThreaded( + lookupConfig.getNumLookupLoadingThreads(), + "LookupExtractorFactoryContainerProvider-Update-%s" + ); } @LifecycleStart @@ -217,7 +224,7 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP Map lookupMap = new HashMap<>(swappedState.lookupMap); for (Notice notice : swappedState.noticesBeingHandled) { try { - notice.handle(lookupMap); + notice.handle(lookupMap, this); } catch (Exception ex) { LOG.error(ex, "Exception occurred while handling lookup notice [%s].", notice); @@ -266,7 +273,7 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP LOG.error(ex, "Failed to close lookup [%s].", e.getKey()); } } - + lookupUpdateExecutorService.shutdown(); LOG.debug("LookupExtractorFactoryContainerProvider is stopped."); } @@ -277,10 +284,10 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP addNotice(new LoadNotice(lookupName, lookupExtractorFactoryContainer, lookupConfig.getLookupStartRetries())); } - public void remove(String lookupName) + public void remove(String lookupName, LookupExtractorFactoryContainer loadedContainer) { Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)); - addNotice(new DropNotice(lookupName)); + addNotice(new DropNotice(lookupName, loadedContainer)); } private void addNotice(Notice notice) @@ -301,6 +308,11 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP LockSupport.unpark(mainThread); } + public void submitAsyncLookupTask(Runnable task) + { + lookupUpdateExecutorService.submit(task); + } + @Override public Optional get(String lookupName) { @@ -595,11 +607,24 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP ) ); } + private void dropContainer(LookupExtractorFactoryContainer container, String lookupName) + { + if (container != null) { + LOG.debug("Removed lookup [%s] with spec [%s].", lookupName, container); + if (!container.getLookupExtractorFactory().destroy()) { + throw new ISE( + "destroy method returned false for lookup [%s]:[%s]", + lookupName, + container + ); + } + } + } @VisibleForTesting interface Notice { - void handle(Map lookupMap) throws Exception; + void handle(Map lookupMap, LookupReferencesManager manager) throws Exception; } private static class LoadNotice implements Notice @@ -616,7 +641,8 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP } @Override - public void handle(Map lookupMap) throws Exception + public void handle(Map lookupMap, LookupReferencesManager manager) + throws Exception { LookupExtractorFactoryContainer old = lookupMap.get(lookupName); if (old != null && !lookupExtractorFactoryContainer.replaces(old)) { @@ -642,18 +668,45 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP e -> true, startRetries ); - - old = lookupMap.put(lookupName, lookupExtractorFactoryContainer); - - LOG.debug("Loaded lookup [%s] with spec [%s].", lookupName, lookupExtractorFactoryContainer); - - if (old != null) { - if (!old.getLookupExtractorFactory().destroy()) { - throw new ISE("destroy method returned false for lookup [%s]:[%s]", lookupName, old); - } + if (lookupExtractorFactoryContainer.getLookupExtractorFactory().isInitialized()) { + old = lookupMap.put(lookupName, lookupExtractorFactoryContainer); + LOG.debug("Loaded lookup [%s] with spec [%s].", lookupName, lookupExtractorFactoryContainer); + manager.dropContainer(old, lookupName); + return; } + manager.submitAsyncLookupTask(() -> { + try { + /* + Retry startRetries times and wait for first cache to load for new container, + if loaded then kill old container and start serving from new one. + If new lookupExtractorFactoryContainer has errors in loading, kill the new container and do not remove the old container + */ + RetryUtils.retry( + () -> { + lookupExtractorFactoryContainer.getLookupExtractorFactory().awaitInitialization(); + return null; + }, e -> true, + startRetries + ); + if (lookupExtractorFactoryContainer.getLookupExtractorFactory().isInitialized()) { + // send load notice with cache loaded container + manager.add(lookupName, lookupExtractorFactoryContainer); + } else { + // skip loading new container as it is failed after 3 attempts + manager.dropContainer(lookupExtractorFactoryContainer, lookupName); + } + } + catch (Exception e) { + // drop new failed container and continue serving old one + LOG.error( + e, + "Exception in updating the namespace %s, continue serving from old container and killing new container ", + lookupExtractorFactoryContainer + ); + manager.dropContainer(lookupExtractorFactoryContainer, lookupName); + } + }); } - @Override public String toString() { @@ -667,28 +720,36 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP private static class DropNotice implements Notice { private final String lookupName; + private final LookupExtractorFactoryContainer loadedContainer; - DropNotice(String lookupName) + /** + * @param lookupName Name of the lookup to drop + * @param loadedContainer Container ref to newly loaded container, this is mandatory in the update lookup call, it can be null in purely drop call. + */ + DropNotice(String lookupName, @Nullable LookupExtractorFactoryContainer loadedContainer) { this.lookupName = lookupName; + this.loadedContainer = loadedContainer; } @Override - public void handle(Map lookupMap) + public void handle(Map lookupMap, LookupReferencesManager manager) { - final LookupExtractorFactoryContainer lookupExtractorFactoryContainer = lookupMap.remove(lookupName); - - if (lookupExtractorFactoryContainer != null) { - LOG.debug("Removed lookup [%s] with spec [%s].", lookupName, lookupExtractorFactoryContainer); - - if (!lookupExtractorFactoryContainer.getLookupExtractorFactory().destroy()) { - throw new ISE( - "destroy method returned false for lookup [%s]:[%s]", - lookupName, - lookupExtractorFactoryContainer - ); - } + if (loadedContainer != null && !loadedContainer.getLookupExtractorFactory().isInitialized()) { + final LookupExtractorFactoryContainer containterToDrop = lookupMap.get(lookupName); + manager.submitAsyncLookupTask(() -> { + try { + loadedContainer.getLookupExtractorFactory().awaitInitialization(); + manager.dropContainer(containterToDrop, lookupName); + } + catch (InterruptedException | TimeoutException e) { + // do nothing as loadedContainer is dropped by LoadNotice handler eventually if cache is not loaded + } + }); + return; } + final LookupExtractorFactoryContainer lookupExtractorFactoryContainer = lookupMap.remove(lookupName); + manager.dropContainer(lookupExtractorFactoryContainer, lookupName); } @Override diff --git a/server/src/main/java/org/apache/druid/query/lookup/MapLookupExtractorFactory.java b/server/src/main/java/org/apache/druid/query/lookup/MapLookupExtractorFactory.java index 3f39e149ba3..74f68b20a5d 100644 --- a/server/src/main/java/org/apache/druid/query/lookup/MapLookupExtractorFactory.java +++ b/server/src/main/java/org/apache/druid/query/lookup/MapLookupExtractorFactory.java @@ -87,6 +87,17 @@ public class MapLookupExtractorFactory implements LookupExtractorFactory return lookupIntrospectHandler; } + @Override + public void awaitInitialization() + { + + } + + @Override + public boolean isInitialized() + { + return true; + } @Override public LookupExtractor get() { diff --git a/server/src/test/java/org/apache/druid/query/expression/LookupEnabledTestExprMacroTable.java b/server/src/test/java/org/apache/druid/query/expression/LookupEnabledTestExprMacroTable.java index 07f225d7eef..c5c399e8b4c 100644 --- a/server/src/test/java/org/apache/druid/query/expression/LookupEnabledTestExprMacroTable.java +++ b/server/src/test/java/org/apache/druid/query/expression/LookupEnabledTestExprMacroTable.java @@ -98,6 +98,17 @@ public class LookupEnabledTestExprMacroTable extends ExprMacroTable throw new UnsupportedOperationException(); } + @Override + public void awaitInitialization() + { + } + + @Override + public boolean isInitialized() + { + return true; + } + @Override public LookupExtractor get() { diff --git a/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java b/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java index 8cc6956d117..855b761ec16 100644 --- a/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java +++ b/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java @@ -46,6 +46,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; public class LookupReferencesManagerTest { @@ -149,7 +150,7 @@ public class LookupReferencesManagerTest @Test(expected = IllegalStateException.class) public void testRemoveExceptionWhenClosed() { - lookupReferencesManager.remove("test"); + lookupReferencesManager.remove("test", null); } @Test(expected = IllegalStateException.class) @@ -164,6 +165,7 @@ public class LookupReferencesManagerTest LookupExtractorFactory lookupExtractorFactory = EasyMock.createMock(LookupExtractorFactory.class); EasyMock.expect(lookupExtractorFactory.start()).andReturn(true).once(); EasyMock.expect(lookupExtractorFactory.destroy()).andReturn(true).once(); + EasyMock.expect(lookupExtractorFactory.isInitialized()).andReturn(true).anyTimes(); EasyMock.replay(lookupExtractorFactory); Map lookupMap = new HashMap<>(); @@ -193,18 +195,132 @@ public class LookupReferencesManagerTest Assert.assertEquals(Optional.of(testContainer), lookupReferencesManager.get("test")); - lookupReferencesManager.remove("test"); + lookupReferencesManager.remove("test", testContainer); lookupReferencesManager.handlePendingNotices(); Assert.assertEquals(Optional.empty(), lookupReferencesManager.get("test")); } + @Test + public void testLoadBadContaineAfterOldGoodContainer() throws Exception + { + // Test the scenario of not loading the new container until it get intialized + LookupExtractorFactory lookupExtractorFactory = EasyMock.createMock(LookupExtractorFactory.class); + EasyMock.expect(lookupExtractorFactory.start()).andReturn(true).once(); + EasyMock.expect(lookupExtractorFactory.destroy()).andReturn(true).once(); + EasyMock.expect(lookupExtractorFactory.isInitialized()).andReturn(true).anyTimes(); + EasyMock.replay(lookupExtractorFactory); + + Map lookupMap = new HashMap<>(); + lookupMap.put("testMockForAddGetRemove", container); + String strResult = mapper.writeValueAsString(lookupMap); + Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx")); + EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes(); + EasyMock.replay(config); + EasyMock.expect(druidLeaderClient.makeRequest( + HttpMethod.GET, + "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true" + )) + .andReturn(request); + StringFullResponseHolder responseHolder = new StringFullResponseHolder( + newEmptyResponse(HttpResponseStatus.OK), + StandardCharsets.UTF_8 + ).addChunk(strResult); + EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder); + EasyMock.replay(druidLeaderClient); + lookupReferencesManager.start(); + Assert.assertEquals(Optional.empty(), lookupReferencesManager.get("test")); + + LookupExtractorFactoryContainer testContainer = new LookupExtractorFactoryContainer("0", lookupExtractorFactory); + + lookupReferencesManager.add("test", testContainer); + lookupReferencesManager.handlePendingNotices(); + + Assert.assertEquals(Optional.of(testContainer), lookupReferencesManager.get("test")); + + LookupExtractorFactory badLookupExtractorFactory = EasyMock.createMock(LookupExtractorFactory.class); + EasyMock.expect(badLookupExtractorFactory.start()).andReturn(false).anyTimes(); + badLookupExtractorFactory.awaitInitialization(); + EasyMock.expectLastCall().andThrow(new TimeoutException()); + EasyMock.expect(badLookupExtractorFactory.destroy()).andReturn(true).once(); + EasyMock.expect(badLookupExtractorFactory.isInitialized()).andReturn(false).anyTimes(); + EasyMock.replay(badLookupExtractorFactory); + LookupExtractorFactoryContainer badContainer = new LookupExtractorFactoryContainer("0", badLookupExtractorFactory); + lookupReferencesManager.add("test", badContainer); + + lookupReferencesManager.handlePendingNotices(); + + Assert.assertEquals(Optional.of(testContainer), lookupReferencesManager.get("test")); + + lookupReferencesManager.remove("test", testContainer); + lookupReferencesManager.handlePendingNotices(); + + Assert.assertEquals(Optional.empty(), lookupReferencesManager.get("test")); + } + + @Test + public void testDropOldContainerAfterNewLoadGoodContainer() throws Exception + { + // Test the scenario of dropping the current container only when new container gets initialized + LookupExtractorFactory lookupExtractorFactory = EasyMock.createMock(LookupExtractorFactory.class); + EasyMock.expect(lookupExtractorFactory.start()).andReturn(true).once(); + EasyMock.expect(lookupExtractorFactory.destroy()).andReturn(true).once(); + EasyMock.expect(lookupExtractorFactory.isInitialized()).andReturn(true).anyTimes(); + EasyMock.replay(lookupExtractorFactory); + + Map lookupMap = new HashMap<>(); + lookupMap.put("testMockForAddGetRemove", container); + String strResult = mapper.writeValueAsString(lookupMap); + Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx")); + EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes(); + EasyMock.replay(config); + EasyMock.expect(druidLeaderClient.makeRequest( + HttpMethod.GET, + "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true" + )) + .andReturn(request); + StringFullResponseHolder responseHolder = new StringFullResponseHolder( + newEmptyResponse(HttpResponseStatus.OK), + StandardCharsets.UTF_8 + ).addChunk(strResult); + EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder); + EasyMock.replay(druidLeaderClient); + lookupReferencesManager.start(); + Assert.assertEquals(Optional.empty(), lookupReferencesManager.get("test")); + + LookupExtractorFactoryContainer testContainer = new LookupExtractorFactoryContainer("0", lookupExtractorFactory); + + lookupReferencesManager.add("test", testContainer); + lookupReferencesManager.handlePendingNotices(); + + Assert.assertEquals(Optional.of(testContainer), lookupReferencesManager.get("test")); + + LookupExtractorFactory badLookupExtractorFactory = EasyMock.createMock(LookupExtractorFactory.class); + EasyMock.expect(badLookupExtractorFactory.start()).andReturn(false).anyTimes(); + badLookupExtractorFactory.awaitInitialization(); + EasyMock.expectLastCall().andThrow(new TimeoutException()); + EasyMock.expect(badLookupExtractorFactory.destroy()).andReturn(true).once(); + EasyMock.expect(badLookupExtractorFactory.isInitialized()).andReturn(false).anyTimes(); + EasyMock.replay(badLookupExtractorFactory); + LookupExtractorFactoryContainer badContainer = new LookupExtractorFactoryContainer("0", badLookupExtractorFactory); + lookupReferencesManager.remove("test", badContainer); // new container to load is badContainer here + + lookupReferencesManager.handlePendingNotices(); + + Assert.assertEquals(Optional.of(testContainer), lookupReferencesManager.get("test")); + + lookupReferencesManager.remove("test", testContainer); + lookupReferencesManager.handlePendingNotices(); + + Assert.assertEquals(Optional.empty(), lookupReferencesManager.get("test")); + } @Test public void testCloseIsCalledAfterStopping() throws Exception { - LookupExtractorFactory lookupExtractorFactory = EasyMock.createStrictMock(LookupExtractorFactory.class); + LookupExtractorFactory lookupExtractorFactory = EasyMock.createMock(LookupExtractorFactory.class); EasyMock.expect(lookupExtractorFactory.start()).andReturn(true).once(); EasyMock.expect(lookupExtractorFactory.close()).andReturn(true).once(); + EasyMock.expect(lookupExtractorFactory.isInitialized()).andReturn(true).anyTimes(); EasyMock.replay(lookupExtractorFactory); Map lookupMap = new HashMap<>(); lookupMap.put("testMockForCloseIsCalledAfterStopping", container); @@ -234,7 +350,8 @@ public class LookupReferencesManagerTest @Test public void testDestroyIsCalledAfterRemove() throws Exception { - LookupExtractorFactory lookupExtractorFactory = EasyMock.createStrictMock(LookupExtractorFactory.class); + LookupExtractorFactory lookupExtractorFactory = EasyMock.createMock(LookupExtractorFactory.class); + EasyMock.expect(lookupExtractorFactory.isInitialized()).andReturn(true).anyTimes(); EasyMock.expect(lookupExtractorFactory.start()).andReturn(true).once(); EasyMock.expect(lookupExtractorFactory.destroy()).andReturn(true).once(); EasyMock.replay(lookupExtractorFactory); @@ -256,11 +373,12 @@ public class LookupReferencesManagerTest ).addChunk(strResult); EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder); EasyMock.replay(druidLeaderClient); + LookupExtractorFactoryContainer container = new LookupExtractorFactoryContainer("0", lookupExtractorFactory); lookupReferencesManager.start(); - lookupReferencesManager.add("testMock", new LookupExtractorFactoryContainer("0", lookupExtractorFactory)); + lookupReferencesManager.add("testMock", container); lookupReferencesManager.handlePendingNotices(); - lookupReferencesManager.remove("testMock"); + lookupReferencesManager.remove("testMock", container); lookupReferencesManager.handlePendingNotices(); EasyMock.verify(lookupExtractorFactory); @@ -385,7 +503,7 @@ public class LookupReferencesManagerTest EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder); EasyMock.replay(druidLeaderClient); lookupReferencesManager.start(); - lookupReferencesManager.remove("test"); + lookupReferencesManager.remove("test", null); lookupReferencesManager.handlePendingNotices(); } @@ -480,7 +598,7 @@ public class LookupReferencesManagerTest lookupReferencesManager.add("one", container1); lookupReferencesManager.add("two", container2); lookupReferencesManager.handlePendingNotices(); - lookupReferencesManager.remove("one"); + lookupReferencesManager.remove("one", container1); lookupReferencesManager.add("three", container3); LookupsState state = lookupReferencesManager.getAllLookupsState(); @@ -526,6 +644,7 @@ public class LookupReferencesManagerTest LookupExtractorFactory lookupExtractorFactory = EasyMock.createMock(LookupExtractorFactory.class); EasyMock.expect(lookupExtractorFactory.start()).andReturn(true).once(); EasyMock.expect(lookupExtractorFactory.destroy()).andReturn(true).once(); + EasyMock.expect(lookupExtractorFactory.isInitialized()).andReturn(true).anyTimes(); EasyMock.replay(lookupExtractorFactory); Assert.assertEquals(Optional.empty(), lookupReferencesManager.get("test")); @@ -541,7 +660,7 @@ public class LookupReferencesManagerTest lookupReferencesManager.getAllLookupNames() ); - lookupReferencesManager.remove("test"); + lookupReferencesManager.remove("test", null); while (lookupReferencesManager.get("test").isPresent()) { Thread.sleep(100); diff --git a/server/src/test/java/org/apache/druid/query/lookup/RegisteredLookupExtractionFnTest.java b/server/src/test/java/org/apache/druid/query/lookup/RegisteredLookupExtractionFnTest.java index 5e877e8288c..532c1213e26 100644 --- a/server/src/test/java/org/apache/druid/query/lookup/RegisteredLookupExtractionFnTest.java +++ b/server/src/test/java/org/apache/druid/query/lookup/RegisteredLookupExtractionFnTest.java @@ -282,6 +282,16 @@ public class RegisteredLookupExtractionFnTest return null; } + @Override + public void awaitInitialization() + { + } + + @Override + public boolean isInitialized() + { + return true; + } @Override public LookupExtractor get() {