From c07678b143663dde069569ab77c9803901d78a73 Mon Sep 17 00:00:00 2001 From: Atul Mohan Date: Thu, 12 Oct 2017 21:22:24 -0500 Subject: [PATCH] Synchronization of lookups during startup of druid processes (#4758) * Changes for lookup synchronization * Refactor of Lookup classes * Minor refactors and doc update * Change coordinator instance to be retrieved by DruidLeaderClient * Wait before thread shutdown * Make disablelookups flag true by default * Update docs * Rename flag * Move executorservice shutdown to finally block * Update LookupConfig * Refactoring and doc changes * Remove lookup config constructor * Revert Lookupconfig constructor changes * Add tests to LookupConfig * Make executorservice local * Update LRM * Move ListeningScheduledExecutorService to ExecutorCompletionService * Move exception to outer block * Remove check to see future is done * Remove unnecessary assignment * Add logging --- docs/content/querying/lookups.md | 2 + extensions-core/histogram/pom.xml | 7 + .../io/druid/indexing/common/TestUtils.java | 4 +- .../druid/query/dimension/DimensionSpec.java | 3 +- .../druid/query/extraction/ExtractionFn.java | 2 - .../io/druid/query/lookup/LookupConfig.java | 29 +- .../query/expression/TestExprMacroTable.java | 59 --- .../druid/query/lookup/LookupConfigTest.java | 27 +- .../query/dimension/LookupDimensionSpec.java | 0 .../query/expression/LookupExprMacro.java | 0 .../io/druid/query/lookup/LookupModule.java | 8 +- .../query/lookup/LookupReferencesManager.java | 289 +++++++++++--- .../lookup/RegisteredLookupExtractionFn.java | 0 .../http/LookupCoordinatorResource.java | 10 +- .../dimension/LookupDimensionSpecTest.java | 2 +- .../druid/query/expression/ExprMacroTest.java | 2 +- .../expression/TestExpressionMacroTable.java | 106 ++++++ .../lookup/LookupReferencesManagerTest.java | 356 ++++++++++++++++-- .../RegisteredLookupExtractionFnTest.java | 0 .../http/LookupCoordinatorResourceTest.java | 34 +- sql/pom.xml | 7 + .../druid/sql/calcite/util/CalciteTests.java | 19 +- 22 files changed, 787 insertions(+), 179 deletions(-) rename {processing => server}/src/main/java/io/druid/query/dimension/LookupDimensionSpec.java (100%) rename {processing => server}/src/main/java/io/druid/query/expression/LookupExprMacro.java (100%) rename {processing => server}/src/main/java/io/druid/query/lookup/LookupReferencesManager.java (59%) rename {processing => server}/src/main/java/io/druid/query/lookup/RegisteredLookupExtractionFn.java (100%) rename {processing => server}/src/test/java/io/druid/query/expression/ExprMacroTest.java (98%) create mode 100644 server/src/test/java/io/druid/query/expression/TestExpressionMacroTable.java rename {processing => server}/src/test/java/io/druid/query/lookup/RegisteredLookupExtractionFnTest.java (100%) diff --git a/docs/content/querying/lookups.md b/docs/content/querying/lookups.md index 23ef47f6549..96ffd7f48cd 100644 --- a/docs/content/querying/lookups.md +++ b/docs/content/querying/lookups.md @@ -323,6 +323,8 @@ It is possible to save the configuration across restarts such that a node will n |Property|Description|Default| |--------|-----------|-------| |`druid.lookup.snapshotWorkingDir`| Working path used to store snapshot of current lookup configuration, leaving this property null will disable snapshot/bootstrap utility|null| +|`druid.lookup.numLookupLoadingThreads`| Number of threads for loading the lookups in parallel on startup. This thread pool is destroyed once startup is done. It is not kept during the lifetime of the JVM|Available Processors / 2| +|`druid.lookup.enableLookupSyncOnStartup`|Enable the lookup synchronization process with coordinator on startup. The queryable nodes will fetch and load the lookups from the coordinator instead of waiting for the coordinator to load the lookups for them. Users may opt to disable this option if there are no lookups configured in the cluster.|true| ## Introspect a Lookup diff --git a/extensions-core/histogram/pom.xml b/extensions-core/histogram/pom.xml index a00d8140aaf..a1a714809b9 100644 --- a/extensions-core/histogram/pom.xml +++ b/extensions-core/histogram/pom.xml @@ -59,6 +59,13 @@ test-jar test + + io.druid + druid-server + ${project.parent.version} + test + test-jar + junit junit diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java b/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java index 013e91b479f..e34ea20fbc2 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java @@ -29,7 +29,7 @@ import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.ISE; import io.druid.java.util.common.logger.Logger; import io.druid.math.expr.ExprMacroTable; -import io.druid.query.expression.TestExprMacroTable; +import io.druid.query.expression.TestExpressionMacroTable; import io.druid.segment.IndexIO; import io.druid.segment.IndexMergerV9; import io.druid.segment.column.ColumnConfig; @@ -74,7 +74,7 @@ public class TestUtils jsonMapper.setInjectableValues( new InjectableValues.Std() - .addValue(ExprMacroTable.class.getName(), TestExprMacroTable.INSTANCE) + .addValue(ExprMacroTable.class.getName(), TestExpressionMacroTable.INSTANCE) .addValue(IndexIO.class, indexIO) .addValue(ObjectMapper.class, jsonMapper) .addValue(ChatHandlerProvider.class, new NoopChatHandlerProvider()) diff --git a/processing/src/main/java/io/druid/query/dimension/DimensionSpec.java b/processing/src/main/java/io/druid/query/dimension/DimensionSpec.java index 7a98ffd5edb..33f4de0fdbf 100644 --- a/processing/src/main/java/io/druid/query/dimension/DimensionSpec.java +++ b/processing/src/main/java/io/druid/query/dimension/DimensionSpec.java @@ -35,8 +35,7 @@ import io.druid.segment.column.ValueType; @JsonSubTypes.Type(name = "default", value = DefaultDimensionSpec.class), @JsonSubTypes.Type(name = "extraction", value = ExtractionDimensionSpec.class), @JsonSubTypes.Type(name = "regexFiltered", value = RegexFilteredDimensionSpec.class), - @JsonSubTypes.Type(name = "listFiltered", value = ListFilteredDimensionSpec.class), - @JsonSubTypes.Type(name = "lookup", value = LookupDimensionSpec.class) + @JsonSubTypes.Type(name = "listFiltered", value = ListFilteredDimensionSpec.class) }) public interface DimensionSpec extends Cacheable { diff --git a/processing/src/main/java/io/druid/query/extraction/ExtractionFn.java b/processing/src/main/java/io/druid/query/extraction/ExtractionFn.java index df6c86c4867..5e6a3b95c0a 100644 --- a/processing/src/main/java/io/druid/query/extraction/ExtractionFn.java +++ b/processing/src/main/java/io/druid/query/extraction/ExtractionFn.java @@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import io.druid.guice.annotations.ExtensionPoint; import io.druid.java.util.common.Cacheable; import io.druid.query.lookup.LookupExtractionFn; -import io.druid.query.lookup.RegisteredLookupExtractionFn; import javax.annotation.Nullable; @@ -41,7 +40,6 @@ import javax.annotation.Nullable; @JsonSubTypes.Type(name = "timeFormat", value = TimeFormatExtractionFn.class), @JsonSubTypes.Type(name = "identity", value = IdentityExtractionFn.class), @JsonSubTypes.Type(name = "lookup", value = LookupExtractionFn.class), - @JsonSubTypes.Type(name = "registeredLookup", value = RegisteredLookupExtractionFn.class), @JsonSubTypes.Type(name = "substring", value = SubstringDimExtractionFn.class), @JsonSubTypes.Type(name = "cascade", value = CascadeExtractionFn.class), @JsonSubTypes.Type(name = "stringFormat", value = StringFormatExtractionFn.class), diff --git a/processing/src/main/java/io/druid/query/lookup/LookupConfig.java b/processing/src/main/java/io/druid/query/lookup/LookupConfig.java index fea346b745f..7f424c1e987 100644 --- a/processing/src/main/java/io/druid/query/lookup/LookupConfig.java +++ b/processing/src/main/java/io/druid/query/lookup/LookupConfig.java @@ -26,11 +26,19 @@ import com.google.common.base.Strings; public class LookupConfig { - @JsonProperty - private final String snapshotWorkingDir; + @JsonProperty("snapshotWorkingDir") + private String snapshotWorkingDir; + + @JsonProperty("enableLookupSyncOnStartup") + private boolean enableLookupSyncOnStartup = true; + + @JsonProperty("numLookupLoadingThreads") + private int numLookupLoadingThreads = Runtime.getRuntime().availableProcessors() / 2; /** - * @param snapshotWorkingDir working directory to store lookups snapshot file, passing null or empty string will disable the snapshot utility + * @param snapshotWorkingDir working directory to store lookups snapshot file, passing null or empty string will disable the snapshot utility + * @param numLookupLoadingThreads number of threads for loading the lookups as part of the synchronization process + * @param enableLookupSyncOnStartup decides whether the lookup synchronization process should be enabled at startup */ @JsonCreator public LookupConfig( @@ -45,6 +53,15 @@ public class LookupConfig return snapshotWorkingDir; } + public int getNumLookupLoadingThreads() + { + return numLookupLoadingThreads; + } + + public boolean getEnableLookupSyncOnStartup() + { + return enableLookupSyncOnStartup; + } @Override public boolean equals(Object o) @@ -58,7 +75,9 @@ public class LookupConfig LookupConfig that = (LookupConfig) o; - return getSnapshotWorkingDir().equals(that.getSnapshotWorkingDir()); + return snapshotWorkingDir.equals(that.snapshotWorkingDir) && + enableLookupSyncOnStartup == that.enableLookupSyncOnStartup && + numLookupLoadingThreads == that.numLookupLoadingThreads; } @@ -67,6 +86,8 @@ public class LookupConfig { return "LookupConfig{" + "snapshotWorkingDir='" + getSnapshotWorkingDir() + '\'' + + " numLookupLoadingThreads='" + getNumLookupLoadingThreads() + '\'' + + " enableLookupSyncOnStartup='" + getEnableLookupSyncOnStartup() + '\'' + '}'; } } diff --git a/processing/src/test/java/io/druid/query/expression/TestExprMacroTable.java b/processing/src/test/java/io/druid/query/expression/TestExprMacroTable.java index b1319cbe014..8527b65b839 100644 --- a/processing/src/test/java/io/druid/query/expression/TestExprMacroTable.java +++ b/processing/src/test/java/io/druid/query/expression/TestExprMacroTable.java @@ -20,17 +20,7 @@ package io.druid.query.expression; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import io.druid.math.expr.ExprMacroTable; -import io.druid.query.extraction.MapLookupExtractor; -import io.druid.query.lookup.LookupExtractor; -import io.druid.query.lookup.LookupExtractorFactory; -import io.druid.query.lookup.LookupExtractorFactoryContainer; -import io.druid.query.lookup.LookupIntrospectHandler; -import io.druid.query.lookup.LookupReferencesManager; -import org.easymock.EasyMock; - -import javax.annotation.Nullable; public class TestExprMacroTable extends ExprMacroTable { @@ -41,7 +31,6 @@ public class TestExprMacroTable extends ExprMacroTable super( ImmutableList.of( new LikeExprMacro(), - new LookupExprMacro(createTestLookupReferencesManager(ImmutableMap.of("foo", "xfoo"))), new RegexpExtractExprMacro(), new TimestampCeilExprMacro(), new TimestampExtractExprMacro(), @@ -55,52 +44,4 @@ public class TestExprMacroTable extends ExprMacroTable ) ); } - - /** - * Returns a mock {@link LookupReferencesManager} that has one lookup, "lookyloo". - */ - public static LookupReferencesManager createTestLookupReferencesManager(final ImmutableMap theLookup) - { - final LookupReferencesManager lookupReferencesManager = EasyMock.createMock(LookupReferencesManager.class); - EasyMock.expect(lookupReferencesManager.get(EasyMock.eq("lookyloo"))).andReturn( - new LookupExtractorFactoryContainer( - "v0", - new LookupExtractorFactory() - { - @Override - public boolean start() - { - throw new UnsupportedOperationException(); - } - - @Override - public boolean close() - { - throw new UnsupportedOperationException(); - } - - @Override - public boolean replaces(@Nullable final LookupExtractorFactory other) - { - throw new UnsupportedOperationException(); - } - - @Override - public LookupIntrospectHandler getIntrospectHandler() - { - throw new UnsupportedOperationException(); - } - - @Override - public LookupExtractor get() - { - return new MapLookupExtractor(theLookup, false); - } - } - ) - ).anyTimes(); - EasyMock.expect(lookupReferencesManager.get(EasyMock.not(EasyMock.eq("lookyloo")))).andReturn(null).anyTimes(); - EasyMock.replay(lookupReferencesManager); - return lookupReferencesManager; - } } diff --git a/processing/src/test/java/io/druid/query/lookup/LookupConfigTest.java b/processing/src/test/java/io/druid/query/lookup/LookupConfigTest.java index 836176e5913..c2e39bf0a0b 100644 --- a/processing/src/test/java/io/druid/query/lookup/LookupConfigTest.java +++ b/processing/src/test/java/io/druid/query/lookup/LookupConfigTest.java @@ -32,13 +32,36 @@ public class LookupConfigTest { ObjectMapper mapper = TestHelper.getJsonMapper(); - @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + @Test public void TestSerDesr() throws IOException { LookupConfig lookupConfig = new LookupConfig(temporaryFolder.newFile().getAbsolutePath()); - Assert.assertEquals(lookupConfig, mapper.reader(LookupConfig.class).readValue(mapper.writeValueAsString(lookupConfig))); + Assert.assertEquals( + lookupConfig, + mapper.reader(LookupConfig.class).readValue(mapper.writeValueAsString(lookupConfig)) + ); + } + + @Test + public void testSerdeWithNonDefaults() throws Exception + { + String json = "{\n" + + " \"enableLookupSyncOnStartup\": false,\n" + + " \"snapshotWorkingDir\": \"/tmp\",\n" + + " \"numLookupLoadingThreads\": 4 \n" + + "}\n"; + LookupConfig config = mapper.readValue( + mapper.writeValueAsString( + mapper.readValue(json, LookupConfig.class) + ), + LookupConfig.class + ); + + Assert.assertEquals("/tmp", config.getSnapshotWorkingDir()); + Assert.assertEquals(false, config.getEnableLookupSyncOnStartup()); + Assert.assertEquals(4, config.getNumLookupLoadingThreads()); } } diff --git a/processing/src/main/java/io/druid/query/dimension/LookupDimensionSpec.java b/server/src/main/java/io/druid/query/dimension/LookupDimensionSpec.java similarity index 100% rename from processing/src/main/java/io/druid/query/dimension/LookupDimensionSpec.java rename to server/src/main/java/io/druid/query/dimension/LookupDimensionSpec.java diff --git a/processing/src/main/java/io/druid/query/expression/LookupExprMacro.java b/server/src/main/java/io/druid/query/expression/LookupExprMacro.java similarity index 100% rename from processing/src/main/java/io/druid/query/expression/LookupExprMacro.java rename to server/src/main/java/io/druid/query/expression/LookupExprMacro.java diff --git a/server/src/main/java/io/druid/query/lookup/LookupModule.java b/server/src/main/java/io/druid/query/lookup/LookupModule.java index c8fdeaf68c0..59c0d7f96c3 100644 --- a/server/src/main/java/io/druid/query/lookup/LookupModule.java +++ b/server/src/main/java/io/druid/query/lookup/LookupModule.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -48,6 +49,7 @@ import io.druid.guice.annotations.Self; import io.druid.guice.annotations.Smile; import io.druid.initialization.DruidModule; import io.druid.java.util.common.logger.Logger; +import io.druid.query.dimension.LookupDimensionSpec; import io.druid.query.expression.LookupExprMacro; import io.druid.server.DruidNode; import io.druid.server.http.HostAndPortWithScheme; @@ -84,7 +86,11 @@ public class LookupModule implements DruidModule public List getJacksonModules() { return ImmutableList.of( - new SimpleModule("DruidLookupModule").registerSubtypes(MapLookupExtractorFactory.class) + new SimpleModule("DruidLookupModule").registerSubtypes(MapLookupExtractorFactory.class), + new SimpleModule().registerSubtypes( + new NamedType(LookupDimensionSpec.class, "lookup"), + new NamedType(RegisteredLookupExtractionFn.class, "registeredLookup") + ) ); } diff --git a/processing/src/main/java/io/druid/query/lookup/LookupReferencesManager.java b/server/src/main/java/io/druid/query/lookup/LookupReferencesManager.java similarity index 59% rename from processing/src/main/java/io/druid/query/lookup/LookupReferencesManager.java rename to server/src/main/java/io/druid/query/lookup/LookupReferencesManager.java index 1485fe725ca..0badc265020 100644 --- a/processing/src/main/java/io/druid/query/lookup/LookupReferencesManager.java +++ b/server/src/main/java/io/druid/query/lookup/LookupReferencesManager.java @@ -20,6 +20,7 @@ package io.druid.query.lookup; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -28,21 +29,34 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; +import com.metamx.http.client.response.FullResponseHolder; +import io.druid.client.coordinator.Coordinator; import io.druid.concurrent.Execs; import io.druid.concurrent.LifecycleLock; +import io.druid.discovery.DruidLeaderClient; import io.druid.guice.ManageLifecycle; import io.druid.guice.annotations.Json; import io.druid.java.util.common.ISE; +import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.lifecycle.LifecycleStart; import io.druid.java.util.common.lifecycle.LifecycleStop; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; import javax.annotation.Nullable; +import java.io.IOException; +import java.net.MalformedURLException; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; @@ -60,6 +74,11 @@ public class LookupReferencesManager { private static final EmittingLogger LOG = new EmittingLogger(LookupReferencesManager.class); + private static final TypeReference> LOOKUPS_ALL_REFERENCE = + new TypeReference>() + { + }; + // Lookups state (loaded/to-be-loaded/to-be-dropped etc) is managed by immutable LookupUpdateState instance. // Any update to state is done by creating updated LookupUpdateState instance and atomically setting that // into the ref here. @@ -79,21 +98,43 @@ public class LookupReferencesManager //for unit testing only private final boolean testMode; + private final DruidLeaderClient druidLeaderClient; + + private final ObjectMapper jsonMapper; + + private final LookupListeningAnnouncerConfig lookupListeningAnnouncerConfig; + + private final LookupConfig lookupConfig; + @Inject - public LookupReferencesManager(LookupConfig lookupConfig, @Json ObjectMapper objectMapper) + public LookupReferencesManager( + LookupConfig lookupConfig, + @Json ObjectMapper objectMapper, + @Coordinator DruidLeaderClient druidLeaderClient, + LookupListeningAnnouncerConfig lookupListeningAnnouncerConfig + ) { - this(lookupConfig, objectMapper, false); + this(lookupConfig, objectMapper, druidLeaderClient, lookupListeningAnnouncerConfig, false); } @VisibleForTesting - LookupReferencesManager(LookupConfig lookupConfig, ObjectMapper objectMapper, boolean testMode) + LookupReferencesManager( + LookupConfig lookupConfig, + ObjectMapper objectMapper, + DruidLeaderClient druidLeaderClient, + LookupListeningAnnouncerConfig lookupListeningAnnouncerConfig, + boolean testMode + ) { if (Strings.isNullOrEmpty(lookupConfig.getSnapshotWorkingDir())) { this.lookupSnapshotTaker = null; } else { this.lookupSnapshotTaker = new LookupSnapshotTaker(objectMapper, lookupConfig.getSnapshotWorkingDir()); } - + this.druidLeaderClient = druidLeaderClient; + this.jsonMapper = objectMapper; + this.lookupListeningAnnouncerConfig = lookupListeningAnnouncerConfig; + this.lookupConfig = lookupConfig; this.testMode = testMode; } @@ -103,43 +144,34 @@ public class LookupReferencesManager if (!lifecycleLock.canStart()) { throw new ISE("can't start."); } - try { LOG.info("LookupReferencesManager is starting."); - - loadSnapshotAndInitStateRef(); - + loadAllLookupsAndInitStateRef(); if (!testMode) { mainThread = Execs.makeThread( "LookupReferencesManager-MainThread", - new Runnable() - { - @Override - public void run() - { - try { + () -> { + try { + if (!lifecycleLock.awaitStarted()) { + LOG.error("WTF! lifecycle not started, lookup update notices will not be handled."); + return; + } - if (!lifecycleLock.awaitStarted()) { - LOG.error("WTF! lifecycle not started, lookup update notices will not be handled."); - return; + while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { + try { + handlePendingNotices(); + LockSupport.parkNanos(LookupReferencesManager.this, TimeUnit.MINUTES.toNanos(1)); } - - while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { - try { - handlePendingNotices(); - LockSupport.parkNanos(LookupReferencesManager.this, TimeUnit.MINUTES.toNanos(1)); - } - catch (Throwable t) { - LOG.makeAlert(t, "Error occured while lookup notice handling.").emit(); - } + catch (Throwable t) { + LOG.makeAlert(t, "Error occured while lookup notice handling.").emit(); } } - catch (Throwable t) { - LOG.error(t, "Error while waiting for lifecycle start. lookup updates notices will not be handled"); - } - finally { - LOG.info("Lookup Management loop exited, Lookup notices are not handled anymore."); - } + } + catch (Throwable t) { + LOG.error(t, "Error while waiting for lifecycle start. lookup updates notices will not be handled"); + } + finally { + LOG.info("Lookup Management loop exited, Lookup notices are not handled anymore."); } }, true @@ -279,7 +311,11 @@ public class LookupReferencesManager return new LookupsState<>(lookupUpdateState.lookupMap, lookupsToLoad, lookupsToDrop); } - private void updateToLoadAndDrop(List notices, Map lookupsToLoad, Set lookupsToDrop) + private void updateToLoadAndDrop( + List notices, + Map lookupsToLoad, + Set lookupsToDrop + ) { for (Notice notice : notices) { if (notice instanceof LoadNotice) { @@ -299,34 +335,168 @@ public class LookupReferencesManager private void takeSnapshot(Map lookupMap) { if (lookupSnapshotTaker != null) { - List lookups = new ArrayList<>(lookupMap.size()); - for (Map.Entry e : lookupMap.entrySet()) { - lookups.add(new LookupBean(e.getKey(), null, e.getValue())); - } - - lookupSnapshotTaker.takeSnapshot(lookups); + lookupSnapshotTaker.takeSnapshot(getLookupBeanList(lookupMap)); } } - private void loadSnapshotAndInitStateRef() + private void loadAllLookupsAndInitStateRef() { - if (lookupSnapshotTaker != null) { - ImmutableMap.Builder builder = ImmutableMap.builder(); + List lookupBeanList = getLookupsListFromLookupConfig(); + if (lookupBeanList != null) { + startLookups(lookupBeanList); + } else { + LOG.info("No lookups to be loaded at this point"); + stateRef.set(new LookupUpdateState(ImmutableMap.of(), ImmutableList.of(), ImmutableList.of())); + } + } - final List lookupBeanList = lookupSnapshotTaker.pullExistingSnapshot(); - for (LookupBean lookupBean : lookupBeanList) { - LookupExtractorFactoryContainer container = lookupBean.getContainer(); - - if (container.getLookupExtractorFactory().start()) { - builder.put(lookupBean.getName(), container); - } else { - throw new ISE("Failed to start lookup [%s]:[%s]", lookupBean.getName(), container); - } + /** + * Returns a list of lookups from the coordinator if the coordinator is available. If it's not available, returns null. + * + * @param tier lookup tier name + * + * @return list of LookupBean objects, or null + */ + @Nullable + private List getLookupListFromCoordinator(String tier) + { + try { + final FullResponseHolder response = fetchLookupsForTier(tier); + List lookupBeanList = new ArrayList<>(); + if (!response.getStatus().equals(HttpResponseStatus.OK)) { + LOG.error( + "Error while fetching lookup code from Coordinator with status[%s] and content[%s]", + response.getStatus(), + response.getContent() + ); + return null; } - stateRef.set(new LookupUpdateState(builder.build(), ImmutableList.of(), ImmutableList.of())); + // Older version of getSpecificTier returns a list of lookup names. + // Lookup loading is performed via snapshot if older version is present. + // This check is only for backward compatibility and should be removed in a future release + if (response.getContent().startsWith("[")) { + LOG.info("Failed to retrieve lookup information from coordinator. Attempting to load lookups using snapshot instead"); + return null; + } else { + Map lookupMap = jsonMapper.readValue( + response.getContent(), + LOOKUPS_ALL_REFERENCE + ); + lookupMap.forEach((k, v) -> lookupBeanList.add(new LookupBean(k, null, v))); + + } + return lookupBeanList; + } + catch (Exception e) { + LOG.error(e, "Error while trying to get lookup list from coordinator for tier[%s]", tier); + return null; + } + } + + /** + * Returns a list of lookups from the snapshot if the lookupsnapshottaker is configured. If it's not available, returns null. + * + * @return list of LookupBean objects, or null + */ + @Nullable + private List getLookupListFromSnapshot() + { + if (lookupSnapshotTaker != null) { + return lookupSnapshotTaker.pullExistingSnapshot(); + } + return null; + } + + private List getLookupBeanList(Map lookupMap) + { + List lookups = new ArrayList<>(lookupMap.size()); + for (Map.Entry e : lookupMap.entrySet()) { + lookups.add(new LookupBean(e.getKey(), null, e.getValue())); + } + return lookups; + } + + private List getLookupsListFromLookupConfig() + { + List lookupBeanList; + if (lookupConfig.getEnableLookupSyncOnStartup()) { + String tier = lookupListeningAnnouncerConfig.getLookupTier(); + lookupBeanList = getLookupListFromCoordinator(tier); + if (lookupBeanList == null) { + LOG.info("Coordinator is unavailable. Loading saved snapshot instead"); + lookupBeanList = getLookupListFromSnapshot(); + } } else { - stateRef.set(new LookupUpdateState(ImmutableMap.of(), ImmutableList.of(), ImmutableList.of())); + lookupBeanList = getLookupListFromSnapshot(); + } + return lookupBeanList; + } + + private void startLookups(List lookupBeanList) + { + ImmutableMap.Builder builder = ImmutableMap.builder(); + ExecutorService executorService = Execs.multiThreaded( + lookupConfig.getNumLookupLoadingThreads(), + "LookupReferencesManager-Startup-%s" + ); + ExecutorCompletionService completionService = new ExecutorCompletionService(executorService); + List> futures = new ArrayList<>(); + try { + LOG.info("Starting lookup loading process"); + for (LookupBean lookupBean : lookupBeanList) { + futures.add( + completionService.submit( + () -> { + LookupExtractorFactoryContainer container = lookupBean.getContainer(); + LOG.info( + "Starting lookup [%s]:[%s]", + lookupBean.getName(), + container + ); + if (container.getLookupExtractorFactory().start()) { + LOG.info( + "Started lookup [%s]:[%s]", + lookupBean.getName(), + container + ); + return new AbstractMap.SimpleImmutableEntry<>(lookupBean.getName(), container); + } else { + LOG.error( + "Failed to start lookup [%s]:[%s]", + lookupBean.getName(), + container + ); + return null; + } + } + ) + ); + } + for (int i = 0; i < futures.size(); i++) { + try { + final Future> completedFuture = completionService + .take(); + final AbstractMap.SimpleImmutableEntry lookupResult = completedFuture + .get(); + if (lookupResult != null) { + builder.put(lookupResult); + } + } + catch (ExecutionException e) { + LOG.error(e, "Execution error during lookup loading."); + } + } + stateRef.set(new LookupUpdateState(builder.build(), ImmutableList.of(), ImmutableList.of())); + } + catch (Exception e) { + LOG.error(e, "Failed to finish lookup load process."); + for (Future future : futures) { + future.cancel(true); + } + } + finally { + executorService.shutdownNow(); } } @@ -341,6 +511,19 @@ public class LookupReferencesManager } } + private FullResponseHolder fetchLookupsForTier(String tier) + throws ExecutionException, InterruptedException, MalformedURLException, IOException + { + return druidLeaderClient.go( + druidLeaderClient.makeRequest( + HttpMethod.GET, + StringUtils.format( + "/druid/coordinator/v1/lookups/%s?detailed=true", + tier + ) + )); + } + @VisibleForTesting interface Notice { diff --git a/processing/src/main/java/io/druid/query/lookup/RegisteredLookupExtractionFn.java b/server/src/main/java/io/druid/query/lookup/RegisteredLookupExtractionFn.java similarity index 100% rename from processing/src/main/java/io/druid/query/lookup/RegisteredLookupExtractionFn.java rename to server/src/main/java/io/druid/query/lookup/RegisteredLookupExtractionFn.java diff --git a/server/src/main/java/io/druid/server/http/LookupCoordinatorResource.java b/server/src/main/java/io/druid/server/http/LookupCoordinatorResource.java index 3f5ea82c9e9..93a9ce4fc9e 100644 --- a/server/src/main/java/io/druid/server/http/LookupCoordinatorResource.java +++ b/server/src/main/java/io/druid/server/http/LookupCoordinatorResource.java @@ -269,7 +269,9 @@ public class LookupCoordinatorResource @Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE}) @Path("/{tier}") public Response getSpecificTier( - @PathParam("tier") String tier + @PathParam("tier") String tier, + @DefaultValue("false") @QueryParam("detailed") boolean detailed + ) { try { @@ -290,7 +292,11 @@ public class LookupCoordinatorResource .entity(ServletResourceUtils.sanitizeException(new RE("Tier [%s] not found", tier))) .build(); } - return Response.ok().entity(tierLookups.keySet()).build(); + if (detailed) { + return Response.ok().entity(tierLookups).build(); + } else { + return Response.ok().entity(tierLookups.keySet()).build(); + } } catch (Exception e) { LOG.error(e, "Error getting tier [%s]", tier); diff --git a/server/src/test/java/io/druid/query/dimension/LookupDimensionSpecTest.java b/server/src/test/java/io/druid/query/dimension/LookupDimensionSpecTest.java index b88ca449b38..7e1ef423f95 100644 --- a/server/src/test/java/io/druid/query/dimension/LookupDimensionSpecTest.java +++ b/server/src/test/java/io/druid/query/dimension/LookupDimensionSpecTest.java @@ -75,7 +75,7 @@ public class LookupDimensionSpecTest LOOKUP_REF_MANAGER ); String serLookup = mapper.writeValueAsString(lookupDimSpec); - Assert.assertEquals(lookupDimSpec, mapper.reader(DimensionSpec.class).with(injectableValues).readValue(serLookup)); + Assert.assertEquals(lookupDimSpec, mapper.reader(LookupDimensionSpec.class).with(injectableValues).readValue(serLookup)); } private Object[] parametersForTestSerDesr() diff --git a/processing/src/test/java/io/druid/query/expression/ExprMacroTest.java b/server/src/test/java/io/druid/query/expression/ExprMacroTest.java similarity index 98% rename from processing/src/test/java/io/druid/query/expression/ExprMacroTest.java rename to server/src/test/java/io/druid/query/expression/ExprMacroTest.java index 10b42157c95..90353a16801 100644 --- a/processing/src/test/java/io/druid/query/expression/ExprMacroTest.java +++ b/server/src/test/java/io/druid/query/expression/ExprMacroTest.java @@ -174,7 +174,7 @@ public class ExprMacroTest private void assertExpr(final String expression, final Object expectedResult) { - final Expr expr = Parser.parse(expression, TestExprMacroTable.INSTANCE); + final Expr expr = Parser.parse(expression, TestExpressionMacroTable.INSTANCE); Assert.assertEquals(expression, expectedResult, expr.eval(BINDINGS).value()); } } diff --git a/server/src/test/java/io/druid/query/expression/TestExpressionMacroTable.java b/server/src/test/java/io/druid/query/expression/TestExpressionMacroTable.java new file mode 100644 index 00000000000..ecc5fde256a --- /dev/null +++ b/server/src/test/java/io/druid/query/expression/TestExpressionMacroTable.java @@ -0,0 +1,106 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.expression; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.druid.math.expr.ExprMacroTable; +import io.druid.query.extraction.MapLookupExtractor; +import io.druid.query.lookup.LookupExtractor; +import io.druid.query.lookup.LookupExtractorFactory; +import io.druid.query.lookup.LookupExtractorFactoryContainer; +import io.druid.query.lookup.LookupIntrospectHandler; +import io.druid.query.lookup.LookupReferencesManager; +import org.easymock.EasyMock; + +import javax.annotation.Nullable; + +public class TestExpressionMacroTable extends ExprMacroTable +{ + public static final ExprMacroTable INSTANCE = new TestExpressionMacroTable(); + + private TestExpressionMacroTable() + { + super( + ImmutableList.of( + new LikeExprMacro(), + new LookupExprMacro(createTestLookupReferencesManager(ImmutableMap.of("foo", "xfoo"))), + new RegexpExtractExprMacro(), + new TimestampCeilExprMacro(), + new TimestampExtractExprMacro(), + new TimestampFloorExprMacro(), + new TimestampFormatExprMacro(), + new TimestampParseExprMacro(), + new TimestampShiftExprMacro(), + new TrimExprMacro.BothTrimExprMacro(), + new TrimExprMacro.LeftTrimExprMacro(), + new TrimExprMacro.RightTrimExprMacro() + ) + ); + } + + /** + * Returns a mock {@link LookupReferencesManager} that has one lookup, "lookyloo". + */ + public static LookupReferencesManager createTestLookupReferencesManager(final ImmutableMap theLookup) + { + final LookupReferencesManager lookupReferencesManager = EasyMock.createMock(LookupReferencesManager.class); + EasyMock.expect(lookupReferencesManager.get(EasyMock.eq("lookyloo"))).andReturn( + new LookupExtractorFactoryContainer( + "v0", + new LookupExtractorFactory() + { + @Override + public boolean start() + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean close() + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean replaces(@Nullable final LookupExtractorFactory other) + { + throw new UnsupportedOperationException(); + } + + @Override + public LookupIntrospectHandler getIntrospectHandler() + { + throw new UnsupportedOperationException(); + } + + @Override + public LookupExtractor get() + { + return new MapLookupExtractor(theLookup, false); + } + } + ) + ).anyTimes(); + EasyMock.expect(lookupReferencesManager.get(EasyMock.not(EasyMock.eq("lookyloo")))).andReturn(null).anyTimes(); + EasyMock.replay(lookupReferencesManager); + return lookupReferencesManager; + } +} diff --git a/server/src/test/java/io/druid/query/lookup/LookupReferencesManagerTest.java b/server/src/test/java/io/druid/query/lookup/LookupReferencesManagerTest.java index 8aaf8b5c86e..4395145b4ed 100644 --- a/server/src/test/java/io/druid/query/lookup/LookupReferencesManagerTest.java +++ b/server/src/test/java/io/druid/query/lookup/LookupReferencesManagerTest.java @@ -22,9 +22,15 @@ package io.druid.query.lookup; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import com.metamx.emitter.EmittingLogger; +import com.metamx.http.client.Request; +import com.metamx.http.client.response.FullResponseHolder; +import io.druid.discovery.DruidLeaderClient; import io.druid.jackson.DefaultObjectMapper; import io.druid.server.metrics.NoopServiceEmitter; import org.easymock.EasyMock; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponse; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -32,11 +38,35 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import java.io.IOException; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.TimeUnit; +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.reset; + public class LookupReferencesManagerTest { LookupReferencesManager lookupReferencesManager; + + private DruidLeaderClient druidLeaderClient; + + private LookupListeningAnnouncerConfig config; + + private static final String propertyBase = "some.property"; + + private static final String LOOKUP_TIER = "lookupTier"; + + private static final int LOOKUP_THREADS = 1; + + private static final boolean LOOKUP_DISABLE = false; + + LookupExtractorFactory lookupExtractorFactory; + + LookupExtractorFactoryContainer container; @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); ObjectMapper mapper = new DefaultObjectMapper(); @@ -46,22 +76,49 @@ public class LookupReferencesManagerTest { EmittingLogger.registerEmitter(new NoopServiceEmitter()); + druidLeaderClient = EasyMock.createMock(DruidLeaderClient.class); + + config = createMock(LookupListeningAnnouncerConfig.class); + + lookupExtractorFactory = new MapLookupExtractorFactory( + ImmutableMap.of( + "key", + "value" + ), true + ); + container = new LookupExtractorFactoryContainer("v0", lookupExtractorFactory); mapper.registerSubtypes(MapLookupExtractorFactory.class); + String temporaryPath = temporaryFolder.newFolder().getAbsolutePath(); lookupReferencesManager = new LookupReferencesManager( new LookupConfig(temporaryFolder.newFolder().getAbsolutePath()), - mapper, + mapper, druidLeaderClient, config, true ); } @Test - public void testStartStop() + public void testStartStop() throws InterruptedException, IOException { lookupReferencesManager = new LookupReferencesManager( new LookupConfig(null), - mapper + mapper, druidLeaderClient, config ); + Map lookupMap = new HashMap<>(); + lookupMap.put("testMockForStartStop", container); + String strResult = mapper.writeValueAsString(lookupMap); + Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx")); + expect(config.getLookupTier()).andReturn(LOOKUP_TIER); + replay(config); + expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/lookupTier?detailed=true")) + .andReturn(request); + FullResponseHolder responseHolder = new FullResponseHolder( + HttpResponseStatus.OK, + EasyMock.createNiceMock(HttpResponse.class), + new StringBuilder().append(strResult) + ); + expect(druidLeaderClient.go(request)).andReturn(responseHolder); + replay(druidLeaderClient); Assert.assertFalse(lookupReferencesManager.lifecycleLock.awaitStarted(1, TimeUnit.MICROSECONDS)); Assert.assertNull(lookupReferencesManager.mainThread); Assert.assertNull(lookupReferencesManager.stateRef.get()); @@ -107,6 +164,22 @@ public class LookupReferencesManagerTest EasyMock.expect(lookupExtractorFactory.start()).andReturn(true).once(); EasyMock.expect(lookupExtractorFactory.close()).andReturn(true).once(); EasyMock.replay(lookupExtractorFactory); + + Map lookupMap = new HashMap<>(); + lookupMap.put("testMockForAddGetRemove", container); + String strResult = mapper.writeValueAsString(lookupMap); + Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx")); + expect(config.getLookupTier()).andReturn(LOOKUP_TIER); + replay(config); + expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/lookupTier?detailed=true")) + .andReturn(request); + FullResponseHolder responseHolder = new FullResponseHolder( + HttpResponseStatus.OK, + EasyMock.createNiceMock(HttpResponse.class), + new StringBuilder().append(strResult) + ); + expect(druidLeaderClient.go(request)).andReturn(responseHolder); + replay(druidLeaderClient); lookupReferencesManager.start(); Assert.assertNull(lookupReferencesManager.get("test")); @@ -130,6 +203,21 @@ public class LookupReferencesManagerTest EasyMock.expect(lookupExtractorFactory.start()).andReturn(true).once(); EasyMock.expect(lookupExtractorFactory.close()).andReturn(true).once(); EasyMock.replay(lookupExtractorFactory); + Map lookupMap = new HashMap<>(); + lookupMap.put("testMockForCloseIsCalledAfterStopping", container); + String strResult = mapper.writeValueAsString(lookupMap); + Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx")); + expect(config.getLookupTier()).andReturn(LOOKUP_TIER); + replay(config); + expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/lookupTier?detailed=true")) + .andReturn(request); + FullResponseHolder responseHolder = new FullResponseHolder( + HttpResponseStatus.OK, + EasyMock.createNiceMock(HttpResponse.class), + new StringBuilder().append(strResult) + ); + expect(druidLeaderClient.go(request)).andReturn(responseHolder); + replay(druidLeaderClient); lookupReferencesManager.start(); lookupReferencesManager.add("testMock", new LookupExtractorFactoryContainer("0", lookupExtractorFactory)); lookupReferencesManager.handlePendingNotices(); @@ -146,6 +234,21 @@ public class LookupReferencesManagerTest EasyMock.expect(lookupExtractorFactory.close()).andReturn(true).once(); EasyMock.replay(lookupExtractorFactory); + Map lookupMap = new HashMap<>(); + lookupMap.put("testMockForCloseIsCalledAfterRemove", container); + String strResult = mapper.writeValueAsString(lookupMap); + Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx")); + expect(config.getLookupTier()).andReturn(LOOKUP_TIER); + replay(config); + expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/lookupTier?detailed=true")) + .andReturn(request); + FullResponseHolder responseHolder = new FullResponseHolder( + HttpResponseStatus.OK, + EasyMock.createNiceMock(HttpResponse.class), + new StringBuilder().append(strResult) + ); + expect(druidLeaderClient.go(request)).andReturn(responseHolder); + replay(druidLeaderClient); lookupReferencesManager.start(); lookupReferencesManager.add("testMock", new LookupExtractorFactoryContainer("0", lookupExtractorFactory)); lookupReferencesManager.handlePendingNotices(); @@ -157,8 +260,23 @@ public class LookupReferencesManagerTest } @Test - public void testGetNotThere() + public void testGetNotThere() throws Exception { + Map lookupMap = new HashMap<>(); + lookupMap.put("testMockForGetNotThere", container); + String strResult = mapper.writeValueAsString(lookupMap); + Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx")); + expect(config.getLookupTier()).andReturn(LOOKUP_TIER); + replay(config); + expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/lookupTier?detailed=true")) + .andReturn(request); + FullResponseHolder responseHolder = new FullResponseHolder( + HttpResponseStatus.OK, + EasyMock.createNiceMock(HttpResponse.class), + new StringBuilder().append(strResult) + ); + expect(druidLeaderClient.go(request)).andReturn(responseHolder); + replay(druidLeaderClient); lookupReferencesManager.start(); Assert.assertNull(lookupReferencesManager.get("notThere")); } @@ -174,7 +292,21 @@ public class LookupReferencesManagerTest EasyMock.expect(lookupExtractorFactory2.start()).andReturn(true).once(); EasyMock.replay(lookupExtractorFactory1, lookupExtractorFactory2); - + Map lookupMap = new HashMap<>(); + lookupMap.put("testMockForUpdateWithHigherVersion", container); + String strResult = mapper.writeValueAsString(lookupMap); + Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx")); + expect(config.getLookupTier()).andReturn(LOOKUP_TIER); + replay(config); + expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/lookupTier?detailed=true")) + .andReturn(request); + FullResponseHolder responseHolder = new FullResponseHolder( + HttpResponseStatus.OK, + EasyMock.createNiceMock(HttpResponse.class), + new StringBuilder().append(strResult) + ); + expect(druidLeaderClient.go(request)).andReturn(responseHolder); + replay(druidLeaderClient); lookupReferencesManager.start(); lookupReferencesManager.add("testName", new LookupExtractorFactoryContainer("1", lookupExtractorFactory1)); lookupReferencesManager.handlePendingNotices(); @@ -194,7 +326,21 @@ public class LookupReferencesManagerTest LookupExtractorFactory lookupExtractorFactory2 = EasyMock.createNiceMock(LookupExtractorFactory.class); EasyMock.replay(lookupExtractorFactory1, lookupExtractorFactory2); - + Map lookupMap = new HashMap<>(); + lookupMap.put("testMockForUpdateWithLowerVersion", container); + String strResult = mapper.writeValueAsString(lookupMap); + Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx")); + expect(config.getLookupTier()).andReturn(LOOKUP_TIER); + replay(config); + expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/lookupTier?detailed=true")) + .andReturn(request); + FullResponseHolder responseHolder = new FullResponseHolder( + HttpResponseStatus.OK, + EasyMock.createNiceMock(HttpResponse.class), + new StringBuilder().append(strResult) + ); + expect(druidLeaderClient.go(request)).andReturn(responseHolder); + replay(druidLeaderClient); lookupReferencesManager.start(); lookupReferencesManager.add("testName", new LookupExtractorFactoryContainer("1", lookupExtractorFactory1)); lookupReferencesManager.handlePendingNotices(); @@ -208,35 +354,26 @@ public class LookupReferencesManagerTest @Test public void testRemoveNonExisting() throws Exception { + Map lookupMap = new HashMap<>(); + lookupMap.put("testMockForRemoveNonExisting", container); + String strResult = mapper.writeValueAsString(lookupMap); + Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx")); + expect(config.getLookupTier()).andReturn(LOOKUP_TIER); + replay(config); + expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/lookupTier?detailed=true")) + .andReturn(request); + FullResponseHolder responseHolder = new FullResponseHolder( + HttpResponseStatus.OK, + EasyMock.createNiceMock(HttpResponse.class), + new StringBuilder().append(strResult) + ); + expect(druidLeaderClient.go(request)).andReturn(responseHolder); + replay(druidLeaderClient); lookupReferencesManager.start(); lookupReferencesManager.remove("test"); lookupReferencesManager.handlePendingNotices(); } - @Test - public void testBootstrapFromFile() throws Exception - { - LookupExtractorFactory lookupExtractorFactory = new MapLookupExtractorFactory( - ImmutableMap.of( - "key", - "value" - ), true - ); - LookupExtractorFactoryContainer container = new LookupExtractorFactoryContainer("v0", lookupExtractorFactory); - lookupReferencesManager.start(); - lookupReferencesManager.add("testMockForBootstrap", container); - lookupReferencesManager.handlePendingNotices(); - lookupReferencesManager.stop(); - - lookupReferencesManager = new LookupReferencesManager( - new LookupConfig(lookupReferencesManager.lookupSnapshotTaker.getPersistFile().getParent()), - mapper, - true - ); - lookupReferencesManager.start(); - Assert.assertEquals(container, lookupReferencesManager.get("testMockForBootstrap")); - } - @Test public void testGetAllLookupsState() throws Exception { @@ -269,7 +406,20 @@ public class LookupReferencesManagerTest ), true ) ); - + Map lookupMap = new HashMap<>(); + String strResult = mapper.writeValueAsString(lookupMap); + Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx")); + expect(config.getLookupTier()).andReturn(LOOKUP_TIER); + replay(config); + expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/lookupTier?detailed=true")) + .andReturn(request); + FullResponseHolder responseHolder = new FullResponseHolder( + HttpResponseStatus.OK, + EasyMock.createNiceMock(HttpResponse.class), + new StringBuilder().append(strResult) + ); + expect(druidLeaderClient.go(request)).andReturn(responseHolder); + replay(druidLeaderClient); lookupReferencesManager.start(); lookupReferencesManager.add("one", container1); lookupReferencesManager.add("two", container2); @@ -290,14 +440,28 @@ public class LookupReferencesManagerTest Assert.assertTrue(state.getToDrop().contains("one")); } - @Test (timeout = 20000) + @Test(timeout = 20000) public void testRealModeWithMainThread() throws Exception { LookupReferencesManager lookupReferencesManager = new LookupReferencesManager( new LookupConfig(temporaryFolder.newFolder().getAbsolutePath()), - mapper + mapper, druidLeaderClient, config ); - + Map lookupMap = new HashMap<>(); + lookupMap.put("testMockForRealModeWithMainThread", container); + String strResult = mapper.writeValueAsString(lookupMap); + Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx")); + expect(config.getLookupTier()).andReturn(LOOKUP_TIER); + replay(config); + expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/lookupTier?detailed=true")) + .andReturn(request); + FullResponseHolder responseHolder = new FullResponseHolder( + HttpResponseStatus.OK, + EasyMock.createNiceMock(HttpResponse.class), + new StringBuilder().append(strResult) + ); + expect(druidLeaderClient.go(request)).andReturn(responseHolder); + replay(druidLeaderClient); lookupReferencesManager.start(); Assert.assertTrue(lookupReferencesManager.mainThread.isAlive()); @@ -324,4 +488,128 @@ public class LookupReferencesManagerTest Assert.assertFalse(lookupReferencesManager.mainThread.isAlive()); } + + @Test + public void testCoordinatorLookupSync() throws Exception + { + LookupExtractorFactoryContainer container1 = new LookupExtractorFactoryContainer( + "0", + new MapLookupExtractorFactory( + ImmutableMap.of( + "key1", + "value1" + ), true + ) + ); + + LookupExtractorFactoryContainer container2 = new LookupExtractorFactoryContainer( + "0", + new MapLookupExtractorFactory( + ImmutableMap.of( + "key2", + "value2" + ), true + ) + ); + + LookupExtractorFactoryContainer container3 = new LookupExtractorFactoryContainer( + "0", + new MapLookupExtractorFactory( + ImmutableMap.of( + "key3", + "value3" + ), true + ) + ); + Map lookupMap = new HashMap<>(); + lookupMap.put("testLookup1", container1); + lookupMap.put("testLookup2", container2); + lookupMap.put("testLookup3", container3); + String strResult = mapper.writeValueAsString(lookupMap); + Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx")); + expect(config.getLookupTier()).andReturn(LOOKUP_TIER); + replay(config); + expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/lookupTier?detailed=true")) + .andReturn(request); + FullResponseHolder responseHolder = new FullResponseHolder( + HttpResponseStatus.OK, + EasyMock.createNiceMock(HttpResponse.class), + new StringBuilder().append(strResult) + ); + expect(druidLeaderClient.go(request)).andReturn(responseHolder); + replay(druidLeaderClient); + + lookupReferencesManager.start(); + Assert.assertEquals(container1, lookupReferencesManager.get("testLookup1")); + Assert.assertEquals(container2, lookupReferencesManager.get("testLookup2")); + Assert.assertEquals(container3, lookupReferencesManager.get("testLookup3")); + + } + + @Test + public void testLoadLookupOnCoordinatorFailure() throws Exception + { + Map lookupMap = new HashMap<>(); + lookupMap.put("testMockForLoadLookupOnCoordinatorFailure", container); + String strResult = mapper.writeValueAsString(lookupMap); + Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx")); + expect(config.getLookupTier()).andReturn(LOOKUP_TIER); + replay(config); + expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/lookupTier?detailed=true")) + .andReturn(request); + FullResponseHolder responseHolder = new FullResponseHolder( + HttpResponseStatus.NOT_FOUND, + EasyMock.createNiceMock(HttpResponse.class), + new StringBuilder().append(strResult) + ); + expect(druidLeaderClient.go(request)).andThrow(new IllegalStateException()); + replay(druidLeaderClient); + + lookupReferencesManager.start(); + lookupReferencesManager.add("testMockForLoadLookupOnCoordinatorFailure", container); + lookupReferencesManager.handlePendingNotices(); + lookupReferencesManager.stop(); + lookupReferencesManager = new LookupReferencesManager( + new LookupConfig(lookupReferencesManager.lookupSnapshotTaker.getPersistFile().getParent()), + mapper, druidLeaderClient, config, + true + ); + reset(config); + reset(druidLeaderClient); + expect(config.getLookupTier()).andReturn(LOOKUP_TIER); + replay(config); + expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/lookupTier?detailed=true")) + .andReturn(request); + expect(druidLeaderClient.go(request)).andThrow(new IllegalStateException()); + replay(druidLeaderClient); + lookupReferencesManager.start(); + Assert.assertEquals(container, lookupReferencesManager.get("testMockForLoadLookupOnCoordinatorFailure")); + } + + @Test + public void testDisableLookupSync() throws Exception + { + LookupReferencesManager lookupReferencesManager = new LookupReferencesManager( + new LookupConfig(null), + mapper, druidLeaderClient, config + ); + Map lookupMap = new HashMap<>(); + lookupMap.put("testMockForDisableLookupSync", container); + String strResult = mapper.writeValueAsString(lookupMap); + Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx")); + expect(config.getLookupTier()).andReturn(LOOKUP_TIER); + replay(config); + expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/lookupTier?detailed=true")) + .andReturn(request); + FullResponseHolder responseHolder = new FullResponseHolder( + HttpResponseStatus.OK, + EasyMock.createNiceMock(HttpResponse.class), + new StringBuilder().append(strResult) + ); + expect(druidLeaderClient.go(request)).andReturn(responseHolder); + + lookupReferencesManager.start(); + Assert.assertNull(lookupReferencesManager.get("testMockForDisableLookupSync")); + } + } diff --git a/processing/src/test/java/io/druid/query/lookup/RegisteredLookupExtractionFnTest.java b/server/src/test/java/io/druid/query/lookup/RegisteredLookupExtractionFnTest.java similarity index 100% rename from processing/src/test/java/io/druid/query/lookup/RegisteredLookupExtractionFnTest.java rename to server/src/test/java/io/druid/query/lookup/RegisteredLookupExtractionFnTest.java diff --git a/server/src/test/java/io/druid/server/http/LookupCoordinatorResourceTest.java b/server/src/test/java/io/druid/server/http/LookupCoordinatorResourceTest.java index 499c236303b..563542e86ad 100644 --- a/server/src/test/java/io/druid/server/http/LookupCoordinatorResourceTest.java +++ b/server/src/test/java/io/druid/server/http/LookupCoordinatorResourceTest.java @@ -86,9 +86,10 @@ public class LookupCoordinatorResourceTest ImmutableMap.of(LOOKUP_NAME, SINGLE_LOOKUP), null, null ); - private static final Map> NODES_LOOKUP_STATE = ImmutableMap.of( - LOOKUP_NODE, LOOKUP_STATE - ); + private static final Map> NODES_LOOKUP_STATE = ImmutableMap + .of( + LOOKUP_NODE, LOOKUP_STATE + ); @Test public void testSimpleGet() @@ -208,6 +209,23 @@ public class LookupCoordinatorResourceTest EasyMock.verify(lookupCoordinatorManager); } + @Test + public void testDetailedGetLookup() + { + final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(LookupCoordinatorManager.class); + EasyMock.expect(lookupCoordinatorManager.getKnownLookups()).andReturn(SINGLE_TIER_MAP).once(); + EasyMock.replay(lookupCoordinatorManager); + final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource( + lookupCoordinatorManager, + mapper, + mapper + ); + final Response response = lookupCoordinatorResource.getSpecificTier(LOOKUP_TIER, true); + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals(SINGLE_TIER_MAP.get(LOOKUP_TIER), response.getEntity()); + EasyMock.verify(lookupCoordinatorManager); + } + @Test public void testMissingGetLookup() { @@ -765,7 +783,7 @@ public class LookupCoordinatorResourceTest mapper, mapper ); - final Response response = lookupCoordinatorResource.getSpecificTier(LOOKUP_TIER); + final Response response = lookupCoordinatorResource.getSpecificTier(LOOKUP_TIER, false); Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(SINGLE_TIER_MAP.get(LOOKUP_TIER).keySet(), response.getEntity()); EasyMock.verify(lookupCoordinatorManager); @@ -785,7 +803,7 @@ public class LookupCoordinatorResourceTest mapper, mapper ); - final Response response = lookupCoordinatorResource.getSpecificTier(tier); + final Response response = lookupCoordinatorResource.getSpecificTier(tier, false); Assert.assertEquals(404, response.getStatus()); EasyMock.verify(lookupCoordinatorManager); } @@ -801,7 +819,7 @@ public class LookupCoordinatorResourceTest mapper, mapper ); - final Response response = lookupCoordinatorResource.getSpecificTier(tier); + final Response response = lookupCoordinatorResource.getSpecificTier(tier, false); Assert.assertEquals(400, response.getStatus()); Assert.assertEquals(ImmutableMap.of("error", "`tier` required"), response.getEntity()); EasyMock.verify(lookupCoordinatorManager); @@ -819,7 +837,7 @@ public class LookupCoordinatorResourceTest mapper, mapper ); - final Response response = lookupCoordinatorResource.getSpecificTier(tier); + final Response response = lookupCoordinatorResource.getSpecificTier(tier, false); Assert.assertEquals(404, response.getStatus()); Assert.assertEquals(ImmutableMap.of("error", "No lookups found"), response.getEntity()); EasyMock.verify(lookupCoordinatorManager); @@ -838,7 +856,7 @@ public class LookupCoordinatorResourceTest mapper, mapper ); - final Response response = lookupCoordinatorResource.getSpecificTier(tier); + final Response response = lookupCoordinatorResource.getSpecificTier(tier, false); Assert.assertEquals(500, response.getStatus()); Assert.assertEquals(ImmutableMap.of("error", errMsg), response.getEntity()); EasyMock.verify(lookupCoordinatorManager); diff --git a/sql/pom.xml b/sql/pom.xml index 4ac4b2dbdc8..aed06eae5cf 100644 --- a/sql/pom.xml +++ b/sql/pom.xml @@ -83,6 +83,13 @@ test-jar test + + io.druid + druid-server + ${project.parent.version} + test-jar + test + diff --git a/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java index 126d03e82af..57902d75deb 100644 --- a/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java @@ -58,7 +58,7 @@ import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.FloatSumAggregatorFactory; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import io.druid.query.expression.LookupExprMacro; -import io.druid.query.expression.TestExprMacroTable; +import io.druid.query.expression.TestExpressionMacroTable; import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQueryConfig; import io.druid.query.groupby.GroupByQueryRunnerTest; @@ -138,13 +138,16 @@ public class CalciteTests // This Module is just to get a LookupReferencesManager with a usable "lookyloo" lookup. - LookupReferencesManager testLookupReferencesManager = TestExprMacroTable.createTestLookupReferencesManager( - ImmutableMap.of( - "a", "xa", - "abc", "xabc" - ) - ); - binder.bind(LookupReferencesManager.class).toInstance(testLookupReferencesManager); + binder.bind(LookupReferencesManager.class) + .toInstance( + TestExpressionMacroTable.createTestLookupReferencesManager( + ImmutableMap.of( + "a", "xa", + "abc", "xabc" + ) + ) + ); + } } );