Mirror of https://github.com/honeymoose/OpenSearch.git, synced 2025-03-25 01:19:02 +00:00
Remove VersionModule and use Version#current consistently.
We pretended to be able to act like a node of a different version for so long; it's time to be honest and remove this ability. It's just confusing, and where it is needed and tested we should build dedicated extension points instead.
This commit is contained in:
parent 98951b1203
commit 260f38fd76
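The replacement pattern is visible throughout the diff below: production code reads Version.CURRENT directly, and the few classes that tests still need to run with a simulated older version (UnicastZenPing, LocalTransport, NettyTransport, MockTransportService) expose a protected getVersion() or getCurrentVersion() hook that test subclasses override. A minimal sketch of that extension-point idea follows; the class names are hypothetical, and only Version.CURRENT and minimumCompatibilityVersion() are the real org.elasticsearch.Version API.

import org.elasticsearch.Version;

// Hypothetical component (not a class from this commit) showing the pattern the
// commit moves to: no injected Version, just an overridable hook for tests.
class VersionedComponent {
    protected Version getVersion() {
        return Version.CURRENT; // production behaviour
    }

    Version minCompatibilityVersion() {
        // same call the real code makes against the running version
        return getVersion().minimumCompatibilityVersion();
    }
}

// Test-only subclass: simulates a node of a different version by overriding the
// hook instead of binding a Version through the removed Version.Module.
class FixedVersionComponent extends VersionedComponent {
    private final Version simulated;

    FixedVersionComponent(Version simulated) {
        this.simulated = simulated;
    }

    @Override
    protected Version getVersion() {
        return simulated;
    }
}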
@@ -859,7 +859,6 @@
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]ZenUnicastDiscoveryIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]NodeJoinControllerTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]ZenDiscoveryUnitTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]ping[/\\]unicast[/\\]UnicastZenPingIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]publish[/\\]PublishClusterStateActionTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]document[/\\]DocumentActionsIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]env[/\\]EnvironmentTests.java" checks="LineLength" />
@@ -22,7 +22,6 @@ package org.elasticsearch;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
@@ -329,18 +328,4 @@ public class Version {
    public boolean isRC() {
        return build > 50 && build < 99;
    }

    public static class Module extends AbstractModule {

        private final Version version;

        public Module(Version version) {
            this.version = version;
        }

        @Override
        protected void configure() {
            bind(Version.class).toInstance(version);
        }
    }
}
@@ -37,15 +37,13 @@ import org.elasticsearch.transport.TransportService;
public class TransportMainAction extends HandledTransportAction<MainRequest, MainResponse> {

    private final ClusterService clusterService;
    private final Version version;

    @Inject
    public TransportMainAction(Settings settings, ThreadPool threadPool, TransportService transportService,
                               ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
                               ClusterService clusterService, Version version) {
                               ClusterService clusterService) {
        super(settings, MainAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, MainRequest::new);
        this.clusterService = clusterService;
        this.version = version;
    }

    @Override
@@ -54,6 +52,7 @@ public class TransportMainAction extends HandledTransportAction<MainRequest, Mai
        assert Node.NODE_NAME_SETTING.exists(settings);
        final boolean available = clusterState.getBlocks().hasGlobalBlock(RestStatus.SERVICE_UNAVAILABLE) == false;
        listener.onResponse(
            new MainResponse(Node.NODE_NAME_SETTING.get(settings), version, clusterState.getClusterName(), Build.CURRENT, available));
            new MainResponse(Node.NODE_NAME_SETTING.get(settings), Version.CURRENT, clusterState.getClusterName(), Build.CURRENT,
                available));
    }
}
@@ -129,7 +129,6 @@ public class TransportClient extends AbstractClient {
        boolean success = false;
        try {
            ModulesBuilder modules = new ModulesBuilder();
            modules.add(new Version.Module(version));
            // plugin modules must be added here, before others or we can get crazy injection errors...
            for (Module pluginModule : pluginsService.nodeModules()) {
                modules.add(pluginModule);
@@ -112,12 +112,12 @@ public class TransportClientNodesService extends AbstractComponent {

    @Inject
    public TransportClientNodesService(Settings settings,TransportService transportService,
                                       ThreadPool threadPool, Version version) {
                                       ThreadPool threadPool) {
        super(settings);
        this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
        this.transportService = transportService;
        this.threadPool = threadPool;
        this.minCompatibilityVersion = version.minimumCompatibilityVersion();
        this.minCompatibilityVersion = Version.CURRENT.minimumCompatibilityVersion();

        this.nodesSamplerInterval = CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL.get(this.settings);
        this.pingTimeout = CLIENT_TRANSPORT_PING_TIMEOUT.get(this.settings).millis();
@@ -103,7 +103,6 @@ public class MetaDataCreateIndexService extends AbstractComponent {
    private final ClusterService clusterService;
    private final IndicesService indicesService;
    private final AllocationService allocationService;
    private final Version version;
    private final AliasValidator aliasValidator;
    private final IndexTemplateFilter indexTemplateFilter;
    private final Environment env;
@@ -114,13 +113,12 @@ public class MetaDataCreateIndexService extends AbstractComponent {
    @Inject
    public MetaDataCreateIndexService(Settings settings, ClusterService clusterService,
                                      IndicesService indicesService, AllocationService allocationService,
                                      Version version, AliasValidator aliasValidator,
                                      AliasValidator aliasValidator,
                                      Set<IndexTemplateFilter> indexTemplateFilters, Environment env, NodeServicesProvider nodeServicesProvider, IndexScopedSettings indexScopedSettings) {
        super(settings);
        this.clusterService = clusterService;
        this.indicesService = indicesService;
        this.allocationService = allocationService;
        this.version = version;
        this.aliasValidator = aliasValidator;
        this.env = env;
        this.nodeServicesProvider = nodeServicesProvider;
@@ -287,7 +285,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {

        if (indexSettingsBuilder.get(SETTING_VERSION_CREATED) == null) {
            DiscoveryNodes nodes = currentState.nodes();
            final Version createdVersion = Version.smallest(version, nodes.getSmallestNonClientNodeVersion());
            final Version createdVersion = Version.smallest(Version.CURRENT, nodes.getSmallestNonClientNodeVersion());
            indexSettingsBuilder.put(SETTING_VERSION_CREATED, createdVersion);
        }
@@ -46,12 +46,10 @@ public class DiscoveryNodeService extends AbstractComponent {
        // don't use node.id.seed so it won't be seen as an attribute
        Setting.longSetting("node_id.seed", 0L, Long.MIN_VALUE, Property.NodeScope);
    private final List<CustomAttributesProvider> customAttributesProviders = new CopyOnWriteArrayList<>();
    private final Version version;

    @Inject
    public DiscoveryNodeService(Settings settings, Version version) {
    public DiscoveryNodeService(Settings settings) {
        super(settings);
        this.version = version;
    }

    public static String generateNodeId(Settings settings) {
@@ -93,7 +91,7 @@ public class DiscoveryNodeService extends AbstractComponent {
            }
        }
        return new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), nodeId, publishAddress, attributes,
            roles, version);
            roles, Version.CURRENT);
    }

    public interface CustomAttributesProvider {
@@ -53,9 +53,9 @@ public class ElectMasterService extends AbstractComponent {
    private volatile int minimumMasterNodes;

    @Inject
    public ElectMasterService(Settings settings, Version version) {
    public ElectMasterService(Settings settings) {
        super(settings);
        this.minMasterVersion = version.minimumCompatibilityVersion();
        this.minMasterVersion = Version.CURRENT.minimumCompatibilityVersion();
        this.minimumMasterNodes = DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings);
        logger.debug("using minimum_master_nodes [{}]", minimumMasterNodes);
    }
@@ -132,8 +132,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen

    @Inject
    public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService,
                          Version version, ElectMasterService electMasterService,
                          @Nullable Set<UnicastHostsProvider> unicastHostsProviders) {
                          ElectMasterService electMasterService, @Nullable Set<UnicastHostsProvider> unicastHostsProviders) {
        super(settings);
        this.threadPool = threadPool;
        this.transportService = transportService;
@@ -166,7 +165,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
                TransportAddress[] addresses = transportService.addressesFromString(host, limitPortCounts);
                for (TransportAddress address : addresses) {
                    configuredTargetNodes.add(new DiscoveryNode(UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#",
                        address, emptyMap(), emptySet(), version.minimumCompatibilityVersion()));
                        address, emptyMap(), emptySet(), getVersion().minimumCompatibilityVersion()));
                }
            } catch (Exception e) {
                throw new IllegalArgumentException("Failed to resolve address for [" + host + "]", e);
@@ -586,4 +585,8 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
            }
        }
    }

    protected Version getVersion() {
        return Version.CURRENT; // for tests
    }
}
@@ -105,8 +105,6 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements

    private RateLimiter restoreRateLimiter;

    private RateLimiterListener rateLimiterListener;

    private RateLimitingInputStream.Listener snapshotThrottleListener;

    private RateLimitingInputStream.Listener restoreThrottleListener;
@@ -163,7 +161,6 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
        this.chunkSize = chunkSize;
        this.snapshotRateLimiter = snapshotRateLimiter;
        this.restoreRateLimiter = restoreRateLimiter;
        this.rateLimiterListener = rateLimiterListener;
        this.snapshotThrottleListener = nanos -> rateLimiterListener.onSnapshotPause(nanos);
        this.restoreThrottleListener = nanos -> rateLimiterListener.onRestorePause(nanos);
        this.compress = compress;
@@ -174,17 +174,17 @@ public class Node implements Closeable {
     * @param preparedSettings Base settings to configure the node with
     */
    public Node(Settings preparedSettings) {
        this(InternalSettingsPreparer.prepareEnvironment(preparedSettings, null), Version.CURRENT, Collections.<Class<? extends Plugin>>emptyList());
        this(InternalSettingsPreparer.prepareEnvironment(preparedSettings, null), Collections.<Class<? extends Plugin>>emptyList());
    }

    protected Node(Environment tmpEnv, Version version, Collection<Class<? extends Plugin>> classpathPlugins) {
    protected Node(Environment tmpEnv, Collection<Class<? extends Plugin>> classpathPlugins) {
        Settings tmpSettings = Settings.builder().put(tmpEnv.settings())
            .put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();
        final List<Closeable> resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an error

        tmpSettings = TribeService.processSettings(tmpSettings);
        ESLogger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(tmpSettings));
        final String displayVersion = version + (Build.CURRENT.isSnapshot() ? "-SNAPSHOT" : "");
        final String displayVersion = Version.CURRENT + (Build.CURRENT.isSnapshot() ? "-SNAPSHOT" : "");
        final JvmInfo jvmInfo = JvmInfo.jvmInfo();
        logger.info(
            "version[{}], pid[{}], build[{}/{}], OS[{}/{}/{}], JVM[{}/{}/{}/{}]",
@@ -247,7 +247,6 @@ public class Node implements Closeable {
        resourcesToClose.add(clusterService);
        NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
        ModulesBuilder modules = new ModulesBuilder();
        modules.add(new Version.Module(version));
        // plugin modules must be added here, before others or we can get crazy injection errors...
        for (Module pluginModule : pluginsService.nodeModules()) {
            modules.add(pluginModule);
@ -70,14 +70,12 @@ public class NodeService extends AbstractComponent implements Closeable {
|
||||
|
||||
private volatile Map<String, String> serviceAttributes = emptyMap();
|
||||
|
||||
private final Version version;
|
||||
|
||||
private final Discovery discovery;
|
||||
|
||||
@Inject
|
||||
public NodeService(Settings settings, ThreadPool threadPool, MonitorService monitorService,
|
||||
Discovery discovery, TransportService transportService, IndicesService indicesService,
|
||||
PluginsService pluginService, CircuitBreakerService circuitBreakerService, Version version,
|
||||
PluginsService pluginService, CircuitBreakerService circuitBreakerService,
|
||||
ProcessorsRegistry.Builder processorsRegistryBuilder, ClusterService clusterService, SettingsFilter settingsFilter) {
|
||||
super(settings);
|
||||
this.threadPool = threadPool;
|
||||
@ -85,7 +83,6 @@ public class NodeService extends AbstractComponent implements Closeable {
|
||||
this.transportService = transportService;
|
||||
this.indicesService = indicesService;
|
||||
this.discovery = discovery;
|
||||
this.version = version;
|
||||
this.pluginService = pluginService;
|
||||
this.circuitBreakerService = circuitBreakerService;
|
||||
this.clusterService = clusterService;
|
||||
@ -126,7 +123,7 @@ public class NodeService extends AbstractComponent implements Closeable {
|
||||
}
|
||||
|
||||
public NodeInfo info() {
|
||||
return new NodeInfo(version, Build.CURRENT, discovery.localNode(), serviceAttributes,
|
||||
return new NodeInfo(Version.CURRENT, Build.CURRENT, discovery.localNode(), serviceAttributes,
|
||||
settings,
|
||||
monitorService.osService().info(),
|
||||
monitorService.processService().info(),
|
||||
@ -141,7 +138,7 @@ public class NodeService extends AbstractComponent implements Closeable {
|
||||
|
||||
public NodeInfo info(boolean settings, boolean os, boolean process, boolean jvm, boolean threadPool,
|
||||
boolean transport, boolean http, boolean plugin, boolean ingest) {
|
||||
return new NodeInfo(version, Build.CURRENT, discovery.localNode(), serviceAttributes,
|
||||
return new NodeInfo(Version.CURRENT, Build.CURRENT, discovery.localNode(), serviceAttributes,
|
||||
settings ? settingsFilter.filter(this.settings) : null,
|
||||
os ? monitorService.osService().info() : null,
|
||||
process ? monitorService.processService().info() : null,
|
||||
|
@ -77,7 +77,6 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
|
||||
public static final String LOCAL_TRANSPORT_THREAD_NAME_PREFIX = "local_transport";
|
||||
final ThreadPool threadPool;
|
||||
private final ThreadPoolExecutor workers;
|
||||
private final Version version;
|
||||
private volatile TransportServiceAdapter transportServiceAdapter;
|
||||
private volatile BoundTransportAddress boundAddress;
|
||||
private volatile LocalTransportAddress localAddress;
|
||||
@ -92,11 +91,10 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
|
||||
public static final String TRANSPORT_LOCAL_QUEUE = "transport.local.queue";
|
||||
|
||||
@Inject
|
||||
public LocalTransport(Settings settings, ThreadPool threadPool, Version version,
|
||||
public LocalTransport(Settings settings, ThreadPool threadPool,
|
||||
NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) {
|
||||
super(settings);
|
||||
this.threadPool = threadPool;
|
||||
this.version = version;
|
||||
int workerCount = this.settings.getAsInt(TRANSPORT_LOCAL_WORKERS, EsExecutors.boundedNumberOfProcessors(settings));
|
||||
int queueSize = this.settings.getAsInt(TRANSPORT_LOCAL_QUEUE, -1);
|
||||
logger.debug("creating [{}] workers, queue_size [{}]", workerCount, queueSize);
|
||||
@ -207,7 +205,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
|
||||
@Override
|
||||
public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request,
|
||||
TransportRequestOptions options) throws IOException, TransportException {
|
||||
final Version version = Version.smallest(node.getVersion(), this.version);
|
||||
final Version version = Version.smallest(node.getVersion(), getVersion());
|
||||
|
||||
try (BytesStreamOutput stream = new BytesStreamOutput()) {
|
||||
stream.setVersion(version);
|
||||
@@ -404,4 +402,8 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
    public List<String> getLocalAddresses() {
        return Collections.singletonList("0.0.0.0");
    }

    protected Version getVersion() { // for tests
        return Version.CURRENT;
    }
}
@ -210,7 +210,6 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
||||
intSetting("transport.netty.boss_count", 1, 1, Property.NodeScope);
|
||||
|
||||
protected final NetworkService networkService;
|
||||
protected final Version version;
|
||||
|
||||
protected final boolean blockingClient;
|
||||
protected final TimeValue connectTimeout;
|
||||
@ -254,13 +253,12 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
||||
final ScheduledPing scheduledPing;
|
||||
|
||||
@Inject
|
||||
public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version,
|
||||
public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
|
||||
NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) {
|
||||
super(settings);
|
||||
this.threadPool = threadPool;
|
||||
this.networkService = networkService;
|
||||
this.bigArrays = bigArrays;
|
||||
this.version = version;
|
||||
|
||||
this.workerCount = WORKER_COUNT.get(settings);
|
||||
this.blockingClient = TCP_BLOCKING_CLIENT.get(settings);
|
||||
@ -894,7 +892,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
||||
// we pick the smallest of the 2, to support both backward and forward compatibility
|
||||
// note, this is the only place we need to do this, since from here on, we use the serialized version
|
||||
// as the version to use also when the node receiving this request will send the response with
|
||||
Version version = Version.smallest(this.version, node.getVersion());
|
||||
Version version = Version.smallest(getCurrentVersion(), node.getVersion());
|
||||
|
||||
stream.setVersion(version);
|
||||
threadPool.getThreadContext().writeTo(stream);
|
||||
@@ -1401,4 +1399,9 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
            }
        }
    }

    protected Version getCurrentVersion() {
        // this is just for tests to mock stuff like the nodes version - tests can override this internally
        return Version.CURRENT;
    }
}
@ -19,7 +19,6 @@
|
||||
|
||||
package org.elasticsearch.tribe;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.node.Node;
|
||||
@ -32,6 +31,6 @@ import java.util.Collections;
|
||||
*/
|
||||
class TribeClientNode extends Node {
|
||||
TribeClientNode(Settings settings) {
|
||||
super(new Environment(settings), Version.CURRENT, Collections.<Class<? extends Plugin>>emptyList());
|
||||
super(new Environment(settings), Collections.<Class<? extends Plugin>>emptyList());
|
||||
}
|
||||
}
|
||||
|
@ -167,7 +167,7 @@ public abstract class TaskManagerTestCase extends ESTestCase {
|
||||
public TestNode(String name, ThreadPool threadPool, Settings settings) {
|
||||
clusterService = createClusterService(threadPool);
|
||||
transportService = new TransportService(settings,
|
||||
new LocalTransport(settings, threadPool, Version.CURRENT, new NamedWriteableRegistry(),
|
||||
new LocalTransport(settings, threadPool, new NamedWriteableRegistry(),
|
||||
new NoneCircuitBreakerService()), threadPool) {
|
||||
@Override
|
||||
protected TaskManager createTaskManager() {
|
||||
|
@ -158,7 +158,6 @@ public class MetaDataIndexTemplateServiceTests extends ESSingleNodeTestCase {
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
Version.CURRENT,
|
||||
null,
|
||||
new HashSet<>(),
|
||||
null,
|
||||
@ -189,7 +188,6 @@ public class MetaDataIndexTemplateServiceTests extends ESSingleNodeTestCase {
|
||||
clusterService,
|
||||
indicesService,
|
||||
null,
|
||||
Version.CURRENT,
|
||||
null,
|
||||
new HashSet<>(),
|
||||
null,
|
||||
|
@ -112,7 +112,7 @@ public class MainActionTests extends ESTestCase {
|
||||
when(clusterService.state()).thenReturn(state);
|
||||
|
||||
TransportMainAction action = new TransportMainAction(settings, mock(ThreadPool.class), mock(TransportService.class),
|
||||
mock(ActionFilters.class), mock(IndexNameExpressionResolver.class), clusterService, Version.CURRENT);
|
||||
mock(ActionFilters.class), mock(IndexNameExpressionResolver.class), clusterService);
|
||||
AtomicReference<MainResponse> responseRef = new AtomicReference<>();
|
||||
action.doExecute(new MainRequest(), new ActionListener<MainResponse>() {
|
||||
@Override
|
||||
|
@ -89,7 +89,7 @@ public class BroadcastReplicationTests extends ESTestCase {
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
LocalTransport transport = new LocalTransport(Settings.EMPTY, threadPool, Version.CURRENT, new NamedWriteableRegistry(), circuitBreakerService);
|
||||
LocalTransport transport = new LocalTransport(Settings.EMPTY, threadPool, new NamedWriteableRegistry(), circuitBreakerService);
|
||||
clusterService = createClusterService(threadPool);
|
||||
transportService = new TransportService(clusterService.getSettings(), transport, threadPool);
|
||||
transportService.start();
|
||||
|
@ -102,7 +102,7 @@ public class TransportClientNodesServiceTests extends ESTestCase {
|
||||
transportService.start();
|
||||
transportService.acceptIncomingRequests();
|
||||
transportClientNodesService =
|
||||
new TransportClientNodesService(settings, transportService, threadPool, Version.CURRENT);
|
||||
new TransportClientNodesService(settings, transportService, threadPool);
|
||||
this.nodesCount = randomIntBetween(1, 10);
|
||||
for (int i = 0; i < nodesCount; i++) {
|
||||
transportClientNodesService.addTransportAddresses(new LocalTransportAddress("node" + i));
|
||||
|
@ -212,7 +212,6 @@ public class MetaDataCreateIndexServiceTests extends ESTestCase {
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
Version.CURRENT,
|
||||
null,
|
||||
new HashSet<>(),
|
||||
null,
|
||||
|
@ -54,7 +54,7 @@ public class DiscoveryNodeServiceTests extends ESTestCase {
|
||||
}
|
||||
}
|
||||
}
|
||||
DiscoveryNodeService discoveryNodeService = new DiscoveryNodeService(builder.build(), Version.CURRENT);
|
||||
DiscoveryNodeService discoveryNodeService = new DiscoveryNodeService(builder.build());
|
||||
DiscoveryNode discoveryNode = discoveryNodeService.buildLocalNode(DummyTransportAddress.INSTANCE);
|
||||
assertThat(discoveryNode.getRoles(), equalTo(selectedRoles));
|
||||
assertThat(discoveryNode.getAttributes(), equalTo(expectedAttributes));
|
||||
@ -68,7 +68,7 @@ public class DiscoveryNodeServiceTests extends ESTestCase {
|
||||
builder.put("node.attr.attr" + i, "value" + i);
|
||||
expectedAttributes.put("attr" + i, "value" + i);
|
||||
}
|
||||
DiscoveryNodeService discoveryNodeService = new DiscoveryNodeService(builder.build(), Version.CURRENT);
|
||||
DiscoveryNodeService discoveryNodeService = new DiscoveryNodeService(builder.build());
|
||||
int numCustomAttributes = randomIntBetween(0, 5);
|
||||
Map<String, String> customAttributes = new HashMap<>();
|
||||
for (int i = 0; i < numCustomAttributes; i++) {
|
||||
|
@ -58,7 +58,7 @@ public class NetworkModuleTests extends ModuleTestCase {
|
||||
|
||||
static class FakeTransport extends AssertingLocalTransport {
|
||||
public FakeTransport() {
|
||||
super(null, null, null, null, null);
|
||||
super(null, null, null, null);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -33,8 +33,8 @@ public class DiscoveryModuleTests extends ModuleTestCase {
|
||||
|
||||
public static class DummyMasterElectionService extends ElectMasterService {
|
||||
|
||||
public DummyMasterElectionService(Settings settings, Version version) {
|
||||
super(settings, version);
|
||||
public DummyMasterElectionService(Settings settings) {
|
||||
super(settings);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -141,7 +141,12 @@ public class ZenFaultDetectionTests extends ESTestCase {
|
||||
// trace zenfd actions but keep the default otherwise
|
||||
.put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), singleton(TransportLivenessAction.NAME))
|
||||
.build(),
|
||||
new LocalTransport(settings, threadPool, version, namedWriteableRegistry, circuitBreakerService),
|
||||
new LocalTransport(settings, threadPool, namedWriteableRegistry, circuitBreakerService) {
|
||||
@Override
|
||||
protected Version getVersion() {
|
||||
return version;
|
||||
}
|
||||
},
|
||||
threadPool);
|
||||
transportService.start();
|
||||
transportService.acceptIncomingRequests();
|
||||
|
@ -35,15 +35,12 @@ import java.util.Set;
|
||||
public class ElectMasterServiceTests extends ESTestCase {
|
||||
|
||||
ElectMasterService electMasterService() {
|
||||
return new ElectMasterService(Settings.EMPTY, Version.CURRENT);
|
||||
return new ElectMasterService(Settings.EMPTY);
|
||||
}
|
||||
|
||||
List<DiscoveryNode> generateRandomNodes() {
|
||||
int count = scaledRandomIntBetween(1, 100);
|
||||
ArrayList<DiscoveryNode> nodes = new ArrayList<>(count);
|
||||
|
||||
|
||||
|
||||
for (int i = 0; i < count; i++) {
|
||||
Set<DiscoveryNode.Role> roles = new HashSet<>();
|
||||
if (randomBoolean()) {
|
||||
|
@ -99,7 +99,7 @@ public class NodeJoinControllerTests extends ESTestCase {
|
||||
setState(clusterService, ClusterState.builder(clusterService.state()).nodes(
|
||||
DiscoveryNodes.builder(initialNodes).masterNodeId(localNode.getId())));
|
||||
nodeJoinController = new NodeJoinController(clusterService, new NoopAllocationService(Settings.EMPTY),
|
||||
new ElectMasterService(Settings.EMPTY, Version.CURRENT),
|
||||
new ElectMasterService(Settings.EMPTY),
|
||||
new DiscoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
|
||||
Settings.EMPTY);
|
||||
}
|
||||
|
@ -320,7 +320,7 @@ public class ZenDiscoveryIT extends ESIntegTestCase {
|
||||
}
|
||||
|
||||
public void testJoinElectedMaster_incompatibleMinVersion() {
|
||||
ElectMasterService electMasterService = new ElectMasterService(Settings.EMPTY, Version.CURRENT);
|
||||
ElectMasterService electMasterService = new ElectMasterService(Settings.EMPTY);
|
||||
|
||||
DiscoveryNode node = new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), emptyMap(),
|
||||
Collections.singleton(DiscoveryNode.Role.MASTER), Version.CURRENT);
|
||||
|
@ -68,7 +68,7 @@ public class UnicastZenPingIT extends ESTestCase {
|
||||
|
||||
ThreadPool threadPool = new TestThreadPool(getClass().getName());
|
||||
NetworkService networkService = new NetworkService(settings);
|
||||
ElectMasterService electMasterService = new ElectMasterService(settings, Version.CURRENT);
|
||||
ElectMasterService electMasterService = new ElectMasterService(settings);
|
||||
|
||||
NetworkHandle handleA = startServices(settings, threadPool, networkService, "UZP_A", Version.CURRENT);
|
||||
NetworkHandle handleB = startServices(settings, threadPool, networkService, "UZP_B", Version.CURRENT);
|
||||
@ -88,7 +88,7 @@ public class UnicastZenPingIT extends ESTestCase {
|
||||
.build();
|
||||
|
||||
Settings hostsSettingsMismatch = Settings.builder().put(hostsSettings).put(settingsMismatch).build();
|
||||
UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, handleA.transportService, Version.CURRENT, electMasterService, null);
|
||||
UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, handleA.transportService, electMasterService, null);
|
||||
zenPingA.setPingContextProvider(new PingContextProvider() {
|
||||
@Override
|
||||
public DiscoveryNodes nodes() {
|
||||
@ -102,7 +102,7 @@ public class UnicastZenPingIT extends ESTestCase {
|
||||
});
|
||||
zenPingA.start();
|
||||
|
||||
UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, handleB.transportService, Version.CURRENT, electMasterService, null);
|
||||
UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, handleB.transportService, electMasterService, null);
|
||||
zenPingB.setPingContextProvider(new PingContextProvider() {
|
||||
@Override
|
||||
public DiscoveryNodes nodes() {
|
||||
@ -116,7 +116,13 @@ public class UnicastZenPingIT extends ESTestCase {
|
||||
});
|
||||
zenPingB.start();
|
||||
|
||||
UnicastZenPing zenPingC = new UnicastZenPing(hostsSettingsMismatch, threadPool, handleC.transportService, versionD, electMasterService, null);
|
||||
UnicastZenPing zenPingC = new UnicastZenPing(hostsSettingsMismatch, threadPool, handleC.transportService, electMasterService,
|
||||
null) {
|
||||
@Override
|
||||
protected Version getVersion() {
|
||||
return versionD;
|
||||
}
|
||||
};
|
||||
zenPingC.setPingContextProvider(new PingContextProvider() {
|
||||
@Override
|
||||
public DiscoveryNodes nodes() {
|
||||
@ -130,7 +136,7 @@ public class UnicastZenPingIT extends ESTestCase {
|
||||
});
|
||||
zenPingC.start();
|
||||
|
||||
UnicastZenPing zenPingD = new UnicastZenPing(hostsSettingsMismatch, threadPool, handleD.transportService, Version.CURRENT, electMasterService, null);
|
||||
UnicastZenPing zenPingD = new UnicastZenPing(hostsSettingsMismatch, threadPool, handleD.transportService, electMasterService, null);
|
||||
zenPingD.setPingContextProvider(new PingContextProvider() {
|
||||
@Override
|
||||
public DiscoveryNodes nodes() {
|
||||
@ -191,8 +197,15 @@ public class UnicastZenPingIT extends ESTestCase {
|
||||
}
|
||||
}
|
||||
|
||||
private NetworkHandle startServices(Settings settings, ThreadPool threadPool, NetworkService networkService, String nodeId, Version version) {
|
||||
NettyTransport transport = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, version, new NamedWriteableRegistry(), new NoneCircuitBreakerService());
|
||||
private NetworkHandle startServices(Settings settings, ThreadPool threadPool, NetworkService networkService, String nodeId,
|
||||
Version version) {
|
||||
NettyTransport transport = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE,
|
||||
new NamedWriteableRegistry(), new NoneCircuitBreakerService()) {
|
||||
@Override
|
||||
protected Version getCurrentVersion() {
|
||||
return version;
|
||||
}
|
||||
};
|
||||
final TransportService transportService = new TransportService(settings, transport, threadPool);
|
||||
transportService.start();
|
||||
transportService.acceptIncomingRequests();
|
||||
@ -208,7 +221,8 @@ public class UnicastZenPingIT extends ESTestCase {
|
||||
public void onNodeDisconnected(DiscoveryNode node) {
|
||||
}
|
||||
});
|
||||
final DiscoveryNode node = new DiscoveryNode(nodeId, transportService.boundAddress().publishAddress(), emptyMap(), emptySet(), version);
|
||||
final DiscoveryNode node = new DiscoveryNode(nodeId, transportService.boundAddress().publishAddress(), emptyMap(), emptySet(),
|
||||
version);
|
||||
transportService.setLocalNode(node);
|
||||
return new NetworkHandle((InetSocketTransportAddress)transport.boundAddress().publishAddress(), transportService, node, counters);
|
||||
}
|
||||
@ -219,7 +233,8 @@ public class UnicastZenPingIT extends ESTestCase {
|
||||
public final DiscoveryNode node;
|
||||
public final ConcurrentMap<TransportAddress, AtomicInteger> counters;
|
||||
|
||||
public NetworkHandle(InetSocketTransportAddress address, TransportService transportService, DiscoveryNode discoveryNode, ConcurrentMap<TransportAddress, AtomicInteger> counters) {
|
||||
public NetworkHandle(InetSocketTransportAddress address, TransportService transportService, DiscoveryNode discoveryNode,
|
||||
ConcurrentMap<TransportAddress, AtomicInteger> counters) {
|
||||
this.address = address;
|
||||
this.transportService = transportService;
|
||||
this.node = discoveryNode;
|
||||
|
@ -145,26 +145,22 @@ public class PublishClusterStateActionTests extends ESTestCase {
|
||||
}
|
||||
|
||||
public MockNode createMockNode(final String name) throws Exception {
|
||||
return createMockNode(name, Settings.EMPTY, Version.CURRENT);
|
||||
return createMockNode(name, Settings.EMPTY);
|
||||
}
|
||||
|
||||
public MockNode createMockNode(String name, Settings settings) throws Exception {
|
||||
return createMockNode(name, settings, Version.CURRENT);
|
||||
return createMockNode(name, settings, null);
|
||||
}
|
||||
|
||||
public MockNode createMockNode(final String name, Settings settings, Version version) throws Exception {
|
||||
return createMockNode(name, settings, version, null);
|
||||
}
|
||||
|
||||
public MockNode createMockNode(String name, Settings settings, Version version, @Nullable ClusterStateListener listener) throws Exception {
|
||||
public MockNode createMockNode(String name, Settings settings, @Nullable ClusterStateListener listener) throws Exception {
|
||||
settings = Settings.builder()
|
||||
.put("name", name)
|
||||
.put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "", TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING")
|
||||
.put(settings)
|
||||
.build();
|
||||
|
||||
MockTransportService service = buildTransportService(settings, version);
|
||||
DiscoveryNodeService discoveryNodeService = new DiscoveryNodeService(settings, version);
|
||||
MockTransportService service = buildTransportService(settings);
|
||||
DiscoveryNodeService discoveryNodeService = new DiscoveryNodeService(settings);
|
||||
DiscoveryNode discoveryNode = discoveryNodeService.buildLocalNode(service.boundAddress().publishAddress());
|
||||
MockNode node = new MockNode(discoveryNode, service, listener, logger);
|
||||
node.action = buildPublishClusterStateAction(settings, service, () -> node.clusterState, node);
|
||||
@ -232,8 +228,8 @@ public class PublishClusterStateActionTests extends ESTestCase {
|
||||
terminate(threadPool);
|
||||
}
|
||||
|
||||
protected MockTransportService buildTransportService(Settings settings, Version version) {
|
||||
MockTransportService transportService = MockTransportService.local(Settings.EMPTY, version, threadPool);
|
||||
protected MockTransportService buildTransportService(Settings settings) {
|
||||
MockTransportService transportService = MockTransportService.local(Settings.EMPTY, Version.CURRENT, threadPool);
|
||||
transportService.start();
|
||||
transportService.acceptIncomingRequests();
|
||||
return transportService;
|
||||
@ -257,8 +253,8 @@ public class PublishClusterStateActionTests extends ESTestCase {
|
||||
}
|
||||
|
||||
public void testSimpleClusterStatePublishing() throws Exception {
|
||||
MockNode nodeA = createMockNode("nodeA", Settings.EMPTY, Version.CURRENT).setAsMaster();
|
||||
MockNode nodeB = createMockNode("nodeB", Settings.EMPTY, Version.CURRENT);
|
||||
MockNode nodeA = createMockNode("nodeA", Settings.EMPTY).setAsMaster();
|
||||
MockNode nodeB = createMockNode("nodeB", Settings.EMPTY);
|
||||
|
||||
// Initial cluster state
|
||||
ClusterState clusterState = nodeA.clusterState;
|
||||
@ -286,7 +282,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
|
||||
|
||||
// Adding new node - this node should get full cluster state while nodeB should still be getting diffs
|
||||
|
||||
MockNode nodeC = createMockNode("nodeC", Settings.EMPTY, Version.CURRENT);
|
||||
MockNode nodeC = createMockNode("nodeC", Settings.EMPTY);
|
||||
|
||||
// cluster state update 3 - register node C
|
||||
previousClusterState = clusterState;
|
||||
@ -336,14 +332,11 @@ public class PublishClusterStateActionTests extends ESTestCase {
|
||||
}
|
||||
|
||||
public void testUnexpectedDiffPublishing() throws Exception {
|
||||
MockNode nodeA = createMockNode("nodeA", Settings.EMPTY, Version.CURRENT, new ClusterStateListener() {
|
||||
@Override
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
fail("Shouldn't send cluster state to myself");
|
||||
}
|
||||
MockNode nodeA = createMockNode("nodeA", Settings.EMPTY, event -> {
|
||||
fail("Shouldn't send cluster state to myself");
|
||||
}).setAsMaster();
|
||||
|
||||
MockNode nodeB = createMockNode("nodeB", Settings.EMPTY, Version.CURRENT);
|
||||
MockNode nodeB = createMockNode("nodeB", Settings.EMPTY);
|
||||
|
||||
// Initial cluster state with both states - the second node still shouldn't get diff even though it's present in the previous cluster state
|
||||
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(nodeA.nodes()).put(nodeB.discoveryNode).build();
|
||||
@ -362,14 +355,14 @@ public class PublishClusterStateActionTests extends ESTestCase {
|
||||
public void testDisablingDiffPublishing() throws Exception {
|
||||
Settings noDiffPublishingSettings = Settings.builder().put(DiscoverySettings.PUBLISH_DIFF_ENABLE_SETTING.getKey(), false).build();
|
||||
|
||||
MockNode nodeA = createMockNode("nodeA", noDiffPublishingSettings, Version.CURRENT, new ClusterStateListener() {
|
||||
MockNode nodeA = createMockNode("nodeA", noDiffPublishingSettings, new ClusterStateListener() {
|
||||
@Override
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
fail("Shouldn't send cluster state to myself");
|
||||
}
|
||||
});
|
||||
|
||||
MockNode nodeB = createMockNode("nodeB", noDiffPublishingSettings, Version.CURRENT, new ClusterStateListener() {
|
||||
MockNode nodeB = createMockNode("nodeB", noDiffPublishingSettings, new ClusterStateListener() {
|
||||
@Override
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
assertFalse(event.state().wasReadFromDiff());
|
||||
@ -400,7 +393,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
|
||||
int numberOfNodes = randomIntBetween(2, 10);
|
||||
int numberOfIterations = scaledRandomIntBetween(5, 50);
|
||||
Settings settings = Settings.builder().put(DiscoverySettings.PUBLISH_DIFF_ENABLE_SETTING.getKey(), randomBoolean()).build();
|
||||
MockNode master = createMockNode("node0", settings, Version.CURRENT, new ClusterStateListener() {
|
||||
MockNode master = createMockNode("node0", settings, new ClusterStateListener() {
|
||||
@Override
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
assertProperMetaDataForVersion(event.state().metaData(), event.state().version());
|
||||
@ -409,7 +402,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
|
||||
DiscoveryNodes.Builder discoveryNodesBuilder = DiscoveryNodes.builder(master.nodes());
|
||||
for (int i = 1; i < numberOfNodes; i++) {
|
||||
final String name = "node" + i;
|
||||
final MockNode node = createMockNode(name, settings, Version.CURRENT, new ClusterStateListener() {
|
||||
final MockNode node = createMockNode(name, settings, new ClusterStateListener() {
|
||||
@Override
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
assertProperMetaDataForVersion(event.state().metaData(), event.state().version());
|
||||
@ -444,14 +437,14 @@ public class PublishClusterStateActionTests extends ESTestCase {
|
||||
}
|
||||
|
||||
public void testSerializationFailureDuringDiffPublishing() throws Exception {
|
||||
MockNode nodeA = createMockNode("nodeA", Settings.EMPTY, Version.CURRENT, new ClusterStateListener() {
|
||||
MockNode nodeA = createMockNode("nodeA", Settings.EMPTY, new ClusterStateListener() {
|
||||
@Override
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
fail("Shouldn't send cluster state to myself");
|
||||
}
|
||||
}).setAsMaster();
|
||||
|
||||
MockNode nodeB = createMockNode("nodeB", Settings.EMPTY, Version.CURRENT);
|
||||
MockNode nodeB = createMockNode("nodeB", Settings.EMPTY);
|
||||
|
||||
// Initial cluster state with both states - the second node still shouldn't get diff even though it's present in the previous cluster state
|
||||
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(nodeA.nodes()).put(nodeB.discoveryNode).build();
|
||||
|
@ -74,7 +74,7 @@ public class HttpServerTests extends ESTestCase {
|
||||
|
||||
ClusterService clusterService = new ClusterService(Settings.EMPTY,
|
||||
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null);
|
||||
NodeService nodeService = new NodeService(Settings.EMPTY, null, null, null, null, null, null, null, null, null,
|
||||
NodeService nodeService = new NodeService(Settings.EMPTY, null, null, null, null, null, null, null, null,
|
||||
clusterService, null);
|
||||
httpServer = new HttpServer(settings, httpServerTransport, restController, nodeService, circuitBreakerService);
|
||||
httpServer.start();
|
||||
|
@ -76,7 +76,7 @@ public class DynamicMappingDisabledTests extends ESSingleNodeTestCase {
|
||||
.build();
|
||||
clusterService = createClusterService(THREAD_POOL);
|
||||
transport =
|
||||
new LocalTransport(settings, THREAD_POOL, Version.CURRENT, new NamedWriteableRegistry(), new NoneCircuitBreakerService());
|
||||
new LocalTransport(settings, THREAD_POOL, new NamedWriteableRegistry(), new NoneCircuitBreakerService());
|
||||
transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL);
|
||||
indicesService = getInstanceFromNode(IndicesService.class);
|
||||
shardStateAction = new ShardStateAction(settings, clusterService, transportService, null, null, THREAD_POOL);
|
||||
|
@ -158,7 +158,7 @@ public class ClusterStateChanges {
|
||||
MetaDataUpdateSettingsService metaDataUpdateSettingsService = new MetaDataUpdateSettingsService(settings, clusterService,
|
||||
allocationService, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, new IndexNameExpressionResolver(settings));
|
||||
MetaDataCreateIndexService createIndexService = new MetaDataCreateIndexService(settings, clusterService, indicesService,
|
||||
allocationService, Version.CURRENT, new AliasValidator(settings), Collections.emptySet(), environment,
|
||||
allocationService, new AliasValidator(settings), Collections.emptySet(), environment,
|
||||
nodeServicesProvider, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS);
|
||||
|
||||
transportCloseIndexAction = new TransportCloseIndexAction(settings, transportService, clusterService, threadPool,
|
||||
|
@ -66,7 +66,7 @@ public class NettySizeHeaderFrameDecoderTests extends ESTestCase {
|
||||
threadPool = new ThreadPool(settings);
|
||||
NetworkService networkService = new NetworkService(settings);
|
||||
BigArrays bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
|
||||
nettyTransport = new NettyTransport(settings, threadPool, networkService, bigArrays, Version.CURRENT, new NamedWriteableRegistry(),
|
||||
nettyTransport = new NettyTransport(settings, threadPool, networkService, bigArrays, new NamedWriteableRegistry(),
|
||||
new NoneCircuitBreakerService());
|
||||
nettyTransport.start();
|
||||
TransportService transportService = new TransportService(settings, nettyTransport, threadPool);
|
||||
|
@ -64,7 +64,6 @@ public class NettyTransportServiceHandshakeTests extends ESTestCase {
|
||||
threadPool,
|
||||
new NetworkService(settings),
|
||||
BigArrays.NON_RECYCLING_INSTANCE,
|
||||
Version.CURRENT,
|
||||
new NamedWriteableRegistry(),
|
||||
new NoneCircuitBreakerService());
|
||||
TransportService transportService = new MockTransportService(settings, transport, threadPool);
|
||||
|
@ -36,9 +36,9 @@ public class TransportModuleTests extends ModuleTestCase {
|
||||
|
||||
static class FakeTransport extends AssertingLocalTransport {
|
||||
@Inject
|
||||
public FakeTransport(Settings settings, CircuitBreakerService circuitBreakerService, ThreadPool threadPool, Version version,
|
||||
public FakeTransport(Settings settings, CircuitBreakerService circuitBreakerService, ThreadPool threadPool,
|
||||
NamedWriteableRegistry namedWriteableRegistry) {
|
||||
super(settings, circuitBreakerService, threadPool, version, namedWriteableRegistry);
|
||||
super(settings, circuitBreakerService, threadPool, namedWriteableRegistry);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -65,14 +65,14 @@ public class NettyScheduledPingTests extends ESTestCase {
|
||||
|
||||
NamedWriteableRegistry registryA = new NamedWriteableRegistry();
|
||||
final NettyTransport nettyA = new NettyTransport(settings, threadPool, new NetworkService(settings),
|
||||
BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, registryA, circuitBreakerService);
|
||||
BigArrays.NON_RECYCLING_INSTANCE, registryA, circuitBreakerService);
|
||||
MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool);
|
||||
serviceA.start();
|
||||
serviceA.acceptIncomingRequests();
|
||||
|
||||
NamedWriteableRegistry registryB = new NamedWriteableRegistry();
|
||||
final NettyTransport nettyB = new NettyTransport(settings, threadPool, new NetworkService(settings),
|
||||
BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, registryB, circuitBreakerService);
|
||||
BigArrays.NON_RECYCLING_INSTANCE, registryB, circuitBreakerService);
|
||||
MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool);
|
||||
|
||||
serviceB.start();
|
||||
|
@ -93,9 +93,9 @@ public class NettyTransportIT extends ESIntegTestCase {
|
||||
|
||||
@Inject
|
||||
public ExceptionThrowingNettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
|
||||
Version version, NamedWriteableRegistry namedWriteableRegistry,
|
||||
NamedWriteableRegistry namedWriteableRegistry,
|
||||
CircuitBreakerService circuitBreakerService) {
|
||||
super(settings, threadPool, networkService, bigArrays, version, namedWriteableRegistry, circuitBreakerService);
|
||||
super(settings, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -136,7 +136,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
|
||||
private NettyTransport startNettyTransport(Settings settings, ThreadPool threadPool) {
|
||||
BigArrays bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
|
||||
|
||||
NettyTransport nettyTransport = new NettyTransport(settings, threadPool, new NetworkService(settings), bigArrays, Version.CURRENT,
|
||||
NettyTransport nettyTransport = new NettyTransport(settings, threadPool, new NetworkService(settings), bigArrays,
|
||||
new NamedWriteableRegistry(), new NoneCircuitBreakerService());
|
||||
nettyTransport.start();
|
||||
|
||||
|
@ -42,7 +42,7 @@ public class SimpleNettyTransportTests extends AbstractSimpleTransportTestCase {
|
||||
@Override
|
||||
protected MockTransportService build(Settings settings, Version version) {
|
||||
settings = Settings.builder().put(settings).put(TransportSettings.PORT.getKey(), "0").build();
|
||||
MockTransportService transportService = MockTransportService.nettyFromThreadPool(settings, version, threadPool);
|
||||
MockTransportService transportService = MockTransportService.nettyFromThreadPool(settings, threadPool, version);
|
||||
transportService.start();
|
||||
return transportService;
|
||||
}
|
||||
|
@ -101,7 +101,6 @@ public class AzureUnicastHostsProvider extends AbstractComponent implements Unic
|
||||
private final AzureComputeService azureComputeService;
|
||||
private TransportService transportService;
|
||||
private NetworkService networkService;
|
||||
private final Version version;
|
||||
|
||||
private final TimeValue refreshInterval;
|
||||
private long lastRefresh;
|
||||
@ -114,13 +113,11 @@ public class AzureUnicastHostsProvider extends AbstractComponent implements Unic
|
||||
@Inject
|
||||
public AzureUnicastHostsProvider(Settings settings, AzureComputeService azureComputeService,
|
||||
TransportService transportService,
|
||||
NetworkService networkService,
|
||||
Version version) {
|
||||
NetworkService networkService) {
|
||||
super(settings);
|
||||
this.azureComputeService = azureComputeService;
|
||||
this.transportService = transportService;
|
||||
this.networkService = networkService;
|
||||
this.version = version;
|
||||
|
||||
this.refreshInterval = Discovery.REFRESH_SETTING.get(settings);
|
||||
|
||||
@ -253,7 +250,7 @@ public class AzureUnicastHostsProvider extends AbstractComponent implements Unic
|
||||
for (TransportAddress address : addresses) {
|
||||
logger.trace("adding {}, transport_address {}", networkAddress, address);
|
||||
cachedDiscoNodes.add(new DiscoveryNode("#cloud-" + instance.getInstanceName(), address, emptyMap(),
|
||||
emptySet(), version.minimumCompatibilityVersion()));
|
||||
emptySet(), Version.CURRENT.minimumCompatibilityVersion()));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.warn("can not convert [{}] to transport address. skipping. [{}]", networkAddress, e.getMessage());
|
||||
|
@ -59,8 +59,6 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni
|
||||
|
||||
private final AmazonEC2 client;
|
||||
|
||||
private final Version version;
|
||||
|
||||
private final boolean bindAnyGroup;
|
||||
|
||||
private final Set<String> groups;
|
||||
@ -74,11 +72,10 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni
|
||||
private final DiscoNodesCache discoNodes;
|
||||
|
||||
@Inject
|
||||
public AwsEc2UnicastHostsProvider(Settings settings, TransportService transportService, AwsEc2Service awsEc2Service, Version version) {
|
||||
public AwsEc2UnicastHostsProvider(Settings settings, TransportService transportService, AwsEc2Service awsEc2Service) {
|
||||
super(settings);
|
||||
this.transportService = transportService;
|
||||
this.client = awsEc2Service.client();
|
||||
this.version = version;
|
||||
|
||||
this.hostType = DISCOVERY_EC2.HOST_TYPE_SETTING.get(settings);
|
||||
this.discoNodes = new DiscoNodesCache(DISCOVERY_EC2.NODE_CACHE_TIME_SETTING.get(settings));
|
||||
@ -175,7 +172,7 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni
|
||||
for (int i = 0; i < addresses.length; i++) {
|
||||
logger.trace("adding {}, address {}, transport_address {}", instance.getInstanceId(), address, addresses[i]);
|
||||
discoNodes.add(new DiscoveryNode("#cloud-" + instance.getInstanceId() + "-" + i, addresses[i],
|
||||
emptyMap(), emptySet(), version.minimumCompatibilityVersion()));
|
||||
emptyMap(), emptySet(), Version.CURRENT.minimumCompatibilityVersion()));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.warn("failed ot add {}, address {}", e, instance.getInstanceId(), address);
|
||||
|
@ -68,7 +68,6 @@ public class GceUnicastHostsProvider extends AbstractComponent implements Unicas
|
||||
private TransportService transportService;
|
||||
private NetworkService networkService;
|
||||
|
||||
private final Version version;
|
||||
private final String project;
|
||||
private final List<String> zones;
|
||||
private final List<String> tags;
|
||||
@ -80,13 +79,11 @@ public class GceUnicastHostsProvider extends AbstractComponent implements Unicas
|
||||
@Inject
|
||||
public GceUnicastHostsProvider(Settings settings, GceComputeService gceComputeService,
|
||||
TransportService transportService,
|
||||
NetworkService networkService,
|
||||
Version version) {
|
||||
NetworkService networkService) {
|
||||
super(settings);
|
||||
this.gceComputeService = gceComputeService;
|
||||
this.transportService = transportService;
|
||||
this.networkService = networkService;
|
||||
this.version = version;
|
||||
|
||||
this.refreshInterval = GceComputeService.REFRESH_SETTING.get(settings);
|
||||
this.project = GceComputeService.PROJECT_SETTING.get(settings);
|
||||
@ -244,7 +241,7 @@ public class GceUnicastHostsProvider extends AbstractComponent implements Unicas
|
||||
logger.trace("adding {}, type {}, address {}, transport_address {}, status {}", name, type,
|
||||
ip_private, transportAddress, status);
|
||||
cachedDiscoNodes.add(new DiscoveryNode("#cloud-" + name + "-" + 0, transportAddress,
|
||||
emptyMap(), emptySet(), version.minimumCompatibilityVersion()));
|
||||
emptyMap(), emptySet(), Version.CURRENT.minimumCompatibilityVersion()));
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
@ -21,7 +21,6 @@ package org.elasticsearch.repositories.azure;
|
||||
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.node.MockNode;
|
||||
import org.elasticsearch.node.Node;
|
||||
@ -111,7 +110,7 @@ public class AzureRepositoryF {
|
||||
// settings.put("cloud.azure.storage.my_account2.key", "account_key_secondary");
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final Node node = new MockNode(settings.build(), Version.CURRENT, Collections.singletonList(AzureRepositoryPlugin.class));
|
||||
final Node node = new MockNode(settings.build(), Collections.singletonList(AzureRepositoryPlugin.class));
|
||||
Runtime.getRuntime().addShutdownHook(new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
@ -35,13 +35,10 @@ import java.util.Collection;
|
||||
*/
|
||||
public class MockNode extends Node {
|
||||
|
||||
// these are kept here so a copy of this MockNode can be created, since Node does not store them
|
||||
private Version version;
|
||||
private Collection<Class<? extends Plugin>> plugins;
|
||||
|
||||
public MockNode(Settings settings, Version version, Collection<Class<? extends Plugin>> classpathPlugins) {
|
||||
super(InternalSettingsPreparer.prepareEnvironment(settings, null), version, classpathPlugins);
|
||||
this.version = version;
|
||||
public MockNode(Settings settings, Collection<Class<? extends Plugin>> classpathPlugins) {
|
||||
super(InternalSettingsPreparer.prepareEnvironment(settings, null), classpathPlugins);
|
||||
this.plugins = classpathPlugins;
|
||||
}
|
||||
|
||||
@ -49,7 +46,4 @@ public class MockNode extends Node {
|
||||
return plugins;
|
||||
}
|
||||
|
||||
public Version getVersion() {
|
||||
return version;
|
||||
}
|
||||
}
|
||||
|
@ -19,7 +19,6 @@
|
||||
package org.elasticsearch.test;
|
||||
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
|
||||
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
|
||||
@ -147,10 +146,6 @@ public abstract class ESSingleNodeTestCase extends ESTestCase {
|
||||
return false;
|
||||
}
|
||||
|
||||
/** The version of elasticsearch the node should act like. */
|
||||
protected Version getVersion() {
|
||||
return Version.CURRENT;
|
||||
}
|
||||
|
||||
/** The plugin classes that should be added to the node. */
|
||||
protected Collection<Class<? extends Plugin>> getPlugins() {
|
||||
@ -187,7 +182,7 @@ public abstract class ESSingleNodeTestCase extends ESTestCase {
|
||||
.put(Node.NODE_DATA_SETTING.getKey(), true)
|
||||
.put(nodeSettings()) // allow test cases to provide their own settings or override these
|
||||
.build();
|
||||
Node build = new MockNode(settings, getVersion(), getPlugins());
|
||||
Node build = new MockNode(settings, getPlugins());
|
||||
build.start();
|
||||
assertThat(DiscoveryNode.isLocalNode(build.settings()), is(true));
|
||||
return build;
|
||||
|
@ -614,7 +614,7 @@ public final class InternalTestCluster extends TestCluster {
|
||||
.put("node.name", name)
|
||||
.put(DiscoveryNodeService.NODE_ID_SEED_SETTING.getKey(), seed)
|
||||
.build();
|
||||
MockNode node = new MockNode(finalSettings, version, plugins);
|
||||
MockNode node = new MockNode(finalSettings, plugins);
|
||||
return new NodeAndClient(name, node);
|
||||
}
|
||||
|
||||
@ -888,8 +888,7 @@ public final class InternalTestCluster extends TestCluster {
|
||||
final long newIdSeed = DiscoveryNodeService.NODE_ID_SEED_SETTING.get(node.settings()) + 1; // use a new seed to make sure we have new node id
|
||||
Settings finalSettings = Settings.builder().put(node.settings()).put(newSettings).put(DiscoveryNodeService.NODE_ID_SEED_SETTING.getKey(), newIdSeed).build();
|
||||
Collection<Class<? extends Plugin>> plugins = node.getPlugins();
|
||||
Version version = node.getVersion();
|
||||
node = new MockNode(finalSettings, version, plugins);
|
||||
node = new MockNode(finalSettings, plugins);
|
||||
node.start();
|
||||
}
|
||||
|
||||
|
@ -27,7 +27,6 @@ import org.elasticsearch.common.network.NetworkModule;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.SettingsModule;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
@ -75,8 +74,8 @@ public class AssertingLocalTransport extends LocalTransport {
|
||||
|
||||
@Inject
|
||||
public AssertingLocalTransport(Settings settings, CircuitBreakerService circuitBreakerService, ThreadPool threadPool,
|
||||
Version version, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
super(settings, threadPool, version, namedWriteableRegistry, circuitBreakerService);
|
||||
NamedWriteableRegistry namedWriteableRegistry) {
|
||||
super(settings, threadPool, namedWriteableRegistry, circuitBreakerService);
|
||||
final long seed = ESIntegTestCase.INDEX_TEST_SEED_SETTING.get(settings);
|
||||
random = new Random(seed);
|
||||
minVersion = ASSERTING_TRANSPORT_MIN_VERSION_KEY.get(settings);
|
||||
|
@ -96,17 +96,26 @@ public class MockTransportService extends TransportService {
|
||||
|
||||
public static MockTransportService local(Settings settings, Version version, ThreadPool threadPool) {
|
||||
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
|
||||
Transport transport = new LocalTransport(settings, threadPool, version, namedWriteableRegistry, new NoneCircuitBreakerService());
|
||||
Transport transport = new LocalTransport(settings, threadPool, namedWriteableRegistry, new NoneCircuitBreakerService()) {
|
||||
@Override
|
||||
protected Version getVersion() {
|
||||
return version;
|
||||
}
|
||||
};
|
||||
return new MockTransportService(settings, transport, threadPool);
|
||||
}
|
||||
|
||||
public static MockTransportService nettyFromThreadPool(
|
||||
Settings settings,
|
||||
Version version,
|
||||
ThreadPool threadPool) {
|
||||
ThreadPool threadPool, final Version version) {
|
||||
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
|
||||
Transport transport = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE,
|
||||
version, namedWriteableRegistry, new NoneCircuitBreakerService());
|
||||
namedWriteableRegistry, new NoneCircuitBreakerService()) {
|
||||
@Override
|
||||
protected Version getCurrentVersion() {
|
||||
return version;
|
||||
}
|
||||
};
|
||||
return new MockTransportService(Settings.EMPTY, transport, threadPool);
|
||||
}
|
||||
|
||||
|