diff --git a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/query/lookup/LookupUtilsTest.java b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/query/lookup/LookupUtilsTest.java new file mode 100644 index 00000000000..7ef6ca501e5 --- /dev/null +++ b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/query/lookup/LookupUtilsTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.lookup; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Module; +import org.apache.druid.guice.GuiceInjectors; +import org.apache.druid.guice.JsonConfigProvider; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.guice.annotations.Self; +import org.apache.druid.initialization.Initialization; +import org.apache.druid.server.DruidNode; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Map; + +public class LookupUtilsTest +{ + private static final TypeReference> LOOKUPS_ALL_GENERIC_REFERENCE = + new TypeReference>() + { + }; + + private static final TypeReference> LOOKUPS_ALL_REFERENCE = + new TypeReference>() + { + }; + + private static final String LOOKUP_VALID_INNER = " \"lookup_uri_good\": {\n" + + " \"version\": \"2021-12-03T01:04:05.151Z\",\n" + + " \"lookupExtractorFactory\": {\n" + + " \"type\": \"cachedNamespace\",\n" + + " \"extractionNamespace\": {\n" + + " \"type\": \"uri\",\n" + + " \"uri\": \"file:///home/lookup_data.json\",\n" + + " \"namespaceParseSpec\": {\n" + + " \"format\": \"simpleJson\"\n" + + " },\n" + + " \"pollPeriod\": \"PT30S\",\n" + + " \"maxHeapPercentage\": 1\n" + + " }\n" + + " }\n" + + " }"; + + + private static final String LOOKUP_VALID = "{\n" + + LOOKUP_VALID_INNER + "\n" + + "}"; + + private static final String LOOKUP_WITH_KEY_COLUMN_BUT_NO_VALUE_COLUMN_INNER = + " \"lookup_keyColumn_but_no_valueColumn\": {\n" + + " \"version\": \"2021-12-03T02:17:01.983Z\",\n" + + " \"lookupExtractorFactory\": {\n" + + " \"type\": \"cachedNamespace\",\n" + + " \"extractionNamespace\": {\n" + + " \"type\": \"uri\",\n" + + " \"fileRegex\": \".*csv\",\n" + + " \"uriPrefix\": \"s3://bucket/path/\",\n" + + " \"namespaceParseSpec\": {\n" + + " \"format\": \"csv\",\n" + + " \"columns\": [\n" + + " \"cluster_id\",\n" + + " \"account_id\",\n" + + " \"manager_host\"\n" + + " ],\n" + + " \"keyColumn\": \"cluster_id\",\n" + + " \"hasHeaderRow\": true,\n" + + " \"skipHeaderRows\": 1\n" + + " },\n" + + " \"pollPeriod\": \"PT30S\"\n" + + " }\n" + + " }\n" + + " }"; + + private static final String LOOKUP_WITH_KEY_COLUMN_BUT_NO_VALUE_COLUMN = + "{\n" + + LOOKUP_WITH_KEY_COLUMN_BUT_NO_VALUE_COLUMN_INNER + "\n" + + "}"; + + private static final String LOOKSUPS_INVALID_AND_VALID = "{\n" + + LOOKUP_WITH_KEY_COLUMN_BUT_NO_VALUE_COLUMN_INNER + ",\n" + + LOOKUP_VALID_INNER + "\n" + + "}"; + private ObjectMapper mapper; + + @Before + public void setup() + { + final Injector injector = makeInjector(); + mapper = injector.getInstance(Key.get(ObjectMapper.class, Json.class)); + mapper.registerSubtypes(NamespaceLookupExtractorFactory.class); + } + + @Test + public void test_tryConvertObjectMapToLookupConfigMap_allValid() throws IOException + { + mapper.registerSubtypes(NamespaceLookupExtractorFactory.class); + Map validLookupExpected = mapper.readValue( + LOOKUP_VALID, + LOOKUPS_ALL_REFERENCE); + + Map validLookupGeneric = mapper.readValue( + LOOKUP_VALID, + LOOKUPS_ALL_GENERIC_REFERENCE); + Map actualLookup = + LookupUtils.tryConvertObjectMapToLookupConfigMap(validLookupGeneric, mapper); + + Assert.assertEquals(mapper.writeValueAsString(validLookupExpected), mapper.writeValueAsString(actualLookup)); + } + + @Test + public void test_tryConvertObjectMapToLookupConfigMap_allInvalid_emptyMap() + throws IOException + { + mapper.registerSubtypes(NamespaceLookupExtractorFactory.class); + + Map validLookupGeneric = mapper.readValue( + LOOKUP_WITH_KEY_COLUMN_BUT_NO_VALUE_COLUMN, + LOOKUPS_ALL_GENERIC_REFERENCE); + Map actualLookup = + LookupUtils.tryConvertObjectMapToLookupConfigMap(validLookupGeneric, mapper); + + Assert.assertTrue(actualLookup.isEmpty()); + } + + @Test + public void test_tryConvertObjectMapToLookupConfigMap_goodAndBadConfigs_skipsBad() + throws IOException + { + mapper.registerSubtypes(NamespaceLookupExtractorFactory.class); + Map validLookupExpected = mapper.readValue( + LOOKUP_VALID, + LOOKUPS_ALL_REFERENCE); + + Map validLookupGeneric = mapper.readValue( + LOOKSUPS_INVALID_AND_VALID, + LOOKUPS_ALL_GENERIC_REFERENCE); + Map actualLookup = + LookupUtils.tryConvertObjectMapToLookupConfigMap(validLookupGeneric, mapper); + + Assert.assertEquals(mapper.writeValueAsString(validLookupExpected), mapper.writeValueAsString(actualLookup)); + } + + private Injector makeInjector() + { + return Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), + ImmutableList.of( + new Module() + { + @Override + public void configure(Binder binder) + { + JsonConfigProvider.bindInstance( + binder, + Key.get(DruidNode.class, Self.class), + new DruidNode("test-inject", null, false, null, null, true, false) + ); + } + } + ) + ); + } +} diff --git a/server/src/main/java/org/apache/druid/query/lookup/LookupListeningResource.java b/server/src/main/java/org/apache/druid/query/lookup/LookupListeningResource.java index 3256e3e9d5e..01f7d684831 100644 --- a/server/src/main/java/org/apache/druid/query/lookup/LookupListeningResource.java +++ b/server/src/main/java/org/apache/druid/query/lookup/LookupListeningResource.java @@ -46,8 +46,8 @@ class LookupListeningResource extends ListenerResource { private static final Logger LOG = new Logger(LookupListeningResource.class); - private static final TypeReference> LOOKUPS_STATE_TYPE_REFERENCE = - new TypeReference>() + private static final TypeReference> LOOKUPS_STATE_GENERIC_REFERENCE = + new TypeReference>() { }; @@ -68,9 +68,22 @@ class LookupListeningResource extends ListenerResource @Override public Response handleUpdates(InputStream inputStream, ObjectMapper mapper) { + final LookupsState stateGeneric; final LookupsState state; + final Map current; + final Map toLoad; try { - state = mapper.readValue(inputStream, LOOKUPS_STATE_TYPE_REFERENCE); + stateGeneric = mapper.readValue(inputStream, LOOKUPS_STATE_GENERIC_REFERENCE); + current = LookupUtils.tryConvertObjectMapToLookupConfigMap( + stateGeneric.getCurrent(), + mapper + ); + toLoad = LookupUtils.tryConvertObjectMapToLookupConfigMap( + stateGeneric.getToLoad(), + mapper + ); + + state = new LookupsState<>(current, toLoad, stateGeneric.getToDrop()); } catch (final IOException ex) { LOG.debug(ex, "Bad request"); diff --git a/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java b/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java index 8c5872e3ba7..0682a06f352 100644 --- a/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java +++ b/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java @@ -84,8 +84,8 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP { private static final EmittingLogger LOG = new EmittingLogger(LookupReferencesManager.class); - private static final TypeReference> LOOKUPS_ALL_REFERENCE = - new TypeReference>() + private static final TypeReference> LOOKUPS_ALL_GENERIC_REFERENCE = + new TypeReference>() { }; @@ -429,7 +429,8 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP } @Nullable - private Map tryGetLookupListFromCoordinator(String tier) throws Exception + private Map tryGetLookupListFromCoordinator(String tier) + throws IOException, InterruptedException { final StringFullResponseHolder response = fetchLookupsForTier(tier); if (response.getStatus().equals(HttpResponseStatus.NOT_FOUND)) { @@ -454,7 +455,12 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP ); return null; } else { - return jsonMapper.readValue(response.getContent(), LOOKUPS_ALL_REFERENCE); + Map lookupNameToGenericConfig = + jsonMapper.readValue(response.getContent(), LOOKUPS_ALL_GENERIC_REFERENCE); + return LookupUtils.tryConvertObjectMapToLookupConfigMap( + lookupNameToGenericConfig, + jsonMapper + ); } } diff --git a/server/src/main/java/org/apache/druid/query/lookup/LookupUtils.java b/server/src/main/java/org/apache/druid/query/lookup/LookupUtils.java new file mode 100644 index 00000000000..919d9e41546 --- /dev/null +++ b/server/src/main/java/org/apache/druid/query/lookup/LookupUtils.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.lookup; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Maps; +import org.apache.druid.java.util.emitter.EmittingLogger; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Map; + +/** + * Utility class for lookup related things + */ +public class LookupUtils +{ + + private static final EmittingLogger LOG = new EmittingLogger(LookupUtils.class); + + private LookupUtils() + { + + } + + /** + * Takes a map of String to Object, representing lookup name to generic lookup config, and attempts to construct + * a map from String to {@link LookupExtractorFactoryContainer}. Any lookup configs that are not able to be converted + * to {@link LookupExtractorFactoryContainer}, will be logged as warning, and will not be included in the map + * returned. + * + * @param lookupNameToGenericConfig The lookup generic config map. + * @param objectMapper The object mapper to use to convert bytes to {@link LookupExtractorFactoryContainer} + * @return + */ + public static Map tryConvertObjectMapToLookupConfigMap( + Map lookupNameToGenericConfig, + ObjectMapper objectMapper + ) + { + Map lookupNameToConfig = + Maps.newHashMapWithExpectedSize(lookupNameToGenericConfig.size()); + for (Map.Entry lookupNameAndConfig : lookupNameToGenericConfig.entrySet()) { + String lookupName = lookupNameAndConfig.getKey(); + LookupExtractorFactoryContainer lookupConfig = tryConvertObjectToLookupConfig( + lookupName, + lookupNameAndConfig.getValue(), + objectMapper + ); + if (lookupConfig != null) { + lookupNameToConfig.put(lookupName, lookupConfig); + } + + } + return lookupNameToConfig; + } + + @Nullable + private static LookupExtractorFactoryContainer tryConvertObjectToLookupConfig( + String lookupName, + Object o, + ObjectMapper objectMapper) + { + try { + byte[] lookupConfigBytes = objectMapper.writeValueAsBytes(o); + return objectMapper.readValue( + lookupConfigBytes, + LookupExtractorFactoryContainer.class + ); + } + catch (IOException e) { + LOG.warn("Lookup [%s] could not be serialized properly. Please check its configuration. Error: %s", + lookupName, + e.getMessage() + ); + } + return null; + } +} diff --git a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupExtractorFactoryMapContainer.java b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupExtractorFactoryMapContainer.java index 04a5fd781b1..cabbb0b3d1a 100644 --- a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupExtractorFactoryMapContainer.java +++ b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupExtractorFactoryMapContainer.java @@ -28,9 +28,9 @@ import java.util.Map; import java.util.Objects; /** - * This is same as LookupExtractorFactoryContainer except it uses Map instead of - * LookupExtractorFactory for referencing lookup spec so that lookup extensions are not required to - * be loaded at the Coordinator. + * This is same as {@link org.apache.druid.query.lookup.LookupExtractorFactoryContainer } except it uses + * Map instead of LookupExtractorFactory for referencing lookup spec so that lookup extensions are not + * required to be loaded at the Coordinator. */ public class LookupExtractorFactoryMapContainer {