mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-16 18:04:52 +00:00
Limit AllocationService dependency injection hack (#24479)
Changes the scope of the AllocationService dependency injection hack so that it is at least contained to the AllocationService and does not leak into the Discovery world.
This commit is contained in:
parent
035494fa17
commit
c8712e9531
@ -36,8 +36,6 @@ import org.elasticsearch.common.util.set.Sets;
|
||||
import org.elasticsearch.gateway.GatewayAllocator;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
@ -49,7 +47,7 @@ public final class Allocators {
|
||||
public static final NoopGatewayAllocator INSTANCE = new NoopGatewayAllocator();
|
||||
|
||||
protected NoopGatewayAllocator() {
|
||||
super(Settings.EMPTY, null, null);
|
||||
super(Settings.EMPTY);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -58,9 +58,7 @@ import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteable;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.io.stream.Writeable.Reader;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
@ -92,19 +90,22 @@ public class ClusterModule extends AbstractModule {
|
||||
public static final Setting<String> SHARDS_ALLOCATOR_TYPE_SETTING =
|
||||
new Setting<>("cluster.routing.allocation.type", BALANCED_ALLOCATOR, Function.identity(), Property.NodeScope);
|
||||
|
||||
private final Settings settings;
|
||||
private final ClusterService clusterService;
|
||||
private final IndexNameExpressionResolver indexNameExpressionResolver;
|
||||
private final AllocationDeciders allocationDeciders;
|
||||
private final AllocationService allocationService;
|
||||
// pkg private for tests
|
||||
final Collection<AllocationDecider> allocationDeciders;
|
||||
final Collection<AllocationDecider> deciderList;
|
||||
final ShardsAllocator shardsAllocator;
|
||||
|
||||
public ClusterModule(Settings settings, ClusterService clusterService, List<ClusterPlugin> clusterPlugins) {
|
||||
this.settings = settings;
|
||||
this.allocationDeciders = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
|
||||
public ClusterModule(Settings settings, ClusterService clusterService, List<ClusterPlugin> clusterPlugins,
|
||||
ClusterInfoService clusterInfoService) {
|
||||
this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
|
||||
this.allocationDeciders = new AllocationDeciders(settings, deciderList);
|
||||
this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins);
|
||||
this.clusterService = clusterService;
|
||||
indexNameExpressionResolver = new IndexNameExpressionResolver(settings);
|
||||
this.indexNameExpressionResolver = new IndexNameExpressionResolver(settings);
|
||||
this.allocationService = new AllocationService(settings, allocationDeciders, shardsAllocator, clusterInfoService);
|
||||
}
|
||||
|
||||
|
||||
@ -213,10 +214,14 @@ public class ClusterModule extends AbstractModule {
|
||||
"ShardsAllocator factory for [" + allocatorName + "] returned null");
|
||||
}
|
||||
|
||||
public AllocationService getAllocationService() {
|
||||
return allocationService;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void configure() {
|
||||
bind(GatewayAllocator.class).asEagerSingleton();
|
||||
bind(AllocationService.class).asEagerSingleton();
|
||||
bind(AllocationService.class).toInstance(allocationService);
|
||||
bind(ClusterService.class).toInstance(clusterService);
|
||||
bind(NodeConnectionsService.class).asEagerSingleton();
|
||||
bind(MetaDataCreateIndexService.class).asEagerSingleton();
|
||||
@ -233,7 +238,7 @@ public class ClusterModule extends AbstractModule {
|
||||
bind(NodeMappingRefreshAction.class).asEagerSingleton();
|
||||
bind(MappingUpdatedAction.class).asEagerSingleton();
|
||||
bind(TaskResultsService.class).asEagerSingleton();
|
||||
bind(AllocationDeciders.class).toInstance(new AllocationDeciders(settings, allocationDeciders));
|
||||
bind(AllocationDeciders.class).toInstance(allocationDeciders);
|
||||
bind(ShardsAllocator.class).toInstance(shardsAllocator);
|
||||
}
|
||||
}
|
||||
|
@ -37,7 +37,6 @@ import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.gateway.GatewayAllocator;
|
||||
|
||||
@ -61,20 +60,29 @@ import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NOD
|
||||
public class AllocationService extends AbstractComponent {
|
||||
|
||||
private final AllocationDeciders allocationDeciders;
|
||||
private final GatewayAllocator gatewayAllocator;
|
||||
private GatewayAllocator gatewayAllocator;
|
||||
private final ShardsAllocator shardsAllocator;
|
||||
private final ClusterInfoService clusterInfoService;
|
||||
|
||||
@Inject
|
||||
public AllocationService(Settings settings, AllocationDeciders allocationDeciders, GatewayAllocator gatewayAllocator,
|
||||
public AllocationService(Settings settings, AllocationDeciders allocationDeciders,
|
||||
GatewayAllocator gatewayAllocator,
|
||||
ShardsAllocator shardsAllocator, ClusterInfoService clusterInfoService) {
|
||||
this(settings, allocationDeciders, shardsAllocator, clusterInfoService);
|
||||
setGatewayAllocator(gatewayAllocator);
|
||||
}
|
||||
|
||||
public AllocationService(Settings settings, AllocationDeciders allocationDeciders,
|
||||
ShardsAllocator shardsAllocator, ClusterInfoService clusterInfoService) {
|
||||
super(settings);
|
||||
this.allocationDeciders = allocationDeciders;
|
||||
this.gatewayAllocator = gatewayAllocator;
|
||||
this.shardsAllocator = shardsAllocator;
|
||||
this.clusterInfoService = clusterInfoService;
|
||||
}
|
||||
|
||||
public void setGatewayAllocator(GatewayAllocator gatewayAllocator) {
|
||||
this.gatewayAllocator = gatewayAllocator;
|
||||
}
|
||||
|
||||
/**
|
||||
* Applies the started shards. Note, only initializing ShardRouting instances that exist in the routing table should be
|
||||
* provided as parameter and no duplicates should be contained.
|
||||
|
@ -23,7 +23,6 @@ import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.component.LifecycleComponent;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
@ -37,12 +36,6 @@ import java.io.IOException;
|
||||
*/
|
||||
public interface Discovery extends LifecycleComponent {
|
||||
|
||||
/**
|
||||
* Another hack to solve dep injection problem..., note, this will be called before
|
||||
* any start is called.
|
||||
*/
|
||||
void setAllocationService(AllocationService allocationService);
|
||||
|
||||
/**
|
||||
* Publish all the changes to the cluster from the master (can be called just by the master). The publish
|
||||
* process should apply this state to the master as well!
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
package org.elasticsearch.discovery;
|
||||
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.service.ClusterApplier;
|
||||
import org.elasticsearch.cluster.service.MasterService;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
@ -58,7 +59,8 @@ public class DiscoveryModule {
|
||||
|
||||
public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportService transportService,
|
||||
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService, MasterService masterService,
|
||||
ClusterApplier clusterApplier, ClusterSettings clusterSettings, List<DiscoveryPlugin> plugins) {
|
||||
ClusterApplier clusterApplier, ClusterSettings clusterSettings, List<DiscoveryPlugin> plugins,
|
||||
AllocationService allocationService) {
|
||||
final UnicastHostsProvider hostsProvider;
|
||||
|
||||
Map<String, Supplier<UnicastHostsProvider>> hostProviders = new HashMap<>();
|
||||
@ -83,12 +85,12 @@ public class DiscoveryModule {
|
||||
Map<String, Supplier<Discovery>> discoveryTypes = new HashMap<>();
|
||||
discoveryTypes.put("zen",
|
||||
() -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
|
||||
clusterSettings, hostsProvider));
|
||||
clusterSettings, hostsProvider, allocationService));
|
||||
discoveryTypes.put("tribe", () -> new TribeDiscovery(settings, transportService, clusterApplier));
|
||||
discoveryTypes.put("single-node", () -> new SingleNodeDiscovery(settings, transportService, clusterApplier));
|
||||
for (DiscoveryPlugin plugin : plugins) {
|
||||
plugin.getDiscoveryTypes(threadPool, transportService, namedWriteableRegistry,
|
||||
masterService, clusterApplier, clusterSettings, hostsProvider).entrySet().forEach(entry -> {
|
||||
masterService, clusterApplier, clusterSettings, hostsProvider, allocationService).entrySet().forEach(entry -> {
|
||||
if (discoveryTypes.put(entry.getKey(), entry.getValue()) != null) {
|
||||
throw new IllegalArgumentException("Cannot register discovery type [" + entry.getKey() + "] twice");
|
||||
}
|
||||
|
@ -27,7 +27,6 @@ import org.elasticsearch.cluster.ClusterStateTaskListener;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.service.ClusterApplier;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
@ -59,11 +58,6 @@ public class SingleNodeDiscovery extends AbstractLifecycleComponent implements D
|
||||
this.clusterApplier = clusterApplier;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAllocationService(final AllocationService allocationService) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void publish(final ClusterChangedEvent event,
|
||||
final AckListener ackListener) {
|
||||
|
@ -109,7 +109,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
||||
|
||||
private final TransportService transportService;
|
||||
private final MasterService masterService;
|
||||
private AllocationService allocationService;
|
||||
private final ClusterName clusterName;
|
||||
private final DiscoverySettings discoverySettings;
|
||||
protected final ZenPing zenPing; // protected to allow tests access
|
||||
@ -140,9 +139,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
||||
|
||||
private final JoinThreadControl joinThreadControl;
|
||||
|
||||
// must initialized in doStart(), when we have the allocationService set
|
||||
private volatile NodeJoinController nodeJoinController;
|
||||
private volatile NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor;
|
||||
private final NodeJoinController nodeJoinController;
|
||||
private final NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor;
|
||||
|
||||
private final ClusterApplier clusterApplier;
|
||||
private final AtomicReference<ClusterState> state; // last committed cluster state
|
||||
@ -151,7 +149,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
||||
|
||||
public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService transportService,
|
||||
NamedWriteableRegistry namedWriteableRegistry, MasterService masterService, ClusterApplier clusterApplier,
|
||||
ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider) {
|
||||
ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider, AllocationService allocationService) {
|
||||
super(settings);
|
||||
this.masterService = masterService;
|
||||
this.clusterApplier = clusterApplier;
|
||||
@ -213,6 +211,9 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
||||
this.membership = new MembershipAction(settings, transportService, new MembershipListener());
|
||||
this.joinThreadControl = new JoinThreadControl();
|
||||
|
||||
this.nodeJoinController = new NodeJoinController(masterService, allocationService, electMaster, settings);
|
||||
this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, electMaster, this::submitRejoin, logger);
|
||||
|
||||
transportService.registerRequestHandler(
|
||||
DISCOVERY_REJOIN_ACTION_NAME, RejoinClusterRequest::new, ThreadPool.Names.SAME, new RejoinClusterRequestHandler());
|
||||
}
|
||||
@ -223,11 +224,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
||||
return new UnicastZenPing(settings, threadPool, transportService, hostsProvider);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAllocationService(AllocationService allocationService) {
|
||||
this.allocationService = allocationService;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
DiscoveryNode localNode = transportService.getLocalNode();
|
||||
@ -239,8 +235,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
||||
joinThreadControl.start();
|
||||
}
|
||||
zenPing.start(this);
|
||||
this.nodeJoinController = new NodeJoinController(masterService, allocationService, electMaster, settings);
|
||||
this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, electMaster, this::submitRejoin, logger);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -43,7 +43,7 @@ import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
public class GatewayAllocator extends AbstractComponent {
|
||||
|
||||
private RoutingService routingService;
|
||||
private final RoutingService routingService;
|
||||
|
||||
private final PrimaryShardAllocator primaryShardAllocator;
|
||||
private final ReplicaShardAllocator replicaShardAllocator;
|
||||
@ -52,14 +52,12 @@ public class GatewayAllocator extends AbstractComponent {
|
||||
private final ConcurrentMap<ShardId, AsyncShardFetch<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData>> asyncFetchStore = ConcurrentCollections.newConcurrentMap();
|
||||
|
||||
@Inject
|
||||
public GatewayAllocator(Settings settings, final TransportNodesListGatewayStartedShards startedAction, final TransportNodesListShardStoreMetaData storeAction) {
|
||||
public GatewayAllocator(Settings settings, ClusterService clusterService, RoutingService routingService,
|
||||
TransportNodesListGatewayStartedShards startedAction, TransportNodesListShardStoreMetaData storeAction) {
|
||||
super(settings);
|
||||
this.routingService = routingService;
|
||||
this.primaryShardAllocator = new InternalPrimaryShardAllocator(settings, startedAction);
|
||||
this.replicaShardAllocator = new InternalReplicaShardAllocator(settings, storeAction);
|
||||
}
|
||||
|
||||
public void setReallocation(final ClusterService clusterService, final RoutingService routingService) {
|
||||
this.routingService = routingService;
|
||||
clusterService.addStateApplier(event -> {
|
||||
boolean cleanCache = false;
|
||||
DiscoveryNode localNode = event.state().nodes().getLocalNode();
|
||||
@ -79,6 +77,14 @@ public class GatewayAllocator extends AbstractComponent {
|
||||
});
|
||||
}
|
||||
|
||||
// for tests
|
||||
protected GatewayAllocator(Settings settings) {
|
||||
super(settings);
|
||||
this.routingService = null;
|
||||
this.primaryShardAllocator = null;
|
||||
this.replicaShardAllocator = null;
|
||||
}
|
||||
|
||||
public int getNumberOfInFlightFetch() {
|
||||
int count = 0;
|
||||
for (AsyncShardFetch<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> fetch : asyncFetchStarted.values()) {
|
||||
|
@ -49,7 +49,6 @@ import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.RoutingService;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.StopWatch;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
@ -352,7 +351,7 @@ public class Node implements Closeable {
|
||||
final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, clusterInfoService);
|
||||
modules.add(new NodeModule(this, monitorService));
|
||||
ClusterModule clusterModule = new ClusterModule(settings, clusterService,
|
||||
pluginsService.filterPlugins(ClusterPlugin.class));
|
||||
pluginsService.filterPlugins(ClusterPlugin.class), clusterInfoService);
|
||||
modules.add(clusterModule);
|
||||
IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));
|
||||
modules.add(indicesModule);
|
||||
@ -437,7 +436,8 @@ public class Node implements Closeable {
|
||||
|
||||
final DiscoveryModule discoveryModule = new DiscoveryModule(this.settings, threadPool, transportService, namedWriteableRegistry,
|
||||
networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(),
|
||||
clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class));
|
||||
clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),
|
||||
clusterModule.getAllocationService());
|
||||
NodeService nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),
|
||||
transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(),
|
||||
httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter());
|
||||
@ -488,6 +488,9 @@ public class Node implements Closeable {
|
||||
);
|
||||
injector = modules.createInjector();
|
||||
|
||||
// TODO hack around circular dependencies problems in AllocationService
|
||||
clusterModule.getAllocationService().setGatewayAllocator(injector.getInstance(GatewayAllocator.class));
|
||||
|
||||
List<LifecycleComponent> pluginLifecycleComponents = pluginComponents.stream()
|
||||
.filter(p -> p instanceof LifecycleComponent)
|
||||
.map(p -> (LifecycleComponent) p).collect(Collectors.toList());
|
||||
@ -644,8 +647,6 @@ public class Node implements Closeable {
|
||||
|
||||
Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings));
|
||||
logger.info("starting ...");
|
||||
// hack around dependency injection problem (for now...)
|
||||
injector.getInstance(Discovery.class).setAllocationService(injector.getInstance(AllocationService.class));
|
||||
pluginLifecycleComponents.forEach(LifecycleComponent::start);
|
||||
|
||||
injector.getInstance(MappingUpdatedAction.class).setClient(client);
|
||||
@ -663,9 +664,6 @@ public class Node implements Closeable {
|
||||
nodeConnectionsService.start();
|
||||
clusterService.setNodeConnectionsService(nodeConnectionsService);
|
||||
|
||||
// TODO hack around circular dependencies problems
|
||||
injector.getInstance(GatewayAllocator.class).setReallocation(clusterService, injector.getInstance(RoutingService.class));
|
||||
|
||||
injector.getInstance(ResourceWatcherService.class).start();
|
||||
injector.getInstance(GatewayService.class).start();
|
||||
Discovery discovery = injector.getInstance(Discovery.class);
|
||||
|
@ -23,6 +23,7 @@ import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.service.ClusterApplier;
|
||||
import org.elasticsearch.cluster.service.MasterService;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
@ -68,7 +69,8 @@ public interface DiscoveryPlugin {
|
||||
MasterService masterService,
|
||||
ClusterApplier clusterApplier,
|
||||
ClusterSettings clusterSettings,
|
||||
UnicastHostsProvider hostsProvider) {
|
||||
UnicastHostsProvider hostsProvider,
|
||||
AllocationService allocationService) {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
|
@ -57,6 +57,7 @@ import java.util.Map;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class ClusterModuleTests extends ModuleTestCase {
|
||||
private ClusterInfoService clusterInfoService = EmptyClusterInfoService.INSTANCE;
|
||||
private ClusterService clusterService = new ClusterService(Settings.EMPTY,
|
||||
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null);
|
||||
static class FakeAllocationDecider extends AllocationDecider {
|
||||
@ -114,7 +115,7 @@ public class ClusterModuleTests extends ModuleTestCase {
|
||||
public Collection<AllocationDecider> createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) {
|
||||
return Collections.singletonList(new EnableAllocationDecider(settings, clusterSettings));
|
||||
}
|
||||
})));
|
||||
}), clusterInfoService));
|
||||
assertEquals(e.getMessage(),
|
||||
"Cannot specify allocation decider [" + EnableAllocationDecider.class.getName() + "] twice");
|
||||
}
|
||||
@ -126,8 +127,8 @@ public class ClusterModuleTests extends ModuleTestCase {
|
||||
public Collection<AllocationDecider> createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) {
|
||||
return Collections.singletonList(new FakeAllocationDecider(settings));
|
||||
}
|
||||
}));
|
||||
assertTrue(module.allocationDeciders.stream().anyMatch(d -> d.getClass().equals(FakeAllocationDecider.class)));
|
||||
}), clusterInfoService);
|
||||
assertTrue(module.deciderList.stream().anyMatch(d -> d.getClass().equals(FakeAllocationDecider.class)));
|
||||
}
|
||||
|
||||
private ClusterModule newClusterModuleWithShardsAllocator(Settings settings, String name, Supplier<ShardsAllocator> supplier) {
|
||||
@ -138,7 +139,7 @@ public class ClusterModuleTests extends ModuleTestCase {
|
||||
return Collections.singletonMap(name, supplier);
|
||||
}
|
||||
}
|
||||
));
|
||||
), clusterInfoService);
|
||||
}
|
||||
|
||||
public void testRegisterShardsAllocator() {
|
||||
@ -156,7 +157,7 @@ public class ClusterModuleTests extends ModuleTestCase {
|
||||
public void testUnknownShardsAllocator() {
|
||||
Settings settings = Settings.builder().put(ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING.getKey(), "dne").build();
|
||||
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
|
||||
new ClusterModule(settings, clusterService, Collections.emptyList()));
|
||||
new ClusterModule(settings, clusterService, Collections.emptyList(), clusterInfoService));
|
||||
assertEquals("Unknown ShardsAllocator [dne]", e.getMessage());
|
||||
}
|
||||
|
||||
|
@ -391,7 +391,7 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
|
||||
private class NoopGatewayAllocator extends GatewayAllocator {
|
||||
|
||||
NoopGatewayAllocator() {
|
||||
super(Settings.EMPTY, null, null);
|
||||
super(Settings.EMPTY);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -20,6 +20,7 @@ package org.elasticsearch.discovery;
|
||||
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.service.ClusterApplier;
|
||||
import org.elasticsearch.cluster.service.MasterService;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
@ -71,7 +72,8 @@ public class DiscoveryModuleTests extends ESTestCase {
|
||||
default Map<String, Supplier<Discovery>> getDiscoveryTypes(ThreadPool threadPool, TransportService transportService,
|
||||
NamedWriteableRegistry namedWriteableRegistry,
|
||||
MasterService masterService, ClusterApplier clusterApplier,
|
||||
ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider) {
|
||||
ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider,
|
||||
AllocationService allocationService) {
|
||||
return impl();
|
||||
}
|
||||
}
|
||||
@ -93,7 +95,7 @@ public class DiscoveryModuleTests extends ESTestCase {
|
||||
|
||||
private DiscoveryModule newModule(Settings settings, List<DiscoveryPlugin> plugins) {
|
||||
return new DiscoveryModule(settings, threadPool, transportService, namedWriteableRegistry, null, masterService,
|
||||
clusterApplier, clusterSettings, plugins);
|
||||
clusterApplier, clusterSettings, plugins, null);
|
||||
}
|
||||
|
||||
public void testDefaults() {
|
||||
|
@ -299,7 +299,7 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
|
||||
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||
ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, service, new NamedWriteableRegistry(ClusterModule.getNamedWriteables()),
|
||||
masterService, (source, clusterStateSupplier, listener) -> listener.clusterStateProcessed(source, clusterStateSupplier.get(), clusterStateSupplier.get()),
|
||||
clusterSettings, Collections::emptyList);
|
||||
clusterSettings, Collections::emptyList, null);
|
||||
zenDiscovery.start();
|
||||
return zenDiscovery;
|
||||
}
|
||||
|
@ -28,11 +28,6 @@ import org.elasticsearch.discovery.DiscoveryStats;
|
||||
|
||||
public class NoopDiscovery implements Discovery {
|
||||
|
||||
@Override
|
||||
public void setAllocationService(AllocationService allocationService) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publish(ClusterChangedEvent clusterChangedEvent, AckListener ackListener) {
|
||||
|
||||
|
@ -22,6 +22,7 @@ package org.elasticsearch.plugin.discovery.azure.classic;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.cloud.azure.classic.management.AzureComputeService;
|
||||
import org.elasticsearch.cloud.azure.classic.management.AzureComputeServiceImpl;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.service.ClusterApplier;
|
||||
import org.elasticsearch.cluster.service.MasterService;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
@ -76,11 +77,12 @@ public class AzureDiscoveryPlugin extends Plugin implements DiscoveryPlugin {
|
||||
public Map<String, Supplier<Discovery>> getDiscoveryTypes(ThreadPool threadPool, TransportService transportService,
|
||||
NamedWriteableRegistry namedWriteableRegistry,
|
||||
MasterService masterService, ClusterApplier clusterApplier,
|
||||
ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider) {
|
||||
ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider,
|
||||
AllocationService allocationService) {
|
||||
// this is for backcompat with pre 5.1, where users would set discovery.type to use ec2 hosts provider
|
||||
return Collections.singletonMap(AZURE, () ->
|
||||
new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
|
||||
clusterSettings, hostsProvider));
|
||||
clusterSettings, hostsProvider, allocationService));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -24,6 +24,7 @@ import org.apache.logging.log4j.Logger;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.apache.lucene.util.SetOnce;
|
||||
import org.elasticsearch.SpecialPermission;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.service.ClusterApplier;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
@ -98,11 +99,12 @@ public class Ec2DiscoveryPlugin extends Plugin implements DiscoveryPlugin, Close
|
||||
public Map<String, Supplier<Discovery>> getDiscoveryTypes(ThreadPool threadPool, TransportService transportService,
|
||||
NamedWriteableRegistry namedWriteableRegistry,
|
||||
MasterService masterService, ClusterApplier clusterApplier,
|
||||
ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider) {
|
||||
ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider,
|
||||
AllocationService allocationService) {
|
||||
// this is for backcompat with pre 5.1, where users would set discovery.type to use ec2 hosts provider
|
||||
return Collections.singletonMap(EC2, () ->
|
||||
new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
|
||||
clusterSettings, hostsProvider));
|
||||
clusterSettings, hostsProvider, allocationService));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -29,6 +29,7 @@ import org.elasticsearch.cloud.gce.GceInstancesServiceImpl;
|
||||
import org.elasticsearch.cloud.gce.GceMetadataService;
|
||||
import org.elasticsearch.cloud.gce.network.GceNameResolver;
|
||||
import org.elasticsearch.cloud.gce.util.Access;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.service.ClusterApplier;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.logging.DeprecationLogger;
|
||||
@ -86,11 +87,12 @@ public class GceDiscoveryPlugin extends Plugin implements DiscoveryPlugin, Close
|
||||
public Map<String, Supplier<Discovery>> getDiscoveryTypes(ThreadPool threadPool, TransportService transportService,
|
||||
NamedWriteableRegistry namedWriteableRegistry,
|
||||
MasterService masterService, ClusterApplier clusterApplier,
|
||||
ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider) {
|
||||
ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider,
|
||||
AllocationService allocationService) {
|
||||
// this is for backcompat with pre 5.1, where users would set discovery.type to use ec2 hosts provider
|
||||
return Collections.singletonMap(GCE, () ->
|
||||
new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
|
||||
clusterSettings, hostsProvider));
|
||||
clusterSettings, hostsProvider, allocationService));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -205,7 +205,7 @@ public abstract class ESAllocationTestCase extends ESTestCase {
|
||||
protected static class DelayedShardsMockGatewayAllocator extends GatewayAllocator {
|
||||
|
||||
public DelayedShardsMockGatewayAllocator() {
|
||||
super(Settings.EMPTY, null, null);
|
||||
super(Settings.EMPTY);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -24,6 +24,7 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.service.ClusterApplier;
|
||||
import org.elasticsearch.cluster.service.MasterService;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
@ -59,10 +60,11 @@ public class TestZenDiscovery extends ZenDiscovery {
|
||||
public Map<String, Supplier<Discovery>> getDiscoveryTypes(ThreadPool threadPool, TransportService transportService,
|
||||
NamedWriteableRegistry namedWriteableRegistry,
|
||||
MasterService masterService, ClusterApplier clusterApplier,
|
||||
ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider) {
|
||||
ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider,
|
||||
AllocationService allocationService) {
|
||||
return Collections.singletonMap("test-zen",
|
||||
() -> new TestZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService,
|
||||
clusterApplier, clusterSettings, hostsProvider));
|
||||
clusterApplier, clusterSettings, hostsProvider, allocationService));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -78,9 +80,10 @@ public class TestZenDiscovery extends ZenDiscovery {
|
||||
|
||||
private TestZenDiscovery(Settings settings, ThreadPool threadPool, TransportService transportService,
|
||||
NamedWriteableRegistry namedWriteableRegistry, MasterService masterService,
|
||||
ClusterApplier clusterApplier, ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider) {
|
||||
ClusterApplier clusterApplier, ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider,
|
||||
AllocationService allocationService) {
|
||||
super(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier, clusterSettings,
|
||||
hostsProvider);
|
||||
hostsProvider, allocationService);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -35,7 +35,7 @@ public class NoopGatewayAllocator extends GatewayAllocator {
|
||||
public static final NoopGatewayAllocator INSTANCE = new NoopGatewayAllocator();
|
||||
|
||||
protected NoopGatewayAllocator() {
|
||||
super(Settings.EMPTY, null, null);
|
||||
super(Settings.EMPTY);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -96,7 +96,7 @@ public class TestGatewayAllocator extends GatewayAllocator {
|
||||
};
|
||||
|
||||
public TestGatewayAllocator() {
|
||||
super(Settings.EMPTY, null, null);
|
||||
super(Settings.EMPTY);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
Loading…
x
Reference in New Issue
Block a user