Add support for selective loading of lookups in the task layer (#16328)

Changes:
- Add `LookupLoadingSpec` to support 3 modes of lookup loading: ALL, NONE, ONLY_REQUIRED
- Add method `Task.getLookupLoadingSpec()`
- Do not load any lookups for `KillUnusedSegmentsTask`
This commit is contained in:
Akshat Jain 2024-04-29 07:19:59 +05:30 committed by GitHub
parent 9aef8e02ef
commit 9d2cae40c3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 328 additions and 5 deletions

View File

@ -43,6 +43,7 @@ import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CollectionUtils;
@ -339,4 +340,10 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
);
return taskLockMap;
}
/**
 * Kill tasks request that no lookups be loaded.
 *
 * @return {@link LookupLoadingSpec#NONE}
 */
@Override
public LookupLoadingSpec getLookupLoadingSpec()
{
return LookupLoadingSpec.NONE;
}
}

View File

@ -41,11 +41,13 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@ -331,4 +333,10 @@ public interface Task
taskInfo.getTask().getMetadata()
);
}
/**
 * Specifies which lookups should be loaded for this task. The default implementation
 * returns {@link LookupLoadingSpec#ALL}, i.e. all lookups are loaded unless a task overrides this method.
 *
 * NOTE(review): this method is annotated {@code @Nullable}, but LookupReferencesManager calls
 * {@code getMode()} on the spec it receives without a null check -- confirm whether implementations
 * are really allowed to return null.
 *
 * @return the lookup loading spec for this task
 */
@Nullable
default LookupLoadingSpec getLookupLoadingSpec()
{
return LookupLoadingSpec.ALL;
}
}

View File

@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.timeline.DataSegment;
import org.assertj.core.api.Assertions;
import org.hamcrest.MatcherAssert;
@ -423,6 +424,17 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
Assert.assertTrue(task.getInputSourceResources().isEmpty());
}
@Test
public void testGetLookupsToLoad()
{
  // A kill task is expected to report a spec whose mode is NONE.
  final LookupLoadingSpec.Mode actualMode = new KillUnusedSegmentsTaskBuilder()
      .dataSource(DATA_SOURCE)
      .interval(Intervals.of("2019-03-01/2019-04-01"))
      .markAsUnused(true)
      .build()
      .getLookupLoadingSpec()
      .getMode();

  Assert.assertEquals(LookupLoadingSpec.Mode.NONE, actualMode);
}
@Test
public void testKillBatchSizeOneAndLimit4() throws Exception
{

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
class LookupListeningAnnouncerConfig
@ -57,4 +58,9 @@ class LookupListeningAnnouncerConfig
lookupTierIsDatasource ? "bound value" : LookupModule.PROPERTY_BASE
);
}
/**
 * Returns the lookup loading spec held by the {@code dataSourceTaskIdHolder}.
 *
 * @return the value of {@link DataSourceTaskIdHolder#getLookupLoadingSpec()}
 */
public LookupLoadingSpec getLookupLoadingSpec()
{
return dataSourceTaskIdHolder.getLookupLoadingSpec();
}
}

View File

@ -45,6 +45,8 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
@ -70,6 +72,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* This class provide a basic {@link LookupExtractorFactory} references manager. It allows basic operations fetching,
@ -167,7 +170,7 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP
if (!Strings.isNullOrEmpty(lookupConfig.getSnapshotWorkingDir())) {
FileUtils.mkdirp(new File(lookupConfig.getSnapshotWorkingDir()));
}
loadAllLookupsAndInitStateRef();
loadLookupsAndInitStateRef();
if (!testMode) {
mainThread = Execs.makeThread(
"LookupExtractorFactoryContainerProvider-MainThread",
@ -373,10 +376,26 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP
}
}
private void loadAllLookupsAndInitStateRef()
/**
* Load a set of lookups based on the injected value in {@link DataSourceTaskIdHolder#getLookupLoadingSpec()}.
*/
private void loadLookupsAndInitStateRef()
{
List<LookupBean> lookupBeanList = getLookupsList();
if (lookupBeanList != null) {
LookupLoadingSpec lookupLoadingSpec = lookupListeningAnnouncerConfig.getLookupLoadingSpec();
LOG.info("Loading lookups using spec[%s].", lookupLoadingSpec);
List<LookupBean> lookupBeanList;
if (lookupLoadingSpec.getMode() == LookupLoadingSpec.Mode.NONE) {
lookupBeanList = Collections.emptyList();
} else {
lookupBeanList = getLookupsList();
if (lookupLoadingSpec.getMode() == LookupLoadingSpec.Mode.ONLY_REQUIRED && lookupBeanList != null) {
lookupBeanList = lookupBeanList.stream()
.filter(lookupBean -> lookupLoadingSpec.getLookupsToLoad().contains(lookupBean.getName()))
.collect(Collectors.toList());
}
}
if (lookupBeanList != null && !lookupBeanList.isEmpty()) {
startLookups(lookupBeanList);
} else {
LOG.debug("No lookups to be loaded at this point.");

View File

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.lookup.cache;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.error.InvalidInput;
import java.util.Set;
/**
 * This class defines the spec for loading of lookups for a given task. It contains 2 fields:
 * <ol>
 *   <li>{@link LookupLoadingSpec#mode}: This mode defines whether lookups need to be
 *   loaded for the given task, or not. It can take 3 values:
 *     <ul>
 *       <li>ALL: Load all the lookups.</li>
 *       <li>NONE: Load no lookups.</li>
 *       <li>ONLY_REQUIRED: Load only the lookups defined in lookupsToLoad.</li>
 *     </ul>
 *   </li>
 *   <li>{@link LookupLoadingSpec#lookupsToLoad}: Defines the lookups to load when the mode is set to
 *   ONLY_REQUIRED.</li>
 * </ol>
 * Instances are immutable and compare by value: two specs are equal when they have the same mode and the
 * same set of lookup names.
 */
public class LookupLoadingSpec
{
  public enum Mode
  {
    ALL, NONE, ONLY_REQUIRED
  }

  private final Mode mode;
  private final ImmutableSet<String> lookupsToLoad;

  public static final LookupLoadingSpec ALL = new LookupLoadingSpec(Mode.ALL, null);
  public static final LookupLoadingSpec NONE = new LookupLoadingSpec(Mode.NONE, null);

  private LookupLoadingSpec(Mode mode, Set<String> lookupsToLoad)
  {
    this.mode = mode;
    // Snapshot the caller's set so later mutation of it cannot change this spec.
    this.lookupsToLoad = lookupsToLoad == null ? null : ImmutableSet.copyOf(lookupsToLoad);
  }

  /**
   * Creates a LookupLoadingSpec which loads only the lookups present in the given set.
   *
   * @param lookupsToLoad names of the lookups to load; must not be null
   * @return a spec with mode ONLY_REQUIRED holding an immutable copy of the given names
   * @throws org.apache.druid.error.DruidException if {@code lookupsToLoad} is null
   */
  public static LookupLoadingSpec loadOnly(Set<String> lookupsToLoad)
  {
    if (lookupsToLoad == null) {
      throw InvalidInput.exception("Expected non-null set of lookups to load.");
    }
    return new LookupLoadingSpec(Mode.ONLY_REQUIRED, lookupsToLoad);
  }

  /**
   * @return the loading mode of this spec
   */
  public Mode getMode()
  {
    return mode;
  }

  /**
   * @return A non-null immutable set of lookup names when {@link LookupLoadingSpec#mode} is ONLY_REQUIRED, null otherwise.
   */
  public ImmutableSet<String> getLookupsToLoad()
  {
    return lookupsToLoad;
  }

  /**
   * Value equality: same mode and same set of lookup names (both sets may be null for ALL and NONE).
   */
  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    final LookupLoadingSpec that = (LookupLoadingSpec) o;
    if (mode != that.mode) {
      return false;
    }
    return lookupsToLoad == null ? that.lookupsToLoad == null : lookupsToLoad.equals(that.lookupsToLoad);
  }

  @Override
  public int hashCode()
  {
    int result = mode == null ? 0 : mode.hashCode();
    result = 31 * result + (lookupsToLoad == null ? 0 : lookupsToLoad.hashCode());
    return result;
  }

  @Override
  public String toString()
  {
    return "LookupLoadingSpec{" +
           "mode=" + mode +
           ", lookupsToLoad=" + lookupsToLoad +
           '}';
  }
}

View File

@ -21,11 +21,15 @@ package org.apache.druid.server.metrics;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import javax.annotation.Nullable;
public class DataSourceTaskIdHolder
{
public static final String DATA_SOURCE_BINDING = "druidDataSource";
public static final String TASK_ID_BINDING = "druidTaskId";
public static final String LOOKUPS_TO_LOAD_FOR_TASK = "lookupsToLoadForTask";
@Named(DATA_SOURCE_BINDING)
@Inject(optional = true)
String dataSource = null;
@ -33,6 +37,11 @@ public class DataSourceTaskIdHolder
@Inject(optional = true)
String taskId = null;
@Nullable
@Named(LOOKUPS_TO_LOAD_FOR_TASK)
@Inject(optional = true)
LookupLoadingSpec lookupLoadingSpec = LookupLoadingSpec.ALL;
public String getDataSource()
{
return dataSource;
@ -42,4 +51,9 @@ public class DataSourceTaskIdHolder
{
return taskId;
}
/**
 * Returns the lookup loading spec held by this holder. The backing field is optionally injected under the
 * name {@link #LOOKUPS_TO_LOAD_FOR_TASK} and is initialized to {@link LookupLoadingSpec#ALL}.
 *
 * NOTE(review): the backing field is annotated {@code @Nullable} -- confirm whether callers must
 * handle a null return.
 */
public LookupLoadingSpec getLookupLoadingSpec()
{
return lookupLoadingSpec;
}
}

View File

@ -24,6 +24,7 @@ import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.TypeLiteral;
import com.google.inject.name.Names;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.guice.JsonConfigProvider;
@ -31,11 +32,14 @@ import org.apache.druid.guice.JsonConfigurator;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.initialization.Initialization;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
public class LookupListeningAnnouncerConfigTest
@ -57,6 +61,11 @@ public class LookupListeningAnnouncerConfigTest
binder
.bind(Key.get(String.class, Names.named(DataSourceTaskIdHolder.DATA_SOURCE_BINDING)))
.toInstance("some_datasource");
final List<String> lookupsToLoad = Arrays.asList("lookupName1", "lookupName2");
binder.bind(new TypeLiteral<List<String>>() {})
.annotatedWith(Names.named(DataSourceTaskIdHolder.LOOKUPS_TO_LOAD_FOR_TASK))
.toInstance(lookupsToLoad);
}
},
new LookupModule()
@ -127,6 +136,14 @@ public class LookupListeningAnnouncerConfigTest
Assert.assertEquals("some_datasource", config.getLookupTier());
}
@Test
public void testLookupsToLoadInjection()
{
  // NOTE(review): the test module binds a List<String> under LOOKUPS_TO_LOAD_FOR_TASK, while the holder's
  // field is a LookupLoadingSpec. Presumably the optional injection is therefore never satisfied and this
  // assertion only observes the field's default (ALL) -- confirm that this is the intended coverage.
  final DataSourceTaskIdHolder holder = new DataSourceTaskIdHolder();
  injector.injectMembers(holder);

  final LookupLoadingSpec observedSpec = holder.getLookupLoadingSpec();
  Assert.assertEquals(LookupLoadingSpec.Mode.ALL, observedSpec.getMode());
}
@Test(expected = IllegalArgumentException.class)
public void testFailsInjection()
{

View File

@ -27,6 +27,7 @@ import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.easymock.EasyMock;
import org.jboss.netty.buffer.BigEndianHeapChannelBuffer;
@ -51,6 +52,7 @@ import java.util.concurrent.TimeoutException;
public class LookupReferencesManagerTest
{
private static final String LOOKUP_TIER = "lookupTier";
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
LookupReferencesManager lookupReferencesManager;
@ -68,6 +70,7 @@ public class LookupReferencesManagerTest
druidLeaderClient = EasyMock.createMock(DruidLeaderClient.class);
config = EasyMock.createMock(LookupListeningAnnouncerConfig.class);
EasyMock.expect(config.getLookupLoadingSpec()).andReturn(LookupLoadingSpec.ALL).anyTimes();
lookupExtractorFactory = new MapLookupExtractorFactory(
ImmutableMap.of(
@ -765,6 +768,80 @@ public class LookupReferencesManagerTest
}
/**
 * Test helper for the selective-loading tests. It re-programs the {@code config} and
 * {@code druidLeaderClient} mocks so that the coordinator request returns three lookups
 * ("testLookup1", "testLookup2", "testLookup3"), has the config report the given loading spec,
 * and then starts {@code lookupReferencesManager}.
 *
 * @param lookupLoadingSpec the spec the mocked config returns from {@code getLookupLoadingSpec()}
 * @return the full map of lookups contained in the mocked coordinator response, keyed by lookup name
 */
private Map<String, LookupExtractorFactoryContainer> getLookupMapForSelectiveLoadingOfLookups(LookupLoadingSpec lookupLoadingSpec)
throws Exception
{
// Three distinct single-entry map lookups, all at version "0".
LookupExtractorFactoryContainer container1 = new LookupExtractorFactoryContainer(
"0",
new MapLookupExtractorFactory(ImmutableMap.of("key1", "value1"), true)
);
LookupExtractorFactoryContainer container2 = new LookupExtractorFactoryContainer(
"0",
new MapLookupExtractorFactory(ImmutableMap.of("key2", "value2"), true
)
);
LookupExtractorFactoryContainer container3 = new LookupExtractorFactoryContainer(
"0",
new MapLookupExtractorFactory(ImmutableMap.of("key3", "value3"), true
)
);
// Discard expectations recorded earlier on the mocks before recording new ones.
EasyMock.reset(config);
EasyMock.reset(druidLeaderClient);
Map<String, LookupExtractorFactoryContainer> lookupMap = new HashMap<>();
lookupMap.put("testLookup1", container1);
lookupMap.put("testLookup2", container2);
lookupMap.put("testLookup3", container3);
// Serialized form of the map is used as the body of the mocked coordinator response.
String strResult = mapper.writeValueAsString(lookupMap);
Request request = new Request(HttpMethod.GET, new URL("http://localhost:1234/xx"));
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER);
EasyMock.expect(config.getLookupLoadingSpec()).andReturn(lookupLoadingSpec);
EasyMock.replay(config);
EasyMock.expect(druidLeaderClient.makeRequest(
HttpMethod.GET,
"/druid/coordinator/v1/lookups/config/lookupTier?detailed=true"
))
.andReturn(request);
StringFullResponseHolder responseHolder = new StringFullResponseHolder(
newEmptyResponse(HttpResponseStatus.OK),
StandardCharsets.UTF_8
).addChunk(strResult);
EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
EasyMock.replay(druidLeaderClient);
// Starting the manager is what triggers the lookup loading under test.
lookupReferencesManager.start();
return lookupMap;
}
@Test
public void testCoordinatorLoadAllLookups() throws Exception
{
  // With the ALL spec, every lookup in the mocked coordinator response must be available.
  final Map<String, LookupExtractorFactoryContainer> servedLookups =
      getLookupMapForSelectiveLoadingOfLookups(LookupLoadingSpec.ALL);

  for (Map.Entry<String, LookupExtractorFactoryContainer> served : servedLookups.entrySet()) {
    Assert.assertEquals(Optional.of(served.getValue()), lookupReferencesManager.get(served.getKey()));
  }
}
@Test
public void testCoordinatorLoadNoLookups() throws Exception
{
  // With the NONE spec, none of the lookups in the mocked coordinator response may be available.
  for (String servedName : getLookupMapForSelectiveLoadingOfLookups(LookupLoadingSpec.NONE).keySet()) {
    Assert.assertFalse(lookupReferencesManager.get(servedName).isPresent());
  }
}
@Test
public void testCoordinatorLoadSubsetOfLookups() throws Exception
{
  // Only the two requested lookups should be available; the third one served by the coordinator is filtered out.
  final ImmutableSet<String> requested = ImmutableSet.of("testLookup1", "testLookup2");
  final Map<String, LookupExtractorFactoryContainer> servedLookups =
      getLookupMapForSelectiveLoadingOfLookups(LookupLoadingSpec.loadOnly(requested));

  for (String requestedName : requested) {
    Assert.assertEquals(Optional.of(servedLookups.get(requestedName)), lookupReferencesManager.get(requestedName));
  }
  Assert.assertFalse(lookupReferencesManager.get("testLookup3").isPresent());
}
@Test
public void testLoadLookupOnCoordinatorFailure() throws Exception
{
@ -818,6 +895,7 @@ public class LookupReferencesManagerTest
EasyMock.reset(config);
EasyMock.reset(druidLeaderClient);
EasyMock.expect(config.getLookupTier()).andReturn(LOOKUP_TIER).anyTimes();
EasyMock.expect(config.getLookupLoadingSpec()).andReturn(LookupLoadingSpec.ALL).anyTimes();
EasyMock.replay(config);
EasyMock.expect(druidLeaderClient.makeRequest(
HttpMethod.GET,

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.lookup.cache;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.error.DruidException;
import org.junit.Assert;
import org.junit.Test;
import java.util.Set;
public class LookupLoadingSpecTest
{
  /**
   * The ALL constant reports mode ALL and carries no set of lookup names.
   */
  @Test
  public void testLoadingAllLookups()
  {
    Assert.assertEquals(LookupLoadingSpec.Mode.ALL, LookupLoadingSpec.ALL.getMode());
    Assert.assertNull(LookupLoadingSpec.ALL.getLookupsToLoad());
  }

  /**
   * The NONE constant reports mode NONE and carries no set of lookup names.
   */
  @Test
  public void testLoadingNoLookups()
  {
    Assert.assertEquals(LookupLoadingSpec.Mode.NONE, LookupLoadingSpec.NONE.getMode());
    Assert.assertNull(LookupLoadingSpec.NONE.getLookupsToLoad());
  }

  /**
   * A spec built through loadOnly reports mode ONLY_REQUIRED and returns the names it was given.
   */
  @Test
  public void testLoadingOnlyRequiredLookups()
  {
    final Set<String> expectedNames = ImmutableSet.of("lookupName1", "lookupName2");
    final LookupLoadingSpec onlyRequired = LookupLoadingSpec.loadOnly(expectedNames);

    Assert.assertEquals(LookupLoadingSpec.Mode.ONLY_REQUIRED, onlyRequired.getMode());
    Assert.assertEquals(expectedNames, onlyRequired.getLookupsToLoad());
  }

  /**
   * Passing null to loadOnly is rejected with a DruidException carrying a fixed message.
   */
  @Test
  public void testLoadingOnlyRequiredLookupsWithNullList()
  {
    final DruidException thrown = Assert.assertThrows(
        DruidException.class,
        () -> LookupLoadingSpec.loadOnly(null)
    );
    Assert.assertEquals("Expected non-null set of lookups to load.", thrown.getMessage());
  }
}

View File

@ -131,6 +131,7 @@ import org.apache.druid.server.http.HistoricalResource;
import org.apache.druid.server.http.SegmentListerResource;
import org.apache.druid.server.initialization.jetty.ChatHandlerServerModule;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
import org.apache.druid.server.metrics.ServiceStatusMonitor;
import org.apache.druid.tasklogs.TaskPayloadManager;
@ -332,6 +333,14 @@ public class CliPeon extends GuiceRunnable
{
return task.getId();
}
/**
 * Provides the {@link LookupLoadingSpec} of the given task, bound under the name
 * {@link DataSourceTaskIdHolder#LOOKUPS_TO_LOAD_FOR_TASK} -- the same name that
 * {@link DataSourceTaskIdHolder} uses for its optionally injected lookup loading spec.
 *
 * @param task the task whose lookup loading spec is exposed
 * @return the value of {@code task.getLookupLoadingSpec()}
 */
@Provides
@LazySingleton
@Named(DataSourceTaskIdHolder.LOOKUPS_TO_LOAD_FOR_TASK)
public LookupLoadingSpec getLookupsToLoad(final Task task)
{
return task.getLookupLoadingSpec();
}
},
new QueryablePeonModule(),
new IndexingServiceFirehoseModule(),

View File

@ -42,7 +42,7 @@ import java.util.Set;
* Walks an {@link SqlNode} to collect a set of {@link Resource} for {@link ResourceType#DATASOURCE} and
* {@link ResourceType#VIEW} to use for authorization during query planning.
*
* It works by looking for {@link SqlIdentifier} which corespond to a {@link IdentifierNamespace}, where
* It works by looking for {@link SqlIdentifier} which correspond to a {@link IdentifierNamespace}, where
* {@link SqlValidatorNamespace} is calcite-speak for sources of data and {@link IdentifierNamespace} specifically are
* namespaces which are identified by a single variable, e.g. table names.
*/