mirror of https://github.com/apache/druid.git
Synchronization of lookups during startup of druid processes (#4758)
* Changes for lookup synchronization * Refactor of Lookup classes * Minor refactors and doc update * Change coordinator instance to be retrieved by DruidLeaderClient * Wait before thread shutdown * Make disablelookups flag true by default * Update docs * Rename flag * Move executorservice shutdown to finally block * Update LookupConfig * Refactoring and doc changes * Remove lookup config constructor * Revert Lookupconfig constructor changes * Add tests to LookupConfig * Make executorservice local * Update LRM * Move ListeningScheduledExecutorService to ExecutorCompletionService * Move exception to outer block * Remove check to see future is done * Remove unnecessary assignment * Add logging
This commit is contained in:
parent
928b083a7a
commit
c07678b143
|
@ -323,6 +323,8 @@ It is possible to save the configuration across restarts such that a node will n
|
|||
|Property|Description|Default|
|
||||
|--------|-----------|-------|
|
||||
|`druid.lookup.snapshotWorkingDir`| Working path used to store snapshot of current lookup configuration, leaving this property null will disable snapshot/bootstrap utility|null|
|
||||
|`druid.lookup.numLookupLoadingThreads`| Number of threads for loading the lookups in parallel on startup. This thread pool is destroyed once startup is done. It is not kept during the lifetime of the JVM|Available Processors / 2|
|
||||
|`druid.lookup.enableLookupSyncOnStartup`|Enable the lookup synchronization process with coordinator on startup. The queryable nodes will fetch and load the lookups from the coordinator instead of waiting for the coordinator to load the lookups for them. Users may opt to disable this option if there are no lookups configured in the cluster.|true|
|
||||
|
||||
## Introspect a Lookup
|
||||
|
||||
|
|
|
@ -59,6 +59,13 @@
|
|||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid-server</artifactId>
|
||||
<version>${project.parent.version}</version>
|
||||
<scope>test</scope>
|
||||
<type>test-jar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
|
|
|
@ -29,7 +29,7 @@ import io.druid.jackson.DefaultObjectMapper;
|
|||
import io.druid.java.util.common.ISE;
|
||||
import io.druid.java.util.common.logger.Logger;
|
||||
import io.druid.math.expr.ExprMacroTable;
|
||||
import io.druid.query.expression.TestExprMacroTable;
|
||||
import io.druid.query.expression.TestExpressionMacroTable;
|
||||
import io.druid.segment.IndexIO;
|
||||
import io.druid.segment.IndexMergerV9;
|
||||
import io.druid.segment.column.ColumnConfig;
|
||||
|
@ -74,7 +74,7 @@ public class TestUtils
|
|||
|
||||
jsonMapper.setInjectableValues(
|
||||
new InjectableValues.Std()
|
||||
.addValue(ExprMacroTable.class.getName(), TestExprMacroTable.INSTANCE)
|
||||
.addValue(ExprMacroTable.class.getName(), TestExpressionMacroTable.INSTANCE)
|
||||
.addValue(IndexIO.class, indexIO)
|
||||
.addValue(ObjectMapper.class, jsonMapper)
|
||||
.addValue(ChatHandlerProvider.class, new NoopChatHandlerProvider())
|
||||
|
|
|
@ -35,8 +35,7 @@ import io.druid.segment.column.ValueType;
|
|||
@JsonSubTypes.Type(name = "default", value = DefaultDimensionSpec.class),
|
||||
@JsonSubTypes.Type(name = "extraction", value = ExtractionDimensionSpec.class),
|
||||
@JsonSubTypes.Type(name = "regexFiltered", value = RegexFilteredDimensionSpec.class),
|
||||
@JsonSubTypes.Type(name = "listFiltered", value = ListFilteredDimensionSpec.class),
|
||||
@JsonSubTypes.Type(name = "lookup", value = LookupDimensionSpec.class)
|
||||
@JsonSubTypes.Type(name = "listFiltered", value = ListFilteredDimensionSpec.class)
|
||||
})
|
||||
public interface DimensionSpec extends Cacheable
|
||||
{
|
||||
|
|
|
@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
|||
import io.druid.guice.annotations.ExtensionPoint;
|
||||
import io.druid.java.util.common.Cacheable;
|
||||
import io.druid.query.lookup.LookupExtractionFn;
|
||||
import io.druid.query.lookup.RegisteredLookupExtractionFn;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
|
@ -41,7 +40,6 @@ import javax.annotation.Nullable;
|
|||
@JsonSubTypes.Type(name = "timeFormat", value = TimeFormatExtractionFn.class),
|
||||
@JsonSubTypes.Type(name = "identity", value = IdentityExtractionFn.class),
|
||||
@JsonSubTypes.Type(name = "lookup", value = LookupExtractionFn.class),
|
||||
@JsonSubTypes.Type(name = "registeredLookup", value = RegisteredLookupExtractionFn.class),
|
||||
@JsonSubTypes.Type(name = "substring", value = SubstringDimExtractionFn.class),
|
||||
@JsonSubTypes.Type(name = "cascade", value = CascadeExtractionFn.class),
|
||||
@JsonSubTypes.Type(name = "stringFormat", value = StringFormatExtractionFn.class),
|
||||
|
|
|
@ -26,11 +26,19 @@ import com.google.common.base.Strings;
|
|||
public class LookupConfig
|
||||
{
|
||||
|
||||
@JsonProperty
|
||||
private final String snapshotWorkingDir;
|
||||
@JsonProperty("snapshotWorkingDir")
|
||||
private String snapshotWorkingDir;
|
||||
|
||||
@JsonProperty("enableLookupSyncOnStartup")
|
||||
private boolean enableLookupSyncOnStartup = true;
|
||||
|
||||
@JsonProperty("numLookupLoadingThreads")
|
||||
private int numLookupLoadingThreads = Runtime.getRuntime().availableProcessors() / 2;
|
||||
|
||||
/**
|
||||
* @param snapshotWorkingDir working directory to store lookups snapshot file, passing null or empty string will disable the snapshot utility
|
||||
* @param snapshotWorkingDir working directory to store lookups snapshot file, passing null or empty string will disable the snapshot utility
|
||||
* @param numLookupLoadingThreads number of threads for loading the lookups as part of the synchronization process
|
||||
* @param enableLookupSyncOnStartup decides whether the lookup synchronization process should be enabled at startup
|
||||
*/
|
||||
@JsonCreator
|
||||
public LookupConfig(
|
||||
|
@ -45,6 +53,15 @@ public class LookupConfig
|
|||
return snapshotWorkingDir;
|
||||
}
|
||||
|
||||
public int getNumLookupLoadingThreads()
|
||||
{
|
||||
return numLookupLoadingThreads;
|
||||
}
|
||||
|
||||
public boolean getEnableLookupSyncOnStartup()
|
||||
{
|
||||
return enableLookupSyncOnStartup;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
|
@ -58,7 +75,9 @@ public class LookupConfig
|
|||
|
||||
LookupConfig that = (LookupConfig) o;
|
||||
|
||||
return getSnapshotWorkingDir().equals(that.getSnapshotWorkingDir());
|
||||
return snapshotWorkingDir.equals(that.snapshotWorkingDir) &&
|
||||
enableLookupSyncOnStartup == that.enableLookupSyncOnStartup &&
|
||||
numLookupLoadingThreads == that.numLookupLoadingThreads;
|
||||
|
||||
}
|
||||
|
||||
|
@ -67,6 +86,8 @@ public class LookupConfig
|
|||
{
|
||||
return "LookupConfig{" +
|
||||
"snapshotWorkingDir='" + getSnapshotWorkingDir() + '\'' +
|
||||
" numLookupLoadingThreads='" + getNumLookupLoadingThreads() + '\'' +
|
||||
" enableLookupSyncOnStartup='" + getEnableLookupSyncOnStartup() + '\'' +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,17 +20,7 @@
|
|||
package io.druid.query.expression;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.druid.math.expr.ExprMacroTable;
|
||||
import io.druid.query.extraction.MapLookupExtractor;
|
||||
import io.druid.query.lookup.LookupExtractor;
|
||||
import io.druid.query.lookup.LookupExtractorFactory;
|
||||
import io.druid.query.lookup.LookupExtractorFactoryContainer;
|
||||
import io.druid.query.lookup.LookupIntrospectHandler;
|
||||
import io.druid.query.lookup.LookupReferencesManager;
|
||||
import org.easymock.EasyMock;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
public class TestExprMacroTable extends ExprMacroTable
|
||||
{
|
||||
|
@ -41,7 +31,6 @@ public class TestExprMacroTable extends ExprMacroTable
|
|||
super(
|
||||
ImmutableList.of(
|
||||
new LikeExprMacro(),
|
||||
new LookupExprMacro(createTestLookupReferencesManager(ImmutableMap.of("foo", "xfoo"))),
|
||||
new RegexpExtractExprMacro(),
|
||||
new TimestampCeilExprMacro(),
|
||||
new TimestampExtractExprMacro(),
|
||||
|
@ -55,52 +44,4 @@ public class TestExprMacroTable extends ExprMacroTable
|
|||
)
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a mock {@link LookupReferencesManager} that has one lookup, "lookyloo".
|
||||
*/
|
||||
public static LookupReferencesManager createTestLookupReferencesManager(final ImmutableMap<String, String> theLookup)
|
||||
{
|
||||
final LookupReferencesManager lookupReferencesManager = EasyMock.createMock(LookupReferencesManager.class);
|
||||
EasyMock.expect(lookupReferencesManager.get(EasyMock.eq("lookyloo"))).andReturn(
|
||||
new LookupExtractorFactoryContainer(
|
||||
"v0",
|
||||
new LookupExtractorFactory()
|
||||
{
|
||||
@Override
|
||||
public boolean start()
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean close()
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean replaces(@Nullable final LookupExtractorFactory other)
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public LookupIntrospectHandler getIntrospectHandler()
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public LookupExtractor get()
|
||||
{
|
||||
return new MapLookupExtractor(theLookup, false);
|
||||
}
|
||||
}
|
||||
)
|
||||
).anyTimes();
|
||||
EasyMock.expect(lookupReferencesManager.get(EasyMock.not(EasyMock.eq("lookyloo")))).andReturn(null).anyTimes();
|
||||
EasyMock.replay(lookupReferencesManager);
|
||||
return lookupReferencesManager;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,13 +32,36 @@ public class LookupConfigTest
|
|||
{
|
||||
|
||||
ObjectMapper mapper = TestHelper.getJsonMapper();
|
||||
|
||||
@Rule
|
||||
public TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||
|
||||
@Test
|
||||
public void TestSerDesr() throws IOException
|
||||
{
|
||||
LookupConfig lookupConfig = new LookupConfig(temporaryFolder.newFile().getAbsolutePath());
|
||||
Assert.assertEquals(lookupConfig, mapper.reader(LookupConfig.class).readValue(mapper.writeValueAsString(lookupConfig)));
|
||||
Assert.assertEquals(
|
||||
lookupConfig,
|
||||
mapper.reader(LookupConfig.class).readValue(mapper.writeValueAsString(lookupConfig))
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSerdeWithNonDefaults() throws Exception
|
||||
{
|
||||
String json = "{\n"
|
||||
+ " \"enableLookupSyncOnStartup\": false,\n"
|
||||
+ " \"snapshotWorkingDir\": \"/tmp\",\n"
|
||||
+ " \"numLookupLoadingThreads\": 4 \n"
|
||||
+ "}\n";
|
||||
LookupConfig config = mapper.readValue(
|
||||
mapper.writeValueAsString(
|
||||
mapper.readValue(json, LookupConfig.class)
|
||||
),
|
||||
LookupConfig.class
|
||||
);
|
||||
|
||||
Assert.assertEquals("/tmp", config.getSnapshotWorkingDir());
|
||||
Assert.assertEquals(false, config.getEnableLookupSyncOnStartup());
|
||||
Assert.assertEquals(4, config.getNumLookupLoadingThreads());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
|
|||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.Module;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.jsontype.NamedType;
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
|
@ -48,6 +49,7 @@ import io.druid.guice.annotations.Self;
|
|||
import io.druid.guice.annotations.Smile;
|
||||
import io.druid.initialization.DruidModule;
|
||||
import io.druid.java.util.common.logger.Logger;
|
||||
import io.druid.query.dimension.LookupDimensionSpec;
|
||||
import io.druid.query.expression.LookupExprMacro;
|
||||
import io.druid.server.DruidNode;
|
||||
import io.druid.server.http.HostAndPortWithScheme;
|
||||
|
@ -84,7 +86,11 @@ public class LookupModule implements DruidModule
|
|||
public List<? extends Module> getJacksonModules()
|
||||
{
|
||||
return ImmutableList.<Module>of(
|
||||
new SimpleModule("DruidLookupModule").registerSubtypes(MapLookupExtractorFactory.class)
|
||||
new SimpleModule("DruidLookupModule").registerSubtypes(MapLookupExtractorFactory.class),
|
||||
new SimpleModule().registerSubtypes(
|
||||
new NamedType(LookupDimensionSpec.class, "lookup"),
|
||||
new NamedType(RegisteredLookupExtractionFn.class, "registeredLookup")
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package io.druid.query.lookup;
|
||||
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
@ -28,21 +29,34 @@ import com.google.common.collect.ImmutableList;
|
|||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import com.metamx.http.client.response.FullResponseHolder;
|
||||
import io.druid.client.coordinator.Coordinator;
|
||||
import io.druid.concurrent.Execs;
|
||||
import io.druid.concurrent.LifecycleLock;
|
||||
import io.druid.discovery.DruidLeaderClient;
|
||||
import io.druid.guice.ManageLifecycle;
|
||||
import io.druid.guice.annotations.Json;
|
||||
import io.druid.java.util.common.ISE;
|
||||
import io.druid.java.util.common.StringUtils;
|
||||
import io.druid.java.util.common.lifecycle.LifecycleStart;
|
||||
import io.druid.java.util.common.lifecycle.LifecycleStop;
|
||||
import org.jboss.netty.handler.codec.http.HttpMethod;
|
||||
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.IOException;
|
||||
import java.net.MalformedURLException;
|
||||
import java.util.AbstractMap;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorCompletionService;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.LockSupport;
|
||||
|
@ -60,6 +74,11 @@ public class LookupReferencesManager
|
|||
{
|
||||
private static final EmittingLogger LOG = new EmittingLogger(LookupReferencesManager.class);
|
||||
|
||||
private static final TypeReference<Map<String, LookupExtractorFactoryContainer>> LOOKUPS_ALL_REFERENCE =
|
||||
new TypeReference<Map<String, LookupExtractorFactoryContainer>>()
|
||||
{
|
||||
};
|
||||
|
||||
// Lookups state (loaded/to-be-loaded/to-be-dropped etc) is managed by immutable LookupUpdateState instance.
|
||||
// Any update to state is done by creating updated LookupUpdateState instance and atomically setting that
|
||||
// into the ref here.
|
||||
|
@ -79,21 +98,43 @@ public class LookupReferencesManager
|
|||
//for unit testing only
|
||||
private final boolean testMode;
|
||||
|
||||
private final DruidLeaderClient druidLeaderClient;
|
||||
|
||||
private final ObjectMapper jsonMapper;
|
||||
|
||||
private final LookupListeningAnnouncerConfig lookupListeningAnnouncerConfig;
|
||||
|
||||
private final LookupConfig lookupConfig;
|
||||
|
||||
@Inject
|
||||
public LookupReferencesManager(LookupConfig lookupConfig, @Json ObjectMapper objectMapper)
|
||||
public LookupReferencesManager(
|
||||
LookupConfig lookupConfig,
|
||||
@Json ObjectMapper objectMapper,
|
||||
@Coordinator DruidLeaderClient druidLeaderClient,
|
||||
LookupListeningAnnouncerConfig lookupListeningAnnouncerConfig
|
||||
)
|
||||
{
|
||||
this(lookupConfig, objectMapper, false);
|
||||
this(lookupConfig, objectMapper, druidLeaderClient, lookupListeningAnnouncerConfig, false);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
LookupReferencesManager(LookupConfig lookupConfig, ObjectMapper objectMapper, boolean testMode)
|
||||
LookupReferencesManager(
|
||||
LookupConfig lookupConfig,
|
||||
ObjectMapper objectMapper,
|
||||
DruidLeaderClient druidLeaderClient,
|
||||
LookupListeningAnnouncerConfig lookupListeningAnnouncerConfig,
|
||||
boolean testMode
|
||||
)
|
||||
{
|
||||
if (Strings.isNullOrEmpty(lookupConfig.getSnapshotWorkingDir())) {
|
||||
this.lookupSnapshotTaker = null;
|
||||
} else {
|
||||
this.lookupSnapshotTaker = new LookupSnapshotTaker(objectMapper, lookupConfig.getSnapshotWorkingDir());
|
||||
}
|
||||
|
||||
this.druidLeaderClient = druidLeaderClient;
|
||||
this.jsonMapper = objectMapper;
|
||||
this.lookupListeningAnnouncerConfig = lookupListeningAnnouncerConfig;
|
||||
this.lookupConfig = lookupConfig;
|
||||
this.testMode = testMode;
|
||||
}
|
||||
|
||||
|
@ -103,43 +144,34 @@ public class LookupReferencesManager
|
|||
if (!lifecycleLock.canStart()) {
|
||||
throw new ISE("can't start.");
|
||||
}
|
||||
|
||||
try {
|
||||
LOG.info("LookupReferencesManager is starting.");
|
||||
|
||||
loadSnapshotAndInitStateRef();
|
||||
|
||||
loadAllLookupsAndInitStateRef();
|
||||
if (!testMode) {
|
||||
mainThread = Execs.makeThread(
|
||||
"LookupReferencesManager-MainThread",
|
||||
new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
try {
|
||||
() -> {
|
||||
try {
|
||||
if (!lifecycleLock.awaitStarted()) {
|
||||
LOG.error("WTF! lifecycle not started, lookup update notices will not be handled.");
|
||||
return;
|
||||
}
|
||||
|
||||
if (!lifecycleLock.awaitStarted()) {
|
||||
LOG.error("WTF! lifecycle not started, lookup update notices will not be handled.");
|
||||
return;
|
||||
while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
|
||||
try {
|
||||
handlePendingNotices();
|
||||
LockSupport.parkNanos(LookupReferencesManager.this, TimeUnit.MINUTES.toNanos(1));
|
||||
}
|
||||
|
||||
while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
|
||||
try {
|
||||
handlePendingNotices();
|
||||
LockSupport.parkNanos(LookupReferencesManager.this, TimeUnit.MINUTES.toNanos(1));
|
||||
}
|
||||
catch (Throwable t) {
|
||||
LOG.makeAlert(t, "Error occured while lookup notice handling.").emit();
|
||||
}
|
||||
catch (Throwable t) {
|
||||
LOG.makeAlert(t, "Error occured while lookup notice handling.").emit();
|
||||
}
|
||||
}
|
||||
catch (Throwable t) {
|
||||
LOG.error(t, "Error while waiting for lifecycle start. lookup updates notices will not be handled");
|
||||
}
|
||||
finally {
|
||||
LOG.info("Lookup Management loop exited, Lookup notices are not handled anymore.");
|
||||
}
|
||||
}
|
||||
catch (Throwable t) {
|
||||
LOG.error(t, "Error while waiting for lifecycle start. lookup updates notices will not be handled");
|
||||
}
|
||||
finally {
|
||||
LOG.info("Lookup Management loop exited, Lookup notices are not handled anymore.");
|
||||
}
|
||||
},
|
||||
true
|
||||
|
@ -279,7 +311,11 @@ public class LookupReferencesManager
|
|||
return new LookupsState<>(lookupUpdateState.lookupMap, lookupsToLoad, lookupsToDrop);
|
||||
}
|
||||
|
||||
private void updateToLoadAndDrop(List<Notice> notices, Map<String, LookupExtractorFactoryContainer> lookupsToLoad, Set<String> lookupsToDrop)
|
||||
private void updateToLoadAndDrop(
|
||||
List<Notice> notices,
|
||||
Map<String, LookupExtractorFactoryContainer> lookupsToLoad,
|
||||
Set<String> lookupsToDrop
|
||||
)
|
||||
{
|
||||
for (Notice notice : notices) {
|
||||
if (notice instanceof LoadNotice) {
|
||||
|
@ -299,34 +335,168 @@ public class LookupReferencesManager
|
|||
private void takeSnapshot(Map<String, LookupExtractorFactoryContainer> lookupMap)
|
||||
{
|
||||
if (lookupSnapshotTaker != null) {
|
||||
List<LookupBean> lookups = new ArrayList<>(lookupMap.size());
|
||||
for (Map.Entry<String, LookupExtractorFactoryContainer> e : lookupMap.entrySet()) {
|
||||
lookups.add(new LookupBean(e.getKey(), null, e.getValue()));
|
||||
}
|
||||
|
||||
lookupSnapshotTaker.takeSnapshot(lookups);
|
||||
lookupSnapshotTaker.takeSnapshot(getLookupBeanList(lookupMap));
|
||||
}
|
||||
}
|
||||
|
||||
private void loadSnapshotAndInitStateRef()
|
||||
private void loadAllLookupsAndInitStateRef()
|
||||
{
|
||||
if (lookupSnapshotTaker != null) {
|
||||
ImmutableMap.Builder<String, LookupExtractorFactoryContainer> builder = ImmutableMap.builder();
|
||||
List<LookupBean> lookupBeanList = getLookupsListFromLookupConfig();
|
||||
if (lookupBeanList != null) {
|
||||
startLookups(lookupBeanList);
|
||||
} else {
|
||||
LOG.info("No lookups to be loaded at this point");
|
||||
stateRef.set(new LookupUpdateState(ImmutableMap.of(), ImmutableList.of(), ImmutableList.of()));
|
||||
}
|
||||
}
|
||||
|
||||
final List<LookupBean> lookupBeanList = lookupSnapshotTaker.pullExistingSnapshot();
|
||||
for (LookupBean lookupBean : lookupBeanList) {
|
||||
LookupExtractorFactoryContainer container = lookupBean.getContainer();
|
||||
|
||||
if (container.getLookupExtractorFactory().start()) {
|
||||
builder.put(lookupBean.getName(), container);
|
||||
} else {
|
||||
throw new ISE("Failed to start lookup [%s]:[%s]", lookupBean.getName(), container);
|
||||
}
|
||||
/**
|
||||
* Returns a list of lookups from the coordinator if the coordinator is available. If it's not available, returns null.
|
||||
*
|
||||
* @param tier lookup tier name
|
||||
*
|
||||
* @return list of LookupBean objects, or null
|
||||
*/
|
||||
@Nullable
|
||||
private List<LookupBean> getLookupListFromCoordinator(String tier)
|
||||
{
|
||||
try {
|
||||
final FullResponseHolder response = fetchLookupsForTier(tier);
|
||||
List<LookupBean> lookupBeanList = new ArrayList<>();
|
||||
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
|
||||
LOG.error(
|
||||
"Error while fetching lookup code from Coordinator with status[%s] and content[%s]",
|
||||
response.getStatus(),
|
||||
response.getContent()
|
||||
);
|
||||
return null;
|
||||
}
|
||||
|
||||
stateRef.set(new LookupUpdateState(builder.build(), ImmutableList.of(), ImmutableList.of()));
|
||||
// Older version of getSpecificTier returns a list of lookup names.
|
||||
// Lookup loading is performed via snapshot if older version is present.
|
||||
// This check is only for backward compatibility and should be removed in a future release
|
||||
if (response.getContent().startsWith("[")) {
|
||||
LOG.info("Failed to retrieve lookup information from coordinator. Attempting to load lookups using snapshot instead");
|
||||
return null;
|
||||
} else {
|
||||
Map<String, LookupExtractorFactoryContainer> lookupMap = jsonMapper.readValue(
|
||||
response.getContent(),
|
||||
LOOKUPS_ALL_REFERENCE
|
||||
);
|
||||
lookupMap.forEach((k, v) -> lookupBeanList.add(new LookupBean(k, null, v)));
|
||||
|
||||
}
|
||||
return lookupBeanList;
|
||||
}
|
||||
catch (Exception e) {
|
||||
LOG.error(e, "Error while trying to get lookup list from coordinator for tier[%s]", tier);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a list of lookups from the snapshot if the lookupsnapshottaker is configured. If it's not available, returns null.
|
||||
*
|
||||
* @return list of LookupBean objects, or null
|
||||
*/
|
||||
@Nullable
|
||||
private List<LookupBean> getLookupListFromSnapshot()
|
||||
{
|
||||
if (lookupSnapshotTaker != null) {
|
||||
return lookupSnapshotTaker.pullExistingSnapshot();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private List<LookupBean> getLookupBeanList(Map<String, LookupExtractorFactoryContainer> lookupMap)
|
||||
{
|
||||
List<LookupBean> lookups = new ArrayList<>(lookupMap.size());
|
||||
for (Map.Entry<String, LookupExtractorFactoryContainer> e : lookupMap.entrySet()) {
|
||||
lookups.add(new LookupBean(e.getKey(), null, e.getValue()));
|
||||
}
|
||||
return lookups;
|
||||
}
|
||||
|
||||
private List<LookupBean> getLookupsListFromLookupConfig()
|
||||
{
|
||||
List<LookupBean> lookupBeanList;
|
||||
if (lookupConfig.getEnableLookupSyncOnStartup()) {
|
||||
String tier = lookupListeningAnnouncerConfig.getLookupTier();
|
||||
lookupBeanList = getLookupListFromCoordinator(tier);
|
||||
if (lookupBeanList == null) {
|
||||
LOG.info("Coordinator is unavailable. Loading saved snapshot instead");
|
||||
lookupBeanList = getLookupListFromSnapshot();
|
||||
}
|
||||
} else {
|
||||
stateRef.set(new LookupUpdateState(ImmutableMap.of(), ImmutableList.of(), ImmutableList.of()));
|
||||
lookupBeanList = getLookupListFromSnapshot();
|
||||
}
|
||||
return lookupBeanList;
|
||||
}
|
||||
|
||||
private void startLookups(List<LookupBean> lookupBeanList)
|
||||
{
|
||||
ImmutableMap.Builder<String, LookupExtractorFactoryContainer> builder = ImmutableMap.builder();
|
||||
ExecutorService executorService = Execs.multiThreaded(
|
||||
lookupConfig.getNumLookupLoadingThreads(),
|
||||
"LookupReferencesManager-Startup-%s"
|
||||
);
|
||||
ExecutorCompletionService completionService = new ExecutorCompletionService(executorService);
|
||||
List<Future<Map.Entry>> futures = new ArrayList<>();
|
||||
try {
|
||||
LOG.info("Starting lookup loading process");
|
||||
for (LookupBean lookupBean : lookupBeanList) {
|
||||
futures.add(
|
||||
completionService.submit(
|
||||
() -> {
|
||||
LookupExtractorFactoryContainer container = lookupBean.getContainer();
|
||||
LOG.info(
|
||||
"Starting lookup [%s]:[%s]",
|
||||
lookupBean.getName(),
|
||||
container
|
||||
);
|
||||
if (container.getLookupExtractorFactory().start()) {
|
||||
LOG.info(
|
||||
"Started lookup [%s]:[%s]",
|
||||
lookupBean.getName(),
|
||||
container
|
||||
);
|
||||
return new AbstractMap.SimpleImmutableEntry<>(lookupBean.getName(), container);
|
||||
} else {
|
||||
LOG.error(
|
||||
"Failed to start lookup [%s]:[%s]",
|
||||
lookupBean.getName(),
|
||||
container
|
||||
);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
}
|
||||
for (int i = 0; i < futures.size(); i++) {
|
||||
try {
|
||||
final Future<AbstractMap.SimpleImmutableEntry<String, LookupExtractorFactoryContainer>> completedFuture = completionService
|
||||
.take();
|
||||
final AbstractMap.SimpleImmutableEntry<String, LookupExtractorFactoryContainer> lookupResult = completedFuture
|
||||
.get();
|
||||
if (lookupResult != null) {
|
||||
builder.put(lookupResult);
|
||||
}
|
||||
}
|
||||
catch (ExecutionException e) {
|
||||
LOG.error(e, "Execution error during lookup loading.");
|
||||
}
|
||||
}
|
||||
stateRef.set(new LookupUpdateState(builder.build(), ImmutableList.of(), ImmutableList.of()));
|
||||
}
|
||||
catch (Exception e) {
|
||||
LOG.error(e, "Failed to finish lookup load process.");
|
||||
for (Future future : futures) {
|
||||
future.cancel(true);
|
||||
}
|
||||
}
|
||||
finally {
|
||||
executorService.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -341,6 +511,19 @@ public class LookupReferencesManager
|
|||
}
|
||||
}
|
||||
|
||||
private FullResponseHolder fetchLookupsForTier(String tier)
|
||||
throws ExecutionException, InterruptedException, MalformedURLException, IOException
|
||||
{
|
||||
return druidLeaderClient.go(
|
||||
druidLeaderClient.makeRequest(
|
||||
HttpMethod.GET,
|
||||
StringUtils.format(
|
||||
"/druid/coordinator/v1/lookups/%s?detailed=true",
|
||||
tier
|
||||
)
|
||||
));
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
interface Notice
|
||||
{
|
|
@ -269,7 +269,9 @@ public class LookupCoordinatorResource
|
|||
@Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
|
||||
@Path("/{tier}")
|
||||
public Response getSpecificTier(
|
||||
@PathParam("tier") String tier
|
||||
@PathParam("tier") String tier,
|
||||
@DefaultValue("false") @QueryParam("detailed") boolean detailed
|
||||
|
||||
)
|
||||
{
|
||||
try {
|
||||
|
@ -290,7 +292,11 @@ public class LookupCoordinatorResource
|
|||
.entity(ServletResourceUtils.sanitizeException(new RE("Tier [%s] not found", tier)))
|
||||
.build();
|
||||
}
|
||||
return Response.ok().entity(tierLookups.keySet()).build();
|
||||
if (detailed) {
|
||||
return Response.ok().entity(tierLookups).build();
|
||||
} else {
|
||||
return Response.ok().entity(tierLookups.keySet()).build();
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
LOG.error(e, "Error getting tier [%s]", tier);
|
||||
|
|
|
@ -75,7 +75,7 @@ public class LookupDimensionSpecTest
|
|||
LOOKUP_REF_MANAGER
|
||||
);
|
||||
String serLookup = mapper.writeValueAsString(lookupDimSpec);
|
||||
Assert.assertEquals(lookupDimSpec, mapper.reader(DimensionSpec.class).with(injectableValues).readValue(serLookup));
|
||||
Assert.assertEquals(lookupDimSpec, mapper.reader(LookupDimensionSpec.class).with(injectableValues).readValue(serLookup));
|
||||
}
|
||||
|
||||
private Object[] parametersForTestSerDesr()
|
||||
|
|
|
@ -174,7 +174,7 @@ public class ExprMacroTest
|
|||
|
||||
private void assertExpr(final String expression, final Object expectedResult)
|
||||
{
|
||||
final Expr expr = Parser.parse(expression, TestExprMacroTable.INSTANCE);
|
||||
final Expr expr = Parser.parse(expression, TestExpressionMacroTable.INSTANCE);
|
||||
Assert.assertEquals(expression, expectedResult, expr.eval(BINDINGS).value());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,106 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.expression;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.druid.math.expr.ExprMacroTable;
|
||||
import io.druid.query.extraction.MapLookupExtractor;
|
||||
import io.druid.query.lookup.LookupExtractor;
|
||||
import io.druid.query.lookup.LookupExtractorFactory;
|
||||
import io.druid.query.lookup.LookupExtractorFactoryContainer;
|
||||
import io.druid.query.lookup.LookupIntrospectHandler;
|
||||
import io.druid.query.lookup.LookupReferencesManager;
|
||||
import org.easymock.EasyMock;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
public class TestExpressionMacroTable extends ExprMacroTable
|
||||
{
|
||||
public static final ExprMacroTable INSTANCE = new TestExpressionMacroTable();
|
||||
|
||||
private TestExpressionMacroTable()
|
||||
{
|
||||
super(
|
||||
ImmutableList.of(
|
||||
new LikeExprMacro(),
|
||||
new LookupExprMacro(createTestLookupReferencesManager(ImmutableMap.of("foo", "xfoo"))),
|
||||
new RegexpExtractExprMacro(),
|
||||
new TimestampCeilExprMacro(),
|
||||
new TimestampExtractExprMacro(),
|
||||
new TimestampFloorExprMacro(),
|
||||
new TimestampFormatExprMacro(),
|
||||
new TimestampParseExprMacro(),
|
||||
new TimestampShiftExprMacro(),
|
||||
new TrimExprMacro.BothTrimExprMacro(),
|
||||
new TrimExprMacro.LeftTrimExprMacro(),
|
||||
new TrimExprMacro.RightTrimExprMacro()
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a mock {@link LookupReferencesManager} that has one lookup, "lookyloo".
|
||||
*/
|
||||
public static LookupReferencesManager createTestLookupReferencesManager(final ImmutableMap<String, String> theLookup)
|
||||
{
|
||||
final LookupReferencesManager lookupReferencesManager = EasyMock.createMock(LookupReferencesManager.class);
|
||||
EasyMock.expect(lookupReferencesManager.get(EasyMock.eq("lookyloo"))).andReturn(
|
||||
new LookupExtractorFactoryContainer(
|
||||
"v0",
|
||||
new LookupExtractorFactory()
|
||||
{
|
||||
@Override
|
||||
public boolean start()
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean close()
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean replaces(@Nullable final LookupExtractorFactory other)
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public LookupIntrospectHandler getIntrospectHandler()
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public LookupExtractor get()
|
||||
{
|
||||
return new MapLookupExtractor(theLookup, false);
|
||||
}
|
||||
}
|
||||
)
|
||||
).anyTimes();
|
||||
EasyMock.expect(lookupReferencesManager.get(EasyMock.not(EasyMock.eq("lookyloo")))).andReturn(null).anyTimes();
|
||||
EasyMock.replay(lookupReferencesManager);
|
||||
return lookupReferencesManager;
|
||||
}
|
||||
}
|
|
@ -22,9 +22,15 @@ package io.druid.query.lookup;
|
|||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import com.metamx.http.client.Request;
|
||||
import com.metamx.http.client.response.FullResponseHolder;
|
||||
import io.druid.discovery.DruidLeaderClient;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import io.druid.server.metrics.NoopServiceEmitter;
|
||||
import org.easymock.EasyMock;
|
||||
import org.jboss.netty.handler.codec.http.HttpMethod;
|
||||
import org.jboss.netty.handler.codec.http.HttpResponse;
|
||||
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
|
@ -32,11 +38,35 @@ import org.junit.Test;
|
|||
import org.junit.rules.TemporaryFolder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URL;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.easymock.EasyMock.createMock;
|
||||
import static org.easymock.EasyMock.expect;
|
||||
import static org.easymock.EasyMock.replay;
|
||||
import static org.easymock.EasyMock.reset;
|
||||
|
||||
public class LookupReferencesManagerTest
|
||||
{
|
||||
LookupReferencesManager lookupReferencesManager;
|
||||
|
||||
private DruidLeaderClient druidLeaderClient;
|
||||
|
||||
private LookupListeningAnnouncerConfig config;
|
||||
|
||||
private static final String propertyBase = "some.property";
|
||||
|
||||
private static final String LOOKUP_TIER = "lookupTier";
|
||||
|
||||
private static final int LOOKUP_THREADS = 1;
|
||||
|
||||
private static final boolean LOOKUP_DISABLE = false;
|
||||
|
||||
LookupExtractorFactory lookupExtractorFactory;
|
||||
|
||||
LookupExtractorFactoryContainer container;
|
||||
@Rule
|
||||
public TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||
ObjectMapper mapper = new DefaultObjectMapper();
|
||||
|
@ -46,22 +76,49 @@ public class LookupReferencesManagerTest
|
|||
{
|
||||
EmittingLogger.registerEmitter(new NoopServiceEmitter());
|
||||
|
||||
druidLeaderClient = EasyMock.createMock(DruidLeaderClient.class);
|
||||
|
||||
config = createMock(LookupListeningAnnouncerConfig.class);
|
||||
|
||||
lookupExtractorFactory = new MapLookupExtractorFactory(
|
||||
ImmutableMap.<String, String>of(
|
||||
"key",
|
||||
"value"
|
||||
), true
|
||||
);
|
||||
container = new LookupExtractorFactoryContainer("v0", lookupExtractorFactory);
|
||||
mapper.registerSubtypes(MapLookupExtractorFactory.class);
|
||||
String temporaryPath = temporaryFolder.newFolder().getAbsolutePath();
|
||||
lookupReferencesManager = new LookupReferencesManager(
|
||||
new LookupConfig(temporaryFolder.newFolder().getAbsolutePath()),
|
||||
mapper,
|
||||
mapper, druidLeaderClient, config,
|
||||
true
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStartStop()
|
||||
public void testStartStop() throws InterruptedException, IOException
|
||||
{
|
||||
lookupReferencesManager = new LookupReferencesManager(
|
||||
new LookupConfig(null),
|
||||
mapper
|
||||
mapper, druidLeaderClient, config
|
||||
);
|
||||
|
||||
Map<String, Object> lookupMap = new HashMap<>();
|
||||
lookupMap.put("testMockForStartStop", container);
|
||||
String strResult = mapper.writeValueAsString(lookupMap);
|
||||
Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
|
||||
expect(config.getLookupTier()).andReturn(LOOKUP_TIER);
|
||||
replay(config);
|
||||
expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/lookupTier?detailed=true"))
|
||||
.andReturn(request);
|
||||
FullResponseHolder responseHolder = new FullResponseHolder(
|
||||
HttpResponseStatus.OK,
|
||||
EasyMock.createNiceMock(HttpResponse.class),
|
||||
new StringBuilder().append(strResult)
|
||||
);
|
||||
expect(druidLeaderClient.go(request)).andReturn(responseHolder);
|
||||
replay(druidLeaderClient);
|
||||
Assert.assertFalse(lookupReferencesManager.lifecycleLock.awaitStarted(1, TimeUnit.MICROSECONDS));
|
||||
Assert.assertNull(lookupReferencesManager.mainThread);
|
||||
Assert.assertNull(lookupReferencesManager.stateRef.get());
|
||||
|
@ -107,6 +164,22 @@ public class LookupReferencesManagerTest
|
|||
EasyMock.expect(lookupExtractorFactory.start()).andReturn(true).once();
|
||||
EasyMock.expect(lookupExtractorFactory.close()).andReturn(true).once();
|
||||
EasyMock.replay(lookupExtractorFactory);
|
||||
|
||||
Map<String, Object> lookupMap = new HashMap<>();
|
||||
lookupMap.put("testMockForAddGetRemove", container);
|
||||
String strResult = mapper.writeValueAsString(lookupMap);
|
||||
Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
|
||||
expect(config.getLookupTier()).andReturn(LOOKUP_TIER);
|
||||
replay(config);
|
||||
expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/lookupTier?detailed=true"))
|
||||
.andReturn(request);
|
||||
FullResponseHolder responseHolder = new FullResponseHolder(
|
||||
HttpResponseStatus.OK,
|
||||
EasyMock.createNiceMock(HttpResponse.class),
|
||||
new StringBuilder().append(strResult)
|
||||
);
|
||||
expect(druidLeaderClient.go(request)).andReturn(responseHolder);
|
||||
replay(druidLeaderClient);
|
||||
lookupReferencesManager.start();
|
||||
Assert.assertNull(lookupReferencesManager.get("test"));
|
||||
|
||||
|
@ -130,6 +203,21 @@ public class LookupReferencesManagerTest
|
|||
EasyMock.expect(lookupExtractorFactory.start()).andReturn(true).once();
|
||||
EasyMock.expect(lookupExtractorFactory.close()).andReturn(true).once();
|
||||
EasyMock.replay(lookupExtractorFactory);
|
||||
Map<String, Object> lookupMap = new HashMap<>();
|
||||
lookupMap.put("testMockForCloseIsCalledAfterStopping", container);
|
||||
String strResult = mapper.writeValueAsString(lookupMap);
|
||||
Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
|
||||
expect(config.getLookupTier()).andReturn(LOOKUP_TIER);
|
||||
replay(config);
|
||||
expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/lookupTier?detailed=true"))
|
||||
.andReturn(request);
|
||||
FullResponseHolder responseHolder = new FullResponseHolder(
|
||||
HttpResponseStatus.OK,
|
||||
EasyMock.createNiceMock(HttpResponse.class),
|
||||
new StringBuilder().append(strResult)
|
||||
);
|
||||
expect(druidLeaderClient.go(request)).andReturn(responseHolder);
|
||||
replay(druidLeaderClient);
|
||||
lookupReferencesManager.start();
|
||||
lookupReferencesManager.add("testMock", new LookupExtractorFactoryContainer("0", lookupExtractorFactory));
|
||||
lookupReferencesManager.handlePendingNotices();
|
||||
|
@ -146,6 +234,21 @@ public class LookupReferencesManagerTest
|
|||
EasyMock.expect(lookupExtractorFactory.close()).andReturn(true).once();
|
||||
EasyMock.replay(lookupExtractorFactory);
|
||||
|
||||
Map<String, Object> lookupMap = new HashMap<>();
|
||||
lookupMap.put("testMockForCloseIsCalledAfterRemove", container);
|
||||
String strResult = mapper.writeValueAsString(lookupMap);
|
||||
Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
|
||||
expect(config.getLookupTier()).andReturn(LOOKUP_TIER);
|
||||
replay(config);
|
||||
expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/lookupTier?detailed=true"))
|
||||
.andReturn(request);
|
||||
FullResponseHolder responseHolder = new FullResponseHolder(
|
||||
HttpResponseStatus.OK,
|
||||
EasyMock.createNiceMock(HttpResponse.class),
|
||||
new StringBuilder().append(strResult)
|
||||
);
|
||||
expect(druidLeaderClient.go(request)).andReturn(responseHolder);
|
||||
replay(druidLeaderClient);
|
||||
lookupReferencesManager.start();
|
||||
lookupReferencesManager.add("testMock", new LookupExtractorFactoryContainer("0", lookupExtractorFactory));
|
||||
lookupReferencesManager.handlePendingNotices();
|
||||
|
@ -157,8 +260,23 @@ public class LookupReferencesManagerTest
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testGetNotThere()
|
||||
public void testGetNotThere() throws Exception
|
||||
{
|
||||
Map<String, Object> lookupMap = new HashMap<>();
|
||||
lookupMap.put("testMockForGetNotThere", container);
|
||||
String strResult = mapper.writeValueAsString(lookupMap);
|
||||
Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
|
||||
expect(config.getLookupTier()).andReturn(LOOKUP_TIER);
|
||||
replay(config);
|
||||
expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/lookupTier?detailed=true"))
|
||||
.andReturn(request);
|
||||
FullResponseHolder responseHolder = new FullResponseHolder(
|
||||
HttpResponseStatus.OK,
|
||||
EasyMock.createNiceMock(HttpResponse.class),
|
||||
new StringBuilder().append(strResult)
|
||||
);
|
||||
expect(druidLeaderClient.go(request)).andReturn(responseHolder);
|
||||
replay(druidLeaderClient);
|
||||
lookupReferencesManager.start();
|
||||
Assert.assertNull(lookupReferencesManager.get("notThere"));
|
||||
}
|
||||
|
@ -174,7 +292,21 @@ public class LookupReferencesManagerTest
|
|||
EasyMock.expect(lookupExtractorFactory2.start()).andReturn(true).once();
|
||||
|
||||
EasyMock.replay(lookupExtractorFactory1, lookupExtractorFactory2);
|
||||
|
||||
Map<String, Object> lookupMap = new HashMap<>();
|
||||
lookupMap.put("testMockForUpdateWithHigherVersion", container);
|
||||
String strResult = mapper.writeValueAsString(lookupMap);
|
||||
Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
|
||||
expect(config.getLookupTier()).andReturn(LOOKUP_TIER);
|
||||
replay(config);
|
||||
expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/lookupTier?detailed=true"))
|
||||
.andReturn(request);
|
||||
FullResponseHolder responseHolder = new FullResponseHolder(
|
||||
HttpResponseStatus.OK,
|
||||
EasyMock.createNiceMock(HttpResponse.class),
|
||||
new StringBuilder().append(strResult)
|
||||
);
|
||||
expect(druidLeaderClient.go(request)).andReturn(responseHolder);
|
||||
replay(druidLeaderClient);
|
||||
lookupReferencesManager.start();
|
||||
lookupReferencesManager.add("testName", new LookupExtractorFactoryContainer("1", lookupExtractorFactory1));
|
||||
lookupReferencesManager.handlePendingNotices();
|
||||
|
@ -194,7 +326,21 @@ public class LookupReferencesManagerTest
|
|||
LookupExtractorFactory lookupExtractorFactory2 = EasyMock.createNiceMock(LookupExtractorFactory.class);
|
||||
|
||||
EasyMock.replay(lookupExtractorFactory1, lookupExtractorFactory2);
|
||||
|
||||
Map<String, Object> lookupMap = new HashMap<>();
|
||||
lookupMap.put("testMockForUpdateWithLowerVersion", container);
|
||||
String strResult = mapper.writeValueAsString(lookupMap);
|
||||
Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
|
||||
expect(config.getLookupTier()).andReturn(LOOKUP_TIER);
|
||||
replay(config);
|
||||
expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/lookupTier?detailed=true"))
|
||||
.andReturn(request);
|
||||
FullResponseHolder responseHolder = new FullResponseHolder(
|
||||
HttpResponseStatus.OK,
|
||||
EasyMock.createNiceMock(HttpResponse.class),
|
||||
new StringBuilder().append(strResult)
|
||||
);
|
||||
expect(druidLeaderClient.go(request)).andReturn(responseHolder);
|
||||
replay(druidLeaderClient);
|
||||
lookupReferencesManager.start();
|
||||
lookupReferencesManager.add("testName", new LookupExtractorFactoryContainer("1", lookupExtractorFactory1));
|
||||
lookupReferencesManager.handlePendingNotices();
|
||||
|
@ -208,35 +354,26 @@ public class LookupReferencesManagerTest
|
|||
@Test
|
||||
public void testRemoveNonExisting() throws Exception
|
||||
{
|
||||
Map<String, Object> lookupMap = new HashMap<>();
|
||||
lookupMap.put("testMockForRemoveNonExisting", container);
|
||||
String strResult = mapper.writeValueAsString(lookupMap);
|
||||
Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
|
||||
expect(config.getLookupTier()).andReturn(LOOKUP_TIER);
|
||||
replay(config);
|
||||
expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/lookupTier?detailed=true"))
|
||||
.andReturn(request);
|
||||
FullResponseHolder responseHolder = new FullResponseHolder(
|
||||
HttpResponseStatus.OK,
|
||||
EasyMock.createNiceMock(HttpResponse.class),
|
||||
new StringBuilder().append(strResult)
|
||||
);
|
||||
expect(druidLeaderClient.go(request)).andReturn(responseHolder);
|
||||
replay(druidLeaderClient);
|
||||
lookupReferencesManager.start();
|
||||
lookupReferencesManager.remove("test");
|
||||
lookupReferencesManager.handlePendingNotices();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBootstrapFromFile() throws Exception
|
||||
{
|
||||
LookupExtractorFactory lookupExtractorFactory = new MapLookupExtractorFactory(
|
||||
ImmutableMap.<String, String>of(
|
||||
"key",
|
||||
"value"
|
||||
), true
|
||||
);
|
||||
LookupExtractorFactoryContainer container = new LookupExtractorFactoryContainer("v0", lookupExtractorFactory);
|
||||
lookupReferencesManager.start();
|
||||
lookupReferencesManager.add("testMockForBootstrap", container);
|
||||
lookupReferencesManager.handlePendingNotices();
|
||||
lookupReferencesManager.stop();
|
||||
|
||||
lookupReferencesManager = new LookupReferencesManager(
|
||||
new LookupConfig(lookupReferencesManager.lookupSnapshotTaker.getPersistFile().getParent()),
|
||||
mapper,
|
||||
true
|
||||
);
|
||||
lookupReferencesManager.start();
|
||||
Assert.assertEquals(container, lookupReferencesManager.get("testMockForBootstrap"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetAllLookupsState() throws Exception
|
||||
{
|
||||
|
@ -269,7 +406,20 @@ public class LookupReferencesManagerTest
|
|||
), true
|
||||
)
|
||||
);
|
||||
|
||||
Map<String, Object> lookupMap = new HashMap<>();
|
||||
String strResult = mapper.writeValueAsString(lookupMap);
|
||||
Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
|
||||
expect(config.getLookupTier()).andReturn(LOOKUP_TIER);
|
||||
replay(config);
|
||||
expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/lookupTier?detailed=true"))
|
||||
.andReturn(request);
|
||||
FullResponseHolder responseHolder = new FullResponseHolder(
|
||||
HttpResponseStatus.OK,
|
||||
EasyMock.createNiceMock(HttpResponse.class),
|
||||
new StringBuilder().append(strResult)
|
||||
);
|
||||
expect(druidLeaderClient.go(request)).andReturn(responseHolder);
|
||||
replay(druidLeaderClient);
|
||||
lookupReferencesManager.start();
|
||||
lookupReferencesManager.add("one", container1);
|
||||
lookupReferencesManager.add("two", container2);
|
||||
|
@ -290,14 +440,28 @@ public class LookupReferencesManagerTest
|
|||
Assert.assertTrue(state.getToDrop().contains("one"));
|
||||
}
|
||||
|
||||
@Test (timeout = 20000)
|
||||
@Test(timeout = 20000)
|
||||
public void testRealModeWithMainThread() throws Exception
|
||||
{
|
||||
LookupReferencesManager lookupReferencesManager = new LookupReferencesManager(
|
||||
new LookupConfig(temporaryFolder.newFolder().getAbsolutePath()),
|
||||
mapper
|
||||
mapper, druidLeaderClient, config
|
||||
);
|
||||
|
||||
Map<String, Object> lookupMap = new HashMap<>();
|
||||
lookupMap.put("testMockForRealModeWithMainThread", container);
|
||||
String strResult = mapper.writeValueAsString(lookupMap);
|
||||
Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
|
||||
expect(config.getLookupTier()).andReturn(LOOKUP_TIER);
|
||||
replay(config);
|
||||
expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/lookupTier?detailed=true"))
|
||||
.andReturn(request);
|
||||
FullResponseHolder responseHolder = new FullResponseHolder(
|
||||
HttpResponseStatus.OK,
|
||||
EasyMock.createNiceMock(HttpResponse.class),
|
||||
new StringBuilder().append(strResult)
|
||||
);
|
||||
expect(druidLeaderClient.go(request)).andReturn(responseHolder);
|
||||
replay(druidLeaderClient);
|
||||
lookupReferencesManager.start();
|
||||
Assert.assertTrue(lookupReferencesManager.mainThread.isAlive());
|
||||
|
||||
|
@ -324,4 +488,128 @@ public class LookupReferencesManagerTest
|
|||
|
||||
Assert.assertFalse(lookupReferencesManager.mainThread.isAlive());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCoordinatorLookupSync() throws Exception
|
||||
{
|
||||
LookupExtractorFactoryContainer container1 = new LookupExtractorFactoryContainer(
|
||||
"0",
|
||||
new MapLookupExtractorFactory(
|
||||
ImmutableMap.of(
|
||||
"key1",
|
||||
"value1"
|
||||
), true
|
||||
)
|
||||
);
|
||||
|
||||
LookupExtractorFactoryContainer container2 = new LookupExtractorFactoryContainer(
|
||||
"0",
|
||||
new MapLookupExtractorFactory(
|
||||
ImmutableMap.of(
|
||||
"key2",
|
||||
"value2"
|
||||
), true
|
||||
)
|
||||
);
|
||||
|
||||
LookupExtractorFactoryContainer container3 = new LookupExtractorFactoryContainer(
|
||||
"0",
|
||||
new MapLookupExtractorFactory(
|
||||
ImmutableMap.of(
|
||||
"key3",
|
||||
"value3"
|
||||
), true
|
||||
)
|
||||
);
|
||||
Map<String, Object> lookupMap = new HashMap<>();
|
||||
lookupMap.put("testLookup1", container1);
|
||||
lookupMap.put("testLookup2", container2);
|
||||
lookupMap.put("testLookup3", container3);
|
||||
String strResult = mapper.writeValueAsString(lookupMap);
|
||||
Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
|
||||
expect(config.getLookupTier()).andReturn(LOOKUP_TIER);
|
||||
replay(config);
|
||||
expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/lookupTier?detailed=true"))
|
||||
.andReturn(request);
|
||||
FullResponseHolder responseHolder = new FullResponseHolder(
|
||||
HttpResponseStatus.OK,
|
||||
EasyMock.createNiceMock(HttpResponse.class),
|
||||
new StringBuilder().append(strResult)
|
||||
);
|
||||
expect(druidLeaderClient.go(request)).andReturn(responseHolder);
|
||||
replay(druidLeaderClient);
|
||||
|
||||
lookupReferencesManager.start();
|
||||
Assert.assertEquals(container1, lookupReferencesManager.get("testLookup1"));
|
||||
Assert.assertEquals(container2, lookupReferencesManager.get("testLookup2"));
|
||||
Assert.assertEquals(container3, lookupReferencesManager.get("testLookup3"));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLoadLookupOnCoordinatorFailure() throws Exception
|
||||
{
|
||||
Map<String, Object> lookupMap = new HashMap<>();
|
||||
lookupMap.put("testMockForLoadLookupOnCoordinatorFailure", container);
|
||||
String strResult = mapper.writeValueAsString(lookupMap);
|
||||
Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
|
||||
expect(config.getLookupTier()).andReturn(LOOKUP_TIER);
|
||||
replay(config);
|
||||
expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/lookupTier?detailed=true"))
|
||||
.andReturn(request);
|
||||
FullResponseHolder responseHolder = new FullResponseHolder(
|
||||
HttpResponseStatus.NOT_FOUND,
|
||||
EasyMock.createNiceMock(HttpResponse.class),
|
||||
new StringBuilder().append(strResult)
|
||||
);
|
||||
expect(druidLeaderClient.go(request)).andThrow(new IllegalStateException());
|
||||
replay(druidLeaderClient);
|
||||
|
||||
lookupReferencesManager.start();
|
||||
lookupReferencesManager.add("testMockForLoadLookupOnCoordinatorFailure", container);
|
||||
lookupReferencesManager.handlePendingNotices();
|
||||
lookupReferencesManager.stop();
|
||||
lookupReferencesManager = new LookupReferencesManager(
|
||||
new LookupConfig(lookupReferencesManager.lookupSnapshotTaker.getPersistFile().getParent()),
|
||||
mapper, druidLeaderClient, config,
|
||||
true
|
||||
);
|
||||
reset(config);
|
||||
reset(druidLeaderClient);
|
||||
expect(config.getLookupTier()).andReturn(LOOKUP_TIER);
|
||||
replay(config);
|
||||
expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/lookupTier?detailed=true"))
|
||||
.andReturn(request);
|
||||
expect(druidLeaderClient.go(request)).andThrow(new IllegalStateException());
|
||||
replay(druidLeaderClient);
|
||||
lookupReferencesManager.start();
|
||||
Assert.assertEquals(container, lookupReferencesManager.get("testMockForLoadLookupOnCoordinatorFailure"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDisableLookupSync() throws Exception
|
||||
{
|
||||
LookupReferencesManager lookupReferencesManager = new LookupReferencesManager(
|
||||
new LookupConfig(null),
|
||||
mapper, druidLeaderClient, config
|
||||
);
|
||||
Map<String, Object> lookupMap = new HashMap<>();
|
||||
lookupMap.put("testMockForDisableLookupSync", container);
|
||||
String strResult = mapper.writeValueAsString(lookupMap);
|
||||
Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
|
||||
expect(config.getLookupTier()).andReturn(LOOKUP_TIER);
|
||||
replay(config);
|
||||
expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/lookupTier?detailed=true"))
|
||||
.andReturn(request);
|
||||
FullResponseHolder responseHolder = new FullResponseHolder(
|
||||
HttpResponseStatus.OK,
|
||||
EasyMock.createNiceMock(HttpResponse.class),
|
||||
new StringBuilder().append(strResult)
|
||||
);
|
||||
expect(druidLeaderClient.go(request)).andReturn(responseHolder);
|
||||
|
||||
lookupReferencesManager.start();
|
||||
Assert.assertNull(lookupReferencesManager.get("testMockForDisableLookupSync"));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -86,9 +86,10 @@ public class LookupCoordinatorResourceTest
|
|||
ImmutableMap.of(LOOKUP_NAME, SINGLE_LOOKUP), null, null
|
||||
);
|
||||
|
||||
private static final Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>> NODES_LOOKUP_STATE = ImmutableMap.of(
|
||||
LOOKUP_NODE, LOOKUP_STATE
|
||||
);
|
||||
private static final Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>> NODES_LOOKUP_STATE = ImmutableMap
|
||||
.of(
|
||||
LOOKUP_NODE, LOOKUP_STATE
|
||||
);
|
||||
|
||||
@Test
|
||||
public void testSimpleGet()
|
||||
|
@ -208,6 +209,23 @@ public class LookupCoordinatorResourceTest
|
|||
EasyMock.verify(lookupCoordinatorManager);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDetailedGetLookup()
|
||||
{
|
||||
final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(LookupCoordinatorManager.class);
|
||||
EasyMock.expect(lookupCoordinatorManager.getKnownLookups()).andReturn(SINGLE_TIER_MAP).once();
|
||||
EasyMock.replay(lookupCoordinatorManager);
|
||||
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
|
||||
lookupCoordinatorManager,
|
||||
mapper,
|
||||
mapper
|
||||
);
|
||||
final Response response = lookupCoordinatorResource.getSpecificTier(LOOKUP_TIER, true);
|
||||
Assert.assertEquals(200, response.getStatus());
|
||||
Assert.assertEquals(SINGLE_TIER_MAP.get(LOOKUP_TIER), response.getEntity());
|
||||
EasyMock.verify(lookupCoordinatorManager);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMissingGetLookup()
|
||||
{
|
||||
|
@ -765,7 +783,7 @@ public class LookupCoordinatorResourceTest
|
|||
mapper,
|
||||
mapper
|
||||
);
|
||||
final Response response = lookupCoordinatorResource.getSpecificTier(LOOKUP_TIER);
|
||||
final Response response = lookupCoordinatorResource.getSpecificTier(LOOKUP_TIER, false);
|
||||
Assert.assertEquals(200, response.getStatus());
|
||||
Assert.assertEquals(SINGLE_TIER_MAP.get(LOOKUP_TIER).keySet(), response.getEntity());
|
||||
EasyMock.verify(lookupCoordinatorManager);
|
||||
|
@ -785,7 +803,7 @@ public class LookupCoordinatorResourceTest
|
|||
mapper,
|
||||
mapper
|
||||
);
|
||||
final Response response = lookupCoordinatorResource.getSpecificTier(tier);
|
||||
final Response response = lookupCoordinatorResource.getSpecificTier(tier, false);
|
||||
Assert.assertEquals(404, response.getStatus());
|
||||
EasyMock.verify(lookupCoordinatorManager);
|
||||
}
|
||||
|
@ -801,7 +819,7 @@ public class LookupCoordinatorResourceTest
|
|||
mapper,
|
||||
mapper
|
||||
);
|
||||
final Response response = lookupCoordinatorResource.getSpecificTier(tier);
|
||||
final Response response = lookupCoordinatorResource.getSpecificTier(tier, false);
|
||||
Assert.assertEquals(400, response.getStatus());
|
||||
Assert.assertEquals(ImmutableMap.of("error", "`tier` required"), response.getEntity());
|
||||
EasyMock.verify(lookupCoordinatorManager);
|
||||
|
@ -819,7 +837,7 @@ public class LookupCoordinatorResourceTest
|
|||
mapper,
|
||||
mapper
|
||||
);
|
||||
final Response response = lookupCoordinatorResource.getSpecificTier(tier);
|
||||
final Response response = lookupCoordinatorResource.getSpecificTier(tier, false);
|
||||
Assert.assertEquals(404, response.getStatus());
|
||||
Assert.assertEquals(ImmutableMap.of("error", "No lookups found"), response.getEntity());
|
||||
EasyMock.verify(lookupCoordinatorManager);
|
||||
|
@ -838,7 +856,7 @@ public class LookupCoordinatorResourceTest
|
|||
mapper,
|
||||
mapper
|
||||
);
|
||||
final Response response = lookupCoordinatorResource.getSpecificTier(tier);
|
||||
final Response response = lookupCoordinatorResource.getSpecificTier(tier, false);
|
||||
Assert.assertEquals(500, response.getStatus());
|
||||
Assert.assertEquals(ImmutableMap.of("error", errMsg), response.getEntity());
|
||||
EasyMock.verify(lookupCoordinatorManager);
|
||||
|
|
|
@ -83,6 +83,13 @@
|
|||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid-server</artifactId>
|
||||
<version>${project.parent.version}</version>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
|
|
@ -58,7 +58,7 @@ import io.druid.query.aggregation.DoubleSumAggregatorFactory;
|
|||
import io.druid.query.aggregation.FloatSumAggregatorFactory;
|
||||
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
|
||||
import io.druid.query.expression.LookupExprMacro;
|
||||
import io.druid.query.expression.TestExprMacroTable;
|
||||
import io.druid.query.expression.TestExpressionMacroTable;
|
||||
import io.druid.query.groupby.GroupByQuery;
|
||||
import io.druid.query.groupby.GroupByQueryConfig;
|
||||
import io.druid.query.groupby.GroupByQueryRunnerTest;
|
||||
|
@ -138,13 +138,16 @@ public class CalciteTests
|
|||
|
||||
// This Module is just to get a LookupReferencesManager with a usable "lookyloo" lookup.
|
||||
|
||||
LookupReferencesManager testLookupReferencesManager = TestExprMacroTable.createTestLookupReferencesManager(
|
||||
ImmutableMap.of(
|
||||
"a", "xa",
|
||||
"abc", "xabc"
|
||||
)
|
||||
);
|
||||
binder.bind(LookupReferencesManager.class).toInstance(testLookupReferencesManager);
|
||||
binder.bind(LookupReferencesManager.class)
|
||||
.toInstance(
|
||||
TestExpressionMacroTable.createTestLookupReferencesManager(
|
||||
ImmutableMap.of(
|
||||
"a", "xa",
|
||||
"abc", "xabc"
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
}
|
||||
}
|
||||
);
|
||||
|
|
Loading…
Reference in New Issue