Bind DurableStorageCleaner only on the Overlord nodes (#13355)

This commit is contained in:
Laksh Singla 2022-11-11 21:56:33 +05:30 committed by GitHub
parent 47dd4ed2e7
commit 3e172d44ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -23,10 +23,14 @@ import com.fasterxml.jackson.databind.Module;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.Multibinder;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexing.overlord.helpers.OverlordHelper;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.msq.indexing.DurableStorageCleaner;
@ -34,8 +38,10 @@ import org.apache.druid.msq.indexing.DurableStorageCleanerConfig;
import org.apache.druid.storage.StorageConnector;
import org.apache.druid.storage.StorageConnectorProvider;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Properties;
import java.util.Set;
/**
* Module for functionality related to durable storage for stage output data.
@ -49,6 +55,7 @@ public class MSQDurableStorageModule implements DruidModule
String.join(".", MSQ_INTERMEDIATE_STORAGE_PREFIX, "enable");
private Properties properties;
private Injector injector;
@Override
public List<? extends Module> getJacksonModules()
@ -71,15 +78,18 @@ public class MSQDurableStorageModule implements DruidModule
.toProvider(Key.get(StorageConnectorProvider.class, MultiStageQuery.class))
.in(LazySingleton.class);
Multibinder.newSetBinder(binder, OverlordHelper.class)
.addBinding()
.to(DurableStorageCleaner.class);
Set<NodeRole> nodeRoles = getNodeRoles(injector);
if (nodeRoles != null && nodeRoles.contains(NodeRole.OVERLORD)) {
JsonConfigProvider.bind(
binder,
String.join(".", MSQ_INTERMEDIATE_STORAGE_PREFIX, "cleaner"),
DurableStorageCleanerConfig.class
);
JsonConfigProvider.bind(
binder,
String.join(".", MSQ_INTERMEDIATE_STORAGE_PREFIX, "cleaner"),
DurableStorageCleanerConfig.class
);
Multibinder.newSetBinder(binder, OverlordHelper.class)
.addBinding()
.to(DurableStorageCleaner.class);
}
}
}
@ -89,6 +99,31 @@ public class MSQDurableStorageModule implements DruidModule
this.properties = properties;
}
@Inject
public void setInjector(Injector injector)
{
this.injector = injector;
}
@Nullable
private static Set<NodeRole> getNodeRoles(Injector injector)
{
try {
return injector.getInstance(
Key.get(
new TypeLiteral<Set<NodeRole>>()
{
},
Self.class
)
);
}
catch (Exception e) {
return null;
}
}
private boolean isDurableShuffleStorageEnabled()
{
return Boolean.parseBoolean((String) properties.getOrDefault(MSQ_INTERMEDIATE_STORAGE_ENABLED, "false"));