move disco

Zoltan Haindrich 2024-05-16 09:50:10 +00:00
parent cab3d945be
commit 27735f2621
3 changed files with 289 additions and 134 deletions

DiscovertModule.java

@@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.quidem;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.discovery.DruidNodeDiscovery.Listener;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.server.DruidNode;
import org.apache.druid.sql.calcite.run.NativeSqlEngine;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.schema.BrokerSegmentMetadataCache;
import org.apache.druid.sql.calcite.util.CalciteTests;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.BooleanSupplier;

public class DiscovertModule extends AbstractModule {
  DiscovertModule() {
  }

  @Override
  protected void configure()
  {
    // builder.addModule(propOverrideModuel());
  }

  @Provides
  @LazySingleton
  public BrokerSegmentMetadataCache provideCache() {
    return null;
  }

  @Provides
  @LazySingleton
  public Properties getProps() {
    Properties localProps = new Properties();
    localProps.put("druid.enableTlsPort", "false");
    localProps.put("druid.zk.service.enabled", "false");
    localProps.put("druid.plaintextPort", "12345");
    localProps.put("druid.host", "localhost");
    return localProps;
  }

  @Provides
  @LazySingleton
  public SqlEngine createMockSqlEngine(
      final QuerySegmentWalker walker,
      final QueryRunnerFactoryConglomerate conglomerate,
      @Json ObjectMapper jsonMapper)
  {
    return new NativeSqlEngine(CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), jsonMapper);
  }

  @Provides
  @LazySingleton
  DruidNodeDiscoveryProvider getProvider() {
    final DruidNode coordinatorNode = new DruidNode("test-coordinator", "dummy", false, 8081, null, true, false);
    DiscovertModule.FakeDruidNodeDiscoveryProvider provider = new FakeDruidNodeDiscoveryProvider(
        ImmutableMap.of(
            NodeRole.COORDINATOR, new FakeDruidNodeDiscovery(ImmutableMap.of(NodeRole.COORDINATOR, coordinatorNode))
        )
    );
    return provider;
  }

  /**
   * A fake {@link DruidNodeDiscoveryProvider} for {@link #createMockSystemSchema}.
   */
  private static class FakeDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider
  {
    private final Map<NodeRole, DiscovertModule.FakeDruidNodeDiscovery> nodeDiscoveries;

    public FakeDruidNodeDiscoveryProvider(Map<NodeRole, DiscovertModule.FakeDruidNodeDiscovery> nodeDiscoveries)
    {
      this.nodeDiscoveries = nodeDiscoveries;
    }

    @Override
    public BooleanSupplier getForNode(DruidNode node, NodeRole nodeRole)
    {
      boolean get = nodeDiscoveries.getOrDefault(nodeRole, new FakeDruidNodeDiscovery())
          .getAllNodes()
          .stream()
          .anyMatch(x -> x.getDruidNode().equals(node));
      return () -> get;
    }

    @Override
    public DruidNodeDiscovery getForNodeRole(NodeRole nodeRole)
    {
      return nodeDiscoveries.getOrDefault(nodeRole, new FakeDruidNodeDiscovery());
    }
  }

  private static class FakeDruidNodeDiscovery implements DruidNodeDiscovery
  {
    private final Set<DiscoveryDruidNode> nodes;

    FakeDruidNodeDiscovery()
    {
      this.nodes = new HashSet<>();
    }

    FakeDruidNodeDiscovery(Map<NodeRole, DruidNode> nodes)
    {
      this.nodes = Sets.newHashSetWithExpectedSize(nodes.size());
      nodes.forEach((k, v) -> {
        addNode(v, k);
      });
    }

    @Override
    public Collection<DiscoveryDruidNode> getAllNodes()
    {
      return nodes;
    }

    void addNode(DruidNode node, NodeRole role)
    {
      final DiscoveryDruidNode discoveryNode = new DiscoveryDruidNode(node, role, ImmutableMap.of());
      this.nodes.add(discoveryNode);
    }

    @Override
    public void registerListener(Listener listener)
    {
    }
  }
}
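
A minimal usage sketch (hypothetical, not part of this commit) of how this module's bindings could be exercised on their own. The harness class name is invented; it assumes the Druid test classpath and installs DruidGuiceExtensions so the @LazySingleton scope used by the @Provides methods is bound.

package org.apache.druid.quidem;

import com.google.inject.Guice;
import com.google.inject.Injector;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.DruidGuiceExtensions;

import java.util.Properties;

// Hypothetical harness, not part of this commit: exercises the module's bindings directly.
public class DiscovertModuleSmokeTest
{
  public static void main(String[] args)
  {
    // DruidGuiceExtensions binds the @LazySingleton scope used by the @Provides methods.
    Injector injector = Guice.createInjector(new DruidGuiceExtensions(), new DiscovertModule());

    // The Properties binding carries the fake broker settings (plaintext port 12345, no TLS, no ZK).
    Properties props = injector.getInstance(Properties.class);
    System.out.println(props.getProperty("druid.host"));  // localhost

    // The discovery provider only knows about the single fake coordinator node.
    DruidNodeDiscoveryProvider discovery = injector.getInstance(DruidNodeDiscoveryProvider.class);
    System.out.println(discovery.getForNodeRole(NodeRole.COORDINATOR).getAllNodes().size());  // 1
  }
}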

Launcher.java

@@ -24,8 +24,6 @@ import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.inject.AbstractModule;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
@@ -38,10 +36,6 @@ import org.apache.calcite.avatica.server.AbstractAvaticaHandler;
import org.apache.druid.cli.CliBroker2;
import org.apache.druid.curator.CuratorModule;
import org.apache.druid.curator.discovery.DiscoveryModule;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.AnnouncerModule;
import org.apache.druid.guice.BrokerProcessingModule;
import org.apache.druid.guice.BrokerServiceModule;
@@ -67,7 +61,6 @@ import org.apache.druid.guice.StartupLoggingModule;
import org.apache.druid.guice.StorageNodeModule;
import org.apache.druid.guice.annotations.Client;
import org.apache.druid.guice.annotations.EscalatedClient;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.http.HttpClientModule;
import org.apache.druid.guice.security.AuthenticatorModule;
import org.apache.druid.guice.security.AuthorizerModule;
@@ -83,7 +76,6 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.metadata.storage.derby.DerbyMetadataStorageDruidModule;
import org.apache.druid.query.DefaultQueryConfig;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.rpc.guice.ServiceClientModule;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
@@ -112,9 +104,7 @@ import org.apache.druid.sql.calcite.SqlTestFrameworkConfig.SqlTestFrameworkConfi
import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.run.NativeSqlEngine;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.schema.BrokerSegmentMetadataCache;
import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog;
import org.apache.druid.sql.calcite.schema.DruidSchemaName;
import org.apache.druid.sql.calcite.util.CacheTestHelperModule;
@@ -141,14 +131,10 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.BooleanSupplier;
public class Launcher
{
@@ -457,126 +443,6 @@ public class Launcher
    return m;
  }

  public static class DiscovertModule extends AbstractModule {
    DiscovertModule() {
    }

    @Override
    protected void configure()
    {
      // builder.addModule(propOverrideModuel());
    }

    @Provides
    @LazySingleton
    public BrokerSegmentMetadataCache provideCache() {
      return null;
    }

    @Provides
    @LazySingleton
    public Properties getProps() {
      Properties localProps = new Properties();
      localProps.put("druid.enableTlsPort", "false");
      localProps.put("druid.zk.service.enabled", "false");
      localProps.put("druid.plaintextPort", "12345");
      localProps.put("druid.host", "localhost");
      return localProps;
    }

    @Provides
    @LazySingleton
    public SqlEngine createMockSqlEngine(
        final QuerySegmentWalker walker,
        final QueryRunnerFactoryConglomerate conglomerate,
        @Json ObjectMapper jsonMapper)
    {
      return new NativeSqlEngine(CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), jsonMapper);
    }

    @Provides
    @LazySingleton
    DruidNodeDiscoveryProvider getProvider() {
      final DruidNode coordinatorNode = new DruidNode("test-coordinator", "dummy", false, 8081, null, true, false);
      FakeDruidNodeDiscoveryProvider provider = new FakeDruidNodeDiscoveryProvider(
          ImmutableMap.of(
              NodeRole.COORDINATOR, new FakeDruidNodeDiscovery(ImmutableMap.of(NodeRole.COORDINATOR, coordinatorNode))
          )
      );
      return provider;
    }

    /**
     * A fake {@link DruidNodeDiscoveryProvider} for {@link #createMockSystemSchema}.
     */
    private static class FakeDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider
    {
      private final Map<NodeRole, FakeDruidNodeDiscovery> nodeDiscoveries;

      public FakeDruidNodeDiscoveryProvider(Map<NodeRole, FakeDruidNodeDiscovery> nodeDiscoveries)
      {
        this.nodeDiscoveries = nodeDiscoveries;
      }

      @Override
      public BooleanSupplier getForNode(DruidNode node, NodeRole nodeRole)
      {
        boolean get = nodeDiscoveries.getOrDefault(nodeRole, new FakeDruidNodeDiscovery())
            .getAllNodes()
            .stream()
            .anyMatch(x -> x.getDruidNode().equals(node));
        return () -> get;
      }

      @Override
      public DruidNodeDiscovery getForNodeRole(NodeRole nodeRole)
      {
        return nodeDiscoveries.getOrDefault(nodeRole, new FakeDruidNodeDiscovery());
      }
    }

    private static class FakeDruidNodeDiscovery implements DruidNodeDiscovery
    {
      private final Set<DiscoveryDruidNode> nodes;

      FakeDruidNodeDiscovery()
      {
        this.nodes = new HashSet<>();
      }

      FakeDruidNodeDiscovery(Map<NodeRole, DruidNode> nodes)
      {
        this.nodes = Sets.newHashSetWithExpectedSize(nodes.size());
        nodes.forEach((k, v) -> {
          addNode(v, k);
        });
      }

      @Override
      public Collection<DiscoveryDruidNode> getAllNodes()
      {
        return nodes;
      }

      void addNode(DruidNode node, NodeRole role)
      {
        final DiscoveryDruidNode discoveryNode = new DiscoveryDruidNode(node, role, ImmutableMap.of());
        this.nodes.add(discoveryNode);
      }

      @Override
      public void registerListener(Listener listener)
      {
      }
    }
  }

  static class CustomStartupInjectorBuilder extends StartupInjectorBuilder {
    private List<com.google.inject.Module> overrideModules = new ArrayList<>();
@@ -853,6 +719,7 @@ public class Launcher
      // ret.add(new AvaticaBasedConnectionModule());
      ret.add(binder -> binder.bind(RequestLogger.class).toInstance(new TestRequestLogger()));
      ret.add(CacheTestHelperModule.ResultCacheMode.DISABLED.makeModule());
      // ret.add(CacheTestHelperModule2.ResultCacheMode.DISABLED.makeModule());
      ret.add(new QuidemCaptureModule());
      ret.addAll(super.getModules());
      return ret;

CacheTestHelperModule2.java

@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.util;
import com.google.inject.AbstractModule;
import com.google.inject.Module;
import com.google.inject.Provides;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.MapCache;
import org.apache.druid.server.EtagProvider;

public class CacheTestHelperModule2 extends AbstractModule
{
  public enum ResultCacheMode
  {
    DISABLED,
    ENABLED;

    public Module makeModule()
    {
      return new CacheTestHelperModule2(this);
    }

    public boolean isPopulateResultLevelCache()
    {
      return this != DISABLED;
    }

    public boolean isUseResultLevelCache()
    {
      return this != DISABLED;
    }
  }

  protected final Cache cache;
  private CacheConfig cacheConfig;
  private EtagProvider etagProvider;

  static class TestCacheConfig extends CacheConfig
  {
    private ResultCacheMode resultLevelCache;

    public TestCacheConfig(ResultCacheMode resultCacheMode)
    {
      this.resultLevelCache = resultCacheMode;
    }

    @Override
    public boolean isPopulateResultLevelCache()
    {
      return resultLevelCache.isPopulateResultLevelCache();
    }

    @Override
    public boolean isUseResultLevelCache()
    {
      return resultLevelCache.isUseResultLevelCache();
    }
  }

  public CacheTestHelperModule2(ResultCacheMode resultCacheMode)
  {
    cacheConfig = new TestCacheConfig(resultCacheMode);
    switch (resultCacheMode) {
      case ENABLED:
        etagProvider = new EtagProvider.ProvideEtagBasedOnDatasource();
        cache = MapCache.create(1_000_000L);
        break;
      case DISABLED:
        etagProvider = new EtagProvider.EmptyEtagProvider();
        cache = null;
        break;
      default:
        throw new RuntimeException();
    }
  }

  @Provides
  EtagProvider etagProvider()
  {
    return etagProvider;
  }

  @Provides
  CacheConfig getCacheConfig()
  {
    return cacheConfig;
  }

  @Provides
  Cache getCache()
  {
    return cache;
  }

  @Override
  public void configure()
  {
  }
}
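
A minimal usage sketch (hypothetical, not part of this commit) of what the two ResultCacheMode settings bind, assuming plain Guice and the Druid test classpath. The harness class name is invented.

package org.apache.druid.sql.calcite.util;

import com.google.inject.Guice;
import com.google.inject.Injector;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.server.EtagProvider;

// Hypothetical harness, not part of this commit: shows what the two ResultCacheMode settings bind.
public class CacheTestHelperModule2SmokeTest
{
  public static void main(String[] args)
  {
    // ENABLED wires an in-memory MapCache plus a CacheConfig that turns the result-level cache on.
    Injector enabled = Guice.createInjector(CacheTestHelperModule2.ResultCacheMode.ENABLED.makeModule());
    System.out.println(enabled.getInstance(CacheConfig.class).isUseResultLevelCache());   // true
    System.out.println(enabled.getInstance(Cache.class) != null);                         // true
    System.out.println(enabled.getInstance(EtagProvider.class).getClass().getSimpleName());

    // DISABLED binds a CacheConfig that reports the result-level cache as off;
    // its Cache binding is null, so only CacheConfig and EtagProvider should be requested here.
    Injector disabled = Guice.createInjector(CacheTestHelperModule2.ResultCacheMode.DISABLED.makeModule());
    System.out.println(disabled.getInstance(CacheConfig.class).isUseResultLevelCache());  // false
  }
}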