From 65cadbe42a23fc4dca0d1f703b755293f54f15c4 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Tue, 7 Dec 2021 00:55:34 -0500 Subject: [PATCH] Fix bad lookup config fails task (#12021) This PR fixes an issue in which if a lookup is configured incorreclty; does not serialize properly when being pulled by peon node, it causes the task to fail. The failure occurs because the peon and other leaf nodes (broker, historical), have retry logic that continues to retry the lookup loading for 3 minutes by default. The http listener thread on the peon task is not started until lookup loading completes, by default, the overlord waits 1 minute by default, to communicate with the peon task to get the task status, after which is orders the task to shut down, causing the ingestion task to fail. To fix the issue, we catch the exception serialization error, and do not retry. Also fixed an issue in which a bad lookup config interferes with any other good lookup configs from being loaded. --- .../druid/query/lookup/LookupUtilsTest.java | 189 ++++++++++++++++++ .../query/lookup/LookupListeningResource.java | 19 +- .../query/lookup/LookupReferencesManager.java | 14 +- .../druid/query/lookup/LookupUtils.java | 96 +++++++++ .../LookupExtractorFactoryMapContainer.java | 6 +- 5 files changed, 314 insertions(+), 10 deletions(-) create mode 100644 extensions-core/lookups-cached-global/src/test/java/org/apache/druid/query/lookup/LookupUtilsTest.java create mode 100644 server/src/main/java/org/apache/druid/query/lookup/LookupUtils.java diff --git a/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/query/lookup/LookupUtilsTest.java b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/query/lookup/LookupUtilsTest.java new file mode 100644 index 00000000000..7ef6ca501e5 --- /dev/null +++ b/extensions-core/lookups-cached-global/src/test/java/org/apache/druid/query/lookup/LookupUtilsTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.lookup; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Module; +import org.apache.druid.guice.GuiceInjectors; +import org.apache.druid.guice.JsonConfigProvider; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.guice.annotations.Self; +import org.apache.druid.initialization.Initialization; +import org.apache.druid.server.DruidNode; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Map; + +public class LookupUtilsTest +{ + private static final TypeReference> LOOKUPS_ALL_GENERIC_REFERENCE = + new TypeReference>() + { + }; + + private static final TypeReference> LOOKUPS_ALL_REFERENCE = + new TypeReference>() + { + }; + + private static final String LOOKUP_VALID_INNER = " \"lookup_uri_good\": {\n" + + " \"version\": \"2021-12-03T01:04:05.151Z\",\n" + + " \"lookupExtractorFactory\": {\n" + + " \"type\": \"cachedNamespace\",\n" + + " \"extractionNamespace\": {\n" + + " \"type\": \"uri\",\n" + + " \"uri\": \"file:///home/lookup_data.json\",\n" + + " \"namespaceParseSpec\": {\n" + + " \"format\": \"simpleJson\"\n" + + " },\n" + + " \"pollPeriod\": \"PT30S\",\n" + + " \"maxHeapPercentage\": 1\n" + + " }\n" + + " }\n" + + " }"; + + + private static final String LOOKUP_VALID = "{\n" + + LOOKUP_VALID_INNER + "\n" + + "}"; + + private static final String LOOKUP_WITH_KEY_COLUMN_BUT_NO_VALUE_COLUMN_INNER = + " \"lookup_keyColumn_but_no_valueColumn\": {\n" + + " \"version\": \"2021-12-03T02:17:01.983Z\",\n" + + " \"lookupExtractorFactory\": {\n" + + " \"type\": \"cachedNamespace\",\n" + + " \"extractionNamespace\": {\n" + + " \"type\": \"uri\",\n" + + " \"fileRegex\": \".*csv\",\n" + + " \"uriPrefix\": \"s3://bucket/path/\",\n" + + " \"namespaceParseSpec\": {\n" + + " \"format\": \"csv\",\n" + + " \"columns\": [\n" + + " \"cluster_id\",\n" + + " \"account_id\",\n" + + " \"manager_host\"\n" + + " ],\n" + + " \"keyColumn\": \"cluster_id\",\n" + + " \"hasHeaderRow\": true,\n" + + " \"skipHeaderRows\": 1\n" + + " },\n" + + " \"pollPeriod\": \"PT30S\"\n" + + " }\n" + + " }\n" + + " }"; + + private static final String LOOKUP_WITH_KEY_COLUMN_BUT_NO_VALUE_COLUMN = + "{\n" + + LOOKUP_WITH_KEY_COLUMN_BUT_NO_VALUE_COLUMN_INNER + "\n" + + "}"; + + private static final String LOOKSUPS_INVALID_AND_VALID = "{\n" + + LOOKUP_WITH_KEY_COLUMN_BUT_NO_VALUE_COLUMN_INNER + ",\n" + + LOOKUP_VALID_INNER + "\n" + + "}"; + private ObjectMapper mapper; + + @Before + public void setup() + { + final Injector injector = makeInjector(); + mapper = injector.getInstance(Key.get(ObjectMapper.class, Json.class)); + mapper.registerSubtypes(NamespaceLookupExtractorFactory.class); + } + + @Test + public void test_tryConvertObjectMapToLookupConfigMap_allValid() throws IOException + { + mapper.registerSubtypes(NamespaceLookupExtractorFactory.class); + Map validLookupExpected = mapper.readValue( + LOOKUP_VALID, + LOOKUPS_ALL_REFERENCE); + + Map validLookupGeneric = mapper.readValue( + LOOKUP_VALID, + LOOKUPS_ALL_GENERIC_REFERENCE); + Map actualLookup = + LookupUtils.tryConvertObjectMapToLookupConfigMap(validLookupGeneric, mapper); + + Assert.assertEquals(mapper.writeValueAsString(validLookupExpected), mapper.writeValueAsString(actualLookup)); + } + + @Test + public void test_tryConvertObjectMapToLookupConfigMap_allInvalid_emptyMap() + throws IOException + { + mapper.registerSubtypes(NamespaceLookupExtractorFactory.class); + + Map validLookupGeneric = mapper.readValue( + LOOKUP_WITH_KEY_COLUMN_BUT_NO_VALUE_COLUMN, + LOOKUPS_ALL_GENERIC_REFERENCE); + Map actualLookup = + LookupUtils.tryConvertObjectMapToLookupConfigMap(validLookupGeneric, mapper); + + Assert.assertTrue(actualLookup.isEmpty()); + } + + @Test + public void test_tryConvertObjectMapToLookupConfigMap_goodAndBadConfigs_skipsBad() + throws IOException + { + mapper.registerSubtypes(NamespaceLookupExtractorFactory.class); + Map validLookupExpected = mapper.readValue( + LOOKUP_VALID, + LOOKUPS_ALL_REFERENCE); + + Map validLookupGeneric = mapper.readValue( + LOOKSUPS_INVALID_AND_VALID, + LOOKUPS_ALL_GENERIC_REFERENCE); + Map actualLookup = + LookupUtils.tryConvertObjectMapToLookupConfigMap(validLookupGeneric, mapper); + + Assert.assertEquals(mapper.writeValueAsString(validLookupExpected), mapper.writeValueAsString(actualLookup)); + } + + private Injector makeInjector() + { + return Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), + ImmutableList.of( + new Module() + { + @Override + public void configure(Binder binder) + { + JsonConfigProvider.bindInstance( + binder, + Key.get(DruidNode.class, Self.class), + new DruidNode("test-inject", null, false, null, null, true, false) + ); + } + } + ) + ); + } +} diff --git a/server/src/main/java/org/apache/druid/query/lookup/LookupListeningResource.java b/server/src/main/java/org/apache/druid/query/lookup/LookupListeningResource.java index 3256e3e9d5e..01f7d684831 100644 --- a/server/src/main/java/org/apache/druid/query/lookup/LookupListeningResource.java +++ b/server/src/main/java/org/apache/druid/query/lookup/LookupListeningResource.java @@ -46,8 +46,8 @@ class LookupListeningResource extends ListenerResource { private static final Logger LOG = new Logger(LookupListeningResource.class); - private static final TypeReference> LOOKUPS_STATE_TYPE_REFERENCE = - new TypeReference>() + private static final TypeReference> LOOKUPS_STATE_GENERIC_REFERENCE = + new TypeReference>() { }; @@ -68,9 +68,22 @@ class LookupListeningResource extends ListenerResource @Override public Response handleUpdates(InputStream inputStream, ObjectMapper mapper) { + final LookupsState stateGeneric; final LookupsState state; + final Map current; + final Map toLoad; try { - state = mapper.readValue(inputStream, LOOKUPS_STATE_TYPE_REFERENCE); + stateGeneric = mapper.readValue(inputStream, LOOKUPS_STATE_GENERIC_REFERENCE); + current = LookupUtils.tryConvertObjectMapToLookupConfigMap( + stateGeneric.getCurrent(), + mapper + ); + toLoad = LookupUtils.tryConvertObjectMapToLookupConfigMap( + stateGeneric.getToLoad(), + mapper + ); + + state = new LookupsState<>(current, toLoad, stateGeneric.getToDrop()); } catch (final IOException ex) { LOG.debug(ex, "Bad request"); diff --git a/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java b/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java index 8c5872e3ba7..0682a06f352 100644 --- a/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java +++ b/server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java @@ -84,8 +84,8 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP { private static final EmittingLogger LOG = new EmittingLogger(LookupReferencesManager.class); - private static final TypeReference> LOOKUPS_ALL_REFERENCE = - new TypeReference>() + private static final TypeReference> LOOKUPS_ALL_GENERIC_REFERENCE = + new TypeReference>() { }; @@ -429,7 +429,8 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP } @Nullable - private Map tryGetLookupListFromCoordinator(String tier) throws Exception + private Map tryGetLookupListFromCoordinator(String tier) + throws IOException, InterruptedException { final StringFullResponseHolder response = fetchLookupsForTier(tier); if (response.getStatus().equals(HttpResponseStatus.NOT_FOUND)) { @@ -454,7 +455,12 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP ); return null; } else { - return jsonMapper.readValue(response.getContent(), LOOKUPS_ALL_REFERENCE); + Map lookupNameToGenericConfig = + jsonMapper.readValue(response.getContent(), LOOKUPS_ALL_GENERIC_REFERENCE); + return LookupUtils.tryConvertObjectMapToLookupConfigMap( + lookupNameToGenericConfig, + jsonMapper + ); } } diff --git a/server/src/main/java/org/apache/druid/query/lookup/LookupUtils.java b/server/src/main/java/org/apache/druid/query/lookup/LookupUtils.java new file mode 100644 index 00000000000..919d9e41546 --- /dev/null +++ b/server/src/main/java/org/apache/druid/query/lookup/LookupUtils.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.lookup; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Maps; +import org.apache.druid.java.util.emitter.EmittingLogger; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Map; + +/** + * Utility class for lookup related things + */ +public class LookupUtils +{ + + private static final EmittingLogger LOG = new EmittingLogger(LookupUtils.class); + + private LookupUtils() + { + + } + + /** + * Takes a map of String to Object, representing lookup name to generic lookup config, and attempts to construct + * a map from String to {@link LookupExtractorFactoryContainer}. Any lookup configs that are not able to be converted + * to {@link LookupExtractorFactoryContainer}, will be logged as warning, and will not be included in the map + * returned. + * + * @param lookupNameToGenericConfig The lookup generic config map. + * @param objectMapper The object mapper to use to convert bytes to {@link LookupExtractorFactoryContainer} + * @return + */ + public static Map tryConvertObjectMapToLookupConfigMap( + Map lookupNameToGenericConfig, + ObjectMapper objectMapper + ) + { + Map lookupNameToConfig = + Maps.newHashMapWithExpectedSize(lookupNameToGenericConfig.size()); + for (Map.Entry lookupNameAndConfig : lookupNameToGenericConfig.entrySet()) { + String lookupName = lookupNameAndConfig.getKey(); + LookupExtractorFactoryContainer lookupConfig = tryConvertObjectToLookupConfig( + lookupName, + lookupNameAndConfig.getValue(), + objectMapper + ); + if (lookupConfig != null) { + lookupNameToConfig.put(lookupName, lookupConfig); + } + + } + return lookupNameToConfig; + } + + @Nullable + private static LookupExtractorFactoryContainer tryConvertObjectToLookupConfig( + String lookupName, + Object o, + ObjectMapper objectMapper) + { + try { + byte[] lookupConfigBytes = objectMapper.writeValueAsBytes(o); + return objectMapper.readValue( + lookupConfigBytes, + LookupExtractorFactoryContainer.class + ); + } + catch (IOException e) { + LOG.warn("Lookup [%s] could not be serialized properly. Please check its configuration. Error: %s", + lookupName, + e.getMessage() + ); + } + return null; + } +} diff --git a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupExtractorFactoryMapContainer.java b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupExtractorFactoryMapContainer.java index 04a5fd781b1..cabbb0b3d1a 100644 --- a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupExtractorFactoryMapContainer.java +++ b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupExtractorFactoryMapContainer.java @@ -28,9 +28,9 @@ import java.util.Map; import java.util.Objects; /** - * This is same as LookupExtractorFactoryContainer except it uses Map instead of - * LookupExtractorFactory for referencing lookup spec so that lookup extensions are not required to - * be loaded at the Coordinator. + * This is same as {@link org.apache.druid.query.lookup.LookupExtractorFactoryContainer } except it uses + * Map instead of LookupExtractorFactory for referencing lookup spec so that lookup extensions are not + * required to be loaded at the Coordinator. */ public class LookupExtractorFactoryMapContainer {