From 7ec7257e1dbc095019e20958a48f3aea89632609 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 24 May 2019 12:30:49 -0700 Subject: [PATCH] Fix lookup serde on node types that don't load lookups. (#7752) This includes the router, overlord, middleManager, and coordinator. Does the following things: - Loads LookupSerdeModule on MM, overlord, and coordinator. - Adds LookupExprMacro to LookupSerdeModule, which allows these node types to understand that the 'lookup' function exists. - Adds a test to make sure that LookupSerdeModule works for virtual columns, filters, transforms, and dimension specs. This is implementing the technique discussed on these two issues: - https://github.com/apache/incubator-druid/issues/7724#issuecomment-494723333 - https://github.com/apache/incubator-druid/pull/7082#discussion_r264888771 --- .../druid/query/lookup/LookupSerdeModule.java | 7 +- .../query/lookup/LookupSerdeModuleTest.java | 130 ++++++++++++++++++ .../org/apache/druid/cli/CliCoordinator.java | 3 + .../apache/druid/cli/CliMiddleManager.java | 5 +- .../org/apache/druid/cli/CliOverlord.java | 2 + 5 files changed, 144 insertions(+), 3 deletions(-) create mode 100644 server/src/test/java/org/apache/druid/query/lookup/LookupSerdeModuleTest.java diff --git a/server/src/main/java/org/apache/druid/query/lookup/LookupSerdeModule.java b/server/src/main/java/org/apache/druid/query/lookup/LookupSerdeModule.java index 3a71377478b..d08205cd412 100644 --- a/server/src/main/java/org/apache/druid/query/lookup/LookupSerdeModule.java +++ b/server/src/main/java/org/apache/druid/query/lookup/LookupSerdeModule.java @@ -24,17 +24,19 @@ import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; +import org.apache.druid.guice.ExpressionModule; import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.initialization.DruidModule; import org.apache.druid.query.dimension.LookupDimensionSpec; +import org.apache.druid.query.expression.LookupExprMacro; import javax.annotation.Nullable; import java.util.List; /** * Variant of {@link LookupModule} that only supports serde of {@link org.apache.druid.query.Query} objects, to allow - * a service to examine queries that might contain for example a {@link RegisteredLookupExtractionFn}, but without - * requiring the service to load the actual lookups. + * a service to examine queries that might contain for example a {@link RegisteredLookupExtractionFn} or a + * {@link LookupExprMacro}, but without requiring the service to load the actual lookups. */ public class LookupSerdeModule implements DruidModule { @@ -55,6 +57,7 @@ public class LookupSerdeModule implements DruidModule { JsonConfigProvider.bind(binder, LookupModule.PROPERTY_BASE, LookupConfig.class); binder.bind(LookupExtractorFactoryContainerProvider.class).to(NoopLookupExtractorFactoryContainerProvider.class); + ExpressionModule.addExprMacro(binder, LookupExprMacro.class); } /** diff --git a/server/src/test/java/org/apache/druid/query/lookup/LookupSerdeModuleTest.java b/server/src/test/java/org/apache/druid/query/lookup/LookupSerdeModuleTest.java new file mode 100644 index 00000000000..4329fe1115c --- /dev/null +++ b/server/src/test/java/org/apache/druid/query/lookup/LookupSerdeModuleTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.lookup; + +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.inject.Injector; +import com.google.inject.Key; +import org.apache.druid.guice.ExpressionModule; +import org.apache.druid.guice.GuiceInjectors; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.initialization.DruidModule; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.dimension.ExtractionDimensionSpec; +import org.apache.druid.query.filter.DimFilter; +import org.apache.druid.query.filter.SelectorDimFilter; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.transform.ExpressionTransform; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class LookupSerdeModuleTest +{ + private Injector injector; + private ObjectMapper objectMapper; + + @Before + public void setUp() + { + final ImmutableList modules = ImmutableList.of( + new ExpressionModule(), + new LookupSerdeModule() + ); + + injector = GuiceInjectors.makeStartupInjectorWithModules(modules); + objectMapper = injector.getInstance(Key.get(ObjectMapper.class, Json.class)); + objectMapper.setInjectableValues( + new InjectableValues.Std() + .addValue(ExprMacroTable.class, injector.getInstance(ExprMacroTable.class)) + .addValue( + LookupExtractorFactoryContainerProvider.class, + injector.getInstance(LookupExtractorFactoryContainerProvider.class) + ) + ); + modules.stream().flatMap(module -> module.getJacksonModules().stream()).forEach(objectMapper::registerModule); + } + + @Test + public void testExpressionVirtualColumnSerde() throws Exception + { + final ExpressionVirtualColumn virtualColumn = new ExpressionVirtualColumn( + "v", + "lookup(xxx, 'beep')", + ValueType.STRING, + injector.getInstance(ExprMacroTable.class) + ); + + Assert.assertEquals( + virtualColumn, + objectMapper.readValue(objectMapper.writeValueAsBytes(virtualColumn), VirtualColumn.class) + ); + } + + @Test + public void testExtractionDimensionSerde() throws Exception + { + final ExtractionDimensionSpec dimensionSpec = new ExtractionDimensionSpec( + "xxx", + "d", + new RegisteredLookupExtractionFn(null, "beep", false, null, null, null) + ); + + Assert.assertEquals( + dimensionSpec, + objectMapper.readValue(objectMapper.writeValueAsBytes(dimensionSpec), DimensionSpec.class) + ); + } + + @Test + public void testExtractionFilterSere() throws Exception + { + final SelectorDimFilter filter = new SelectorDimFilter( + "xxx", + "d", + new RegisteredLookupExtractionFn(null, "beep", false, null, null, null) + ); + + Assert.assertEquals( + filter, + objectMapper.readValue(objectMapper.writeValueAsBytes(filter), DimFilter.class) + ); + } + + @Test + public void testExpressionTransformSerde() throws Exception + { + final ExpressionTransform transform = new ExpressionTransform( + "xxx", + "lookup(xxx, 'beep')", + injector.getInstance(ExprMacroTable.class) + ); + + Assert.assertEquals( + transform, + objectMapper.readValue(objectMapper.writeValueAsBytes(transform), ExpressionTransform.class) + ); + } +} diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java index 88245624093..c1ab5b94a7d 100644 --- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java +++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java @@ -58,6 +58,7 @@ import org.apache.druid.metadata.MetadataSegmentManagerConfig; import org.apache.druid.metadata.MetadataSegmentManagerProvider; import org.apache.druid.metadata.MetadataStorage; import org.apache.druid.metadata.MetadataStorageProvider; +import org.apache.druid.query.lookup.LookupSerdeModule; import org.apache.druid.server.audit.AuditManagerProvider; import org.apache.druid.server.coordinator.BalancerStrategyFactory; import org.apache.druid.server.coordinator.CachingCostBalancerStrategyConfig; @@ -271,6 +272,8 @@ public class CliCoordinator extends ServerRunnable } ); + modules.add(new LookupSerdeModule()); + if (beOverlord) { modules.addAll(new CliOverlord().getModules(false)); } diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java index 25c4e13e32d..ecdadfa5f70 100644 --- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java @@ -56,6 +56,7 @@ import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.indexing.worker.http.TaskManagementResource; import org.apache.druid.indexing.worker.http.WorkerResource; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.lookup.LookupSerdeModule; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; import org.apache.druid.server.DruidNode; import org.apache.druid.server.initialization.jetty.JettyServerInitializer; @@ -64,6 +65,7 @@ import org.eclipse.jetty.server.Server; import java.util.List; /** + * */ @Command( name = "middleManager", @@ -163,7 +165,8 @@ public class CliMiddleManager extends ServerRunnable } }, new IndexingServiceFirehoseModule(), - new IndexingServiceTaskLogsModule() + new IndexingServiceTaskLogsModule(), + new LookupSerdeModule() ); } } diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java index a0f565f0d16..c11031f08f9 100644 --- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java +++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java @@ -93,6 +93,7 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; import org.apache.druid.indexing.overlord.supervisor.SupervisorResource; import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.lookup.LookupSerdeModule; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; import org.apache.druid.server.audit.AuditManagerProvider; import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig; @@ -333,6 +334,7 @@ public class CliOverlord extends ServerRunnable }, new IndexingServiceFirehoseModule(), new IndexingServiceTaskLogsModule(), + new LookupSerdeModule(), new SamplerModule() ); }