mirror of https://github.com/apache/druid.git
Fix lookup serde on node types that don't load lookups. (#7752)
This includes the router, overlord, middleManager, and coordinator. Does the following things: - Loads LookupSerdeModule on MM, overlord, and coordinator. - Adds LookupExprMacro to LookupSerdeModule, which allows these node types to understand that the 'lookup' function exists. - Adds a test to make sure that LookupSerdeModule works for virtual columns, filters, transforms, and dimension specs. This is implementing the technique discussed on these two issues: - https://github.com/apache/incubator-druid/issues/7724#issuecomment-494723333 - https://github.com/apache/incubator-druid/pull/7082#discussion_r264888771
This commit is contained in:
parent
db3792727e
commit
7ec7257e1d
|
@ -24,17 +24,19 @@ import com.fasterxml.jackson.databind.jsontype.NamedType;
|
||||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.inject.Binder;
|
import com.google.inject.Binder;
|
||||||
|
import org.apache.druid.guice.ExpressionModule;
|
||||||
import org.apache.druid.guice.JsonConfigProvider;
|
import org.apache.druid.guice.JsonConfigProvider;
|
||||||
import org.apache.druid.initialization.DruidModule;
|
import org.apache.druid.initialization.DruidModule;
|
||||||
import org.apache.druid.query.dimension.LookupDimensionSpec;
|
import org.apache.druid.query.dimension.LookupDimensionSpec;
|
||||||
|
import org.apache.druid.query.expression.LookupExprMacro;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Variant of {@link LookupModule} that only supports serde of {@link org.apache.druid.query.Query} objects, to allow
|
* Variant of {@link LookupModule} that only supports serde of {@link org.apache.druid.query.Query} objects, to allow
|
||||||
* a service to examine queries that might contain for example a {@link RegisteredLookupExtractionFn}, but without
|
* a service to examine queries that might contain for example a {@link RegisteredLookupExtractionFn} or a
|
||||||
* requiring the service to load the actual lookups.
|
* {@link LookupExprMacro}, but without requiring the service to load the actual lookups.
|
||||||
*/
|
*/
|
||||||
public class LookupSerdeModule implements DruidModule
|
public class LookupSerdeModule implements DruidModule
|
||||||
{
|
{
|
||||||
|
@ -55,6 +57,7 @@ public class LookupSerdeModule implements DruidModule
|
||||||
{
|
{
|
||||||
JsonConfigProvider.bind(binder, LookupModule.PROPERTY_BASE, LookupConfig.class);
|
JsonConfigProvider.bind(binder, LookupModule.PROPERTY_BASE, LookupConfig.class);
|
||||||
binder.bind(LookupExtractorFactoryContainerProvider.class).to(NoopLookupExtractorFactoryContainerProvider.class);
|
binder.bind(LookupExtractorFactoryContainerProvider.class).to(NoopLookupExtractorFactoryContainerProvider.class);
|
||||||
|
ExpressionModule.addExprMacro(binder, LookupExprMacro.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -0,0 +1,130 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.druid.query.lookup;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.InjectableValues;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.inject.Injector;
|
||||||
|
import com.google.inject.Key;
|
||||||
|
import org.apache.druid.guice.ExpressionModule;
|
||||||
|
import org.apache.druid.guice.GuiceInjectors;
|
||||||
|
import org.apache.druid.guice.annotations.Json;
|
||||||
|
import org.apache.druid.initialization.DruidModule;
|
||||||
|
import org.apache.druid.math.expr.ExprMacroTable;
|
||||||
|
import org.apache.druid.query.dimension.DimensionSpec;
|
||||||
|
import org.apache.druid.query.dimension.ExtractionDimensionSpec;
|
||||||
|
import org.apache.druid.query.filter.DimFilter;
|
||||||
|
import org.apache.druid.query.filter.SelectorDimFilter;
|
||||||
|
import org.apache.druid.segment.VirtualColumn;
|
||||||
|
import org.apache.druid.segment.column.ValueType;
|
||||||
|
import org.apache.druid.segment.transform.ExpressionTransform;
|
||||||
|
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class LookupSerdeModuleTest
|
||||||
|
{
|
||||||
|
private Injector injector;
|
||||||
|
private ObjectMapper objectMapper;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp()
|
||||||
|
{
|
||||||
|
final ImmutableList<DruidModule> modules = ImmutableList.of(
|
||||||
|
new ExpressionModule(),
|
||||||
|
new LookupSerdeModule()
|
||||||
|
);
|
||||||
|
|
||||||
|
injector = GuiceInjectors.makeStartupInjectorWithModules(modules);
|
||||||
|
objectMapper = injector.getInstance(Key.get(ObjectMapper.class, Json.class));
|
||||||
|
objectMapper.setInjectableValues(
|
||||||
|
new InjectableValues.Std()
|
||||||
|
.addValue(ExprMacroTable.class, injector.getInstance(ExprMacroTable.class))
|
||||||
|
.addValue(
|
||||||
|
LookupExtractorFactoryContainerProvider.class,
|
||||||
|
injector.getInstance(LookupExtractorFactoryContainerProvider.class)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
modules.stream().flatMap(module -> module.getJacksonModules().stream()).forEach(objectMapper::registerModule);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExpressionVirtualColumnSerde() throws Exception
|
||||||
|
{
|
||||||
|
final ExpressionVirtualColumn virtualColumn = new ExpressionVirtualColumn(
|
||||||
|
"v",
|
||||||
|
"lookup(xxx, 'beep')",
|
||||||
|
ValueType.STRING,
|
||||||
|
injector.getInstance(ExprMacroTable.class)
|
||||||
|
);
|
||||||
|
|
||||||
|
Assert.assertEquals(
|
||||||
|
virtualColumn,
|
||||||
|
objectMapper.readValue(objectMapper.writeValueAsBytes(virtualColumn), VirtualColumn.class)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExtractionDimensionSerde() throws Exception
|
||||||
|
{
|
||||||
|
final ExtractionDimensionSpec dimensionSpec = new ExtractionDimensionSpec(
|
||||||
|
"xxx",
|
||||||
|
"d",
|
||||||
|
new RegisteredLookupExtractionFn(null, "beep", false, null, null, null)
|
||||||
|
);
|
||||||
|
|
||||||
|
Assert.assertEquals(
|
||||||
|
dimensionSpec,
|
||||||
|
objectMapper.readValue(objectMapper.writeValueAsBytes(dimensionSpec), DimensionSpec.class)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExtractionFilterSere() throws Exception
|
||||||
|
{
|
||||||
|
final SelectorDimFilter filter = new SelectorDimFilter(
|
||||||
|
"xxx",
|
||||||
|
"d",
|
||||||
|
new RegisteredLookupExtractionFn(null, "beep", false, null, null, null)
|
||||||
|
);
|
||||||
|
|
||||||
|
Assert.assertEquals(
|
||||||
|
filter,
|
||||||
|
objectMapper.readValue(objectMapper.writeValueAsBytes(filter), DimFilter.class)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExpressionTransformSerde() throws Exception
|
||||||
|
{
|
||||||
|
final ExpressionTransform transform = new ExpressionTransform(
|
||||||
|
"xxx",
|
||||||
|
"lookup(xxx, 'beep')",
|
||||||
|
injector.getInstance(ExprMacroTable.class)
|
||||||
|
);
|
||||||
|
|
||||||
|
Assert.assertEquals(
|
||||||
|
transform,
|
||||||
|
objectMapper.readValue(objectMapper.writeValueAsBytes(transform), ExpressionTransform.class)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
|
@ -58,6 +58,7 @@ import org.apache.druid.metadata.MetadataSegmentManagerConfig;
|
||||||
import org.apache.druid.metadata.MetadataSegmentManagerProvider;
|
import org.apache.druid.metadata.MetadataSegmentManagerProvider;
|
||||||
import org.apache.druid.metadata.MetadataStorage;
|
import org.apache.druid.metadata.MetadataStorage;
|
||||||
import org.apache.druid.metadata.MetadataStorageProvider;
|
import org.apache.druid.metadata.MetadataStorageProvider;
|
||||||
|
import org.apache.druid.query.lookup.LookupSerdeModule;
|
||||||
import org.apache.druid.server.audit.AuditManagerProvider;
|
import org.apache.druid.server.audit.AuditManagerProvider;
|
||||||
import org.apache.druid.server.coordinator.BalancerStrategyFactory;
|
import org.apache.druid.server.coordinator.BalancerStrategyFactory;
|
||||||
import org.apache.druid.server.coordinator.CachingCostBalancerStrategyConfig;
|
import org.apache.druid.server.coordinator.CachingCostBalancerStrategyConfig;
|
||||||
|
@ -271,6 +272,8 @@ public class CliCoordinator extends ServerRunnable
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
|
modules.add(new LookupSerdeModule());
|
||||||
|
|
||||||
if (beOverlord) {
|
if (beOverlord) {
|
||||||
modules.addAll(new CliOverlord().getModules(false));
|
modules.addAll(new CliOverlord().getModules(false));
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,6 +56,7 @@ import org.apache.druid.indexing.worker.config.WorkerConfig;
|
||||||
import org.apache.druid.indexing.worker.http.TaskManagementResource;
|
import org.apache.druid.indexing.worker.http.TaskManagementResource;
|
||||||
import org.apache.druid.indexing.worker.http.WorkerResource;
|
import org.apache.druid.indexing.worker.http.WorkerResource;
|
||||||
import org.apache.druid.java.util.common.logger.Logger;
|
import org.apache.druid.java.util.common.logger.Logger;
|
||||||
|
import org.apache.druid.query.lookup.LookupSerdeModule;
|
||||||
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
|
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
|
||||||
import org.apache.druid.server.DruidNode;
|
import org.apache.druid.server.DruidNode;
|
||||||
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
|
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
|
||||||
|
@ -64,6 +65,7 @@ import org.eclipse.jetty.server.Server;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
*
|
||||||
*/
|
*/
|
||||||
@Command(
|
@Command(
|
||||||
name = "middleManager",
|
name = "middleManager",
|
||||||
|
@ -163,7 +165,8 @@ public class CliMiddleManager extends ServerRunnable
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
new IndexingServiceFirehoseModule(),
|
new IndexingServiceFirehoseModule(),
|
||||||
new IndexingServiceTaskLogsModule()
|
new IndexingServiceTaskLogsModule(),
|
||||||
|
new LookupSerdeModule()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -93,6 +93,7 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
|
||||||
import org.apache.druid.indexing.overlord.supervisor.SupervisorResource;
|
import org.apache.druid.indexing.overlord.supervisor.SupervisorResource;
|
||||||
import org.apache.druid.indexing.worker.config.WorkerConfig;
|
import org.apache.druid.indexing.worker.config.WorkerConfig;
|
||||||
import org.apache.druid.java.util.common.logger.Logger;
|
import org.apache.druid.java.util.common.logger.Logger;
|
||||||
|
import org.apache.druid.query.lookup.LookupSerdeModule;
|
||||||
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
|
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
|
||||||
import org.apache.druid.server.audit.AuditManagerProvider;
|
import org.apache.druid.server.audit.AuditManagerProvider;
|
||||||
import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig;
|
import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig;
|
||||||
|
@ -333,6 +334,7 @@ public class CliOverlord extends ServerRunnable
|
||||||
},
|
},
|
||||||
new IndexingServiceFirehoseModule(),
|
new IndexingServiceFirehoseModule(),
|
||||||
new IndexingServiceTaskLogsModule(),
|
new IndexingServiceTaskLogsModule(),
|
||||||
|
new LookupSerdeModule(),
|
||||||
new SamplerModule()
|
new SamplerModule()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue