From 3e172d44abef43a25409260b51a797a1cef5f958 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Fri, 11 Nov 2022 21:56:33 +0530 Subject: [PATCH] Bind DurableStorageCleaner only on the Overlord nodes (#13355) --- .../msq/guice/MSQDurableStorageModule.java | 51 ++++++++++++++++--- 1 file changed, 43 insertions(+), 8 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java index df1627ac498..368144091fc 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java @@ -23,10 +23,14 @@ import com.fasterxml.jackson.databind.Module; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Inject; +import com.google.inject.Injector; import com.google.inject.Key; +import com.google.inject.TypeLiteral; import com.google.inject.multibindings.Multibinder; +import org.apache.druid.discovery.NodeRole; import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.LazySingleton; +import org.apache.druid.guice.annotations.Self; import org.apache.druid.indexing.overlord.helpers.OverlordHelper; import org.apache.druid.initialization.DruidModule; import org.apache.druid.msq.indexing.DurableStorageCleaner; @@ -34,8 +38,10 @@ import org.apache.druid.msq.indexing.DurableStorageCleanerConfig; import org.apache.druid.storage.StorageConnector; import org.apache.druid.storage.StorageConnectorProvider; +import javax.annotation.Nullable; import java.util.List; import java.util.Properties; +import java.util.Set; /** * Module for functionality related to durable storage for stage output data. @@ -49,6 +55,7 @@ public class MSQDurableStorageModule implements DruidModule String.join(".", MSQ_INTERMEDIATE_STORAGE_PREFIX, "enable"); private Properties properties; + private Injector injector; @Override public List getJacksonModules() @@ -71,15 +78,18 @@ public class MSQDurableStorageModule implements DruidModule .toProvider(Key.get(StorageConnectorProvider.class, MultiStageQuery.class)) .in(LazySingleton.class); - Multibinder.newSetBinder(binder, OverlordHelper.class) - .addBinding() - .to(DurableStorageCleaner.class); + Set nodeRoles = getNodeRoles(injector); + if (nodeRoles != null && nodeRoles.contains(NodeRole.OVERLORD)) { + JsonConfigProvider.bind( + binder, + String.join(".", MSQ_INTERMEDIATE_STORAGE_PREFIX, "cleaner"), + DurableStorageCleanerConfig.class + ); - JsonConfigProvider.bind( - binder, - String.join(".", MSQ_INTERMEDIATE_STORAGE_PREFIX, "cleaner"), - DurableStorageCleanerConfig.class - ); + Multibinder.newSetBinder(binder, OverlordHelper.class) + .addBinding() + .to(DurableStorageCleaner.class); + } } } @@ -89,6 +99,31 @@ public class MSQDurableStorageModule implements DruidModule this.properties = properties; } + @Inject + public void setInjector(Injector injector) + { + this.injector = injector; + } + + + @Nullable + private static Set getNodeRoles(Injector injector) + { + try { + return injector.getInstance( + Key.get( + new TypeLiteral>() + { + }, + Self.class + ) + ); + } + catch (Exception e) { + return null; + } + } + private boolean isDurableShuffleStorageEnabled() { return Boolean.parseBoolean((String) properties.getOrDefault(MSQ_INTERMEDIATE_STORAGE_ENABLED, "false"));