From 28a4ac01e4453c519ad6c3699bacf39503265c51 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Sat, 27 Jul 2013 22:13:48 +0200 Subject: [PATCH] fix transport support for versioning when starting the request/response cycle, we should use the lowest version out of the current node version, and the target node version to serialize the request, and put it in the header. this will allow to support both backward and forward comp. in addition, have Version as an injected value to different services, to make different versions more easily testable, compared to using Version#CURRENT --- src/main/java/org/elasticsearch/Version.java | 22 ++ .../client/transport/TransportClient.java | 4 + .../TransportClientNodesService.java | 9 +- .../metadata/MetaDataCreateIndexService.java | 13 +- .../cluster/node/DiscoveryNode.java | 19 +- .../discovery/local/LocalDiscovery.java | 11 +- .../discovery/zen/ZenDiscovery.java | 17 +- .../discovery/zen/ping/ZenPingService.java | 14 +- .../zen/ping/multicast/MulticastZenPing.java | 12 +- .../zen/ping/unicast/UnicastZenPing.java | 14 +- .../publish/PublishClusterStateAction.java | 11 +- .../node/internal/InternalNode.java | 5 +- .../node/service/NodeService.java | 4 +- .../rest/action/main/RestMainAction.java | 12 +- .../transport/PlainTransportFuture.java | 15 +- .../transport/TransportService.java | 4 + .../transport/local/LocalTransport.java | 24 +- .../local/LocalTransportChannel.java | 13 +- .../transport/netty/NettyTransport.java | 23 +- .../netty/NettyTransportChannel.java | 4 - .../BenchmarkNettyLargeMessages.java | 26 +- .../transport/TransportBenchmark.java | 8 +- .../node/DiscoveryNodeFiltersTests.java | 25 +- .../allocation/RoutingAllocationTests.java | 7 +- .../ClusterSerializationTests.java | 3 +- .../ping/multicast/MulticastZenPingTests.java | 24 +- .../zen/ping/unicast/UnicastZenPingTests.java | 17 +- .../AbstractSimpleTransportTests.java | 332 +++++++++++++++++- .../local/SimpleLocalTransportTests.java | 12 +- .../netty/SimpleNettyTransportTests.java | 15 +- 30 files changed, 527 insertions(+), 192 deletions(-) diff --git a/src/main/java/org/elasticsearch/Version.java b/src/main/java/org/elasticsearch/Version.java index 3b273820df5..e4bd5fa97ce 100644 --- a/src/main/java/org/elasticsearch/Version.java +++ b/src/main/java/org/elasticsearch/Version.java @@ -20,6 +20,7 @@ package org.elasticsearch; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.Lucene; @@ -242,6 +243,13 @@ public class Version implements Serializable { out.writeVInt(version.id); } + /** + * Returns the smallest version between the 2. + */ + public static Version smallest(Version version1, Version version2) { + return version1.id < version2.id ? version1 : version2; + } + public final int id; public final byte major; public final byte minor; @@ -324,4 +332,18 @@ public class Version implements Serializable { public int hashCode() { return id; } + + public static class Module extends AbstractModule { + + private final Version version; + + public Module(Version version) { + this.version = version; + } + + @Override + protected void configure() { + bind(Version.class).toInstance(version); + } + } } diff --git a/src/main/java/org/elasticsearch/client/transport/TransportClient.java b/src/main/java/org/elasticsearch/client/transport/TransportClient.java index a340729f44b..1a92b41e2e0 100644 --- a/src/main/java/org/elasticsearch/client/transport/TransportClient.java +++ b/src/main/java/org/elasticsearch/client/transport/TransportClient.java @@ -21,6 +21,7 @@ package org.elasticsearch.client.transport; import com.google.common.collect.ImmutableList; import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.Version; import org.elasticsearch.action.*; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; @@ -163,9 +164,12 @@ public class TransportClient extends AbstractClient { this.pluginsService = new PluginsService(settings, tuple.v2()); this.settings = pluginsService.updatedSettings(); + Version version = Version.CURRENT; + CompressorFactory.configure(this.settings); ModulesBuilder modules = new ModulesBuilder(); + modules.add(new Version.Module(version)); modules.add(new CacheRecyclerModule(settings)); modules.add(new PluginsModule(this.settings, pluginsService)); modules.add(new EnvironmentModule(environment)); diff --git a/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index 8d34afbe206..1af388806ed 100644 --- a/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -24,6 +24,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; @@ -63,6 +64,8 @@ public class TransportClientNodesService extends AbstractComponent { private final ThreadPool threadPool; + private final Version version; + // nodes that are added to be discovered private volatile ImmutableList listedNodes = ImmutableList.of(); @@ -83,12 +86,12 @@ public class TransportClientNodesService extends AbstractComponent { private volatile boolean closed; @Inject - public TransportClientNodesService(Settings settings, ClusterName clusterName, - TransportService transportService, ThreadPool threadPool) { + public TransportClientNodesService(Settings settings, ClusterName clusterName, TransportService transportService, ThreadPool threadPool, Version version) { super(settings); this.clusterName = clusterName; this.transportService = transportService; this.threadPool = threadPool; + this.version = version; this.nodesSamplerInterval = componentSettings.getAsTime("nodes_sampler_interval", timeValueSeconds(5)); this.pingTimeout = componentSettings.getAsTime("ping_timeout", timeValueSeconds(5)).millis(); @@ -147,7 +150,7 @@ public class TransportClientNodesService extends AbstractComponent { ImmutableList.Builder builder = ImmutableList.builder(); builder.addAll(listedNodes()); for (TransportAddress transportAddress : filtered) { - DiscoveryNode node = new DiscoveryNode("#transport#-" + tempNodeIdGenerator.incrementAndGet(), transportAddress); + DiscoveryNode node = new DiscoveryNode("#transport#-" + tempNodeIdGenerator.incrementAndGet(), transportAddress, version); logger.debug("adding address [{}]", node); builder.add(node); } diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index dc8602c80aa..85f599e2825 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -85,24 +85,18 @@ import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilde public class MetaDataCreateIndexService extends AbstractComponent { private final Environment environment; - private final ThreadPool threadPool; - private final ClusterService clusterService; - private final IndicesService indicesService; - private final AllocationService allocationService; - private final NodeIndexCreatedAction nodeIndexCreatedAction; - private final MetaDataService metaDataService; - + private final Version version; private final String riverIndexName; @Inject public MetaDataCreateIndexService(Settings settings, Environment environment, ThreadPool threadPool, ClusterService clusterService, IndicesService indicesService, - AllocationService allocationService, NodeIndexCreatedAction nodeIndexCreatedAction, MetaDataService metaDataService, @RiverIndexName String riverIndexName) { + AllocationService allocationService, NodeIndexCreatedAction nodeIndexCreatedAction, MetaDataService metaDataService, Version version, @RiverIndexName String riverIndexName) { super(settings); this.environment = environment; this.threadPool = threadPool; @@ -111,6 +105,7 @@ public class MetaDataCreateIndexService extends AbstractComponent { this.allocationService = allocationService; this.nodeIndexCreatedAction = nodeIndexCreatedAction; this.metaDataService = metaDataService; + this.version = version; this.riverIndexName = riverIndexName; } @@ -257,7 +252,7 @@ public class MetaDataCreateIndexService extends AbstractComponent { indexSettingsBuilder.put(SETTING_AUTO_EXPAND_REPLICAS, settings.get(SETTING_AUTO_EXPAND_REPLICAS)); } - indexSettingsBuilder.put(SETTING_VERSION_CREATED, Version.CURRENT); + indexSettingsBuilder.put(SETTING_VERSION_CREATED, version); Settings actualIndexSettings = indexSettingsBuilder.build(); diff --git a/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java b/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java index 7e68ed0d0f8..104f3a314ce 100644 --- a/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java +++ b/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java @@ -67,27 +67,21 @@ public class DiscoveryNode implements Streamable, Serializable { public static final ImmutableList EMPTY_LIST = ImmutableList.of(); - private String nodeName = "".intern(); - + private String nodeName = ""; private String nodeId; - private TransportAddress address; - private ImmutableMap attributes; - private Version version = Version.CURRENT; - private DiscoveryNode() { + DiscoveryNode() { } - public DiscoveryNode(String nodeId, TransportAddress address) { - this("", nodeId, address, ImmutableMap.of()); + public DiscoveryNode(String nodeId, TransportAddress address, Version version) { + this("", nodeId, address, ImmutableMap.of(), version); } - public DiscoveryNode(String nodeName, String nodeId, TransportAddress address, Map attributes) { - if (nodeName == null) { - this.nodeName = "".intern(); - } else { + public DiscoveryNode(String nodeName, String nodeId, TransportAddress address, Map attributes, Version version) { + if (nodeName != null) { this.nodeName = nodeName.intern(); } ImmutableMap.Builder builder = ImmutableMap.builder(); @@ -97,6 +91,7 @@ public class DiscoveryNode implements Streamable, Serializable { this.attributes = builder.build(); this.nodeId = nodeId.intern(); this.address = address; + this.version = version; } /** diff --git a/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java b/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java index 56d024066dc..95b4ac1564f 100644 --- a/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java @@ -21,6 +21,7 @@ package org.elasticsearch.discovery.local; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchIllegalStateException; +import org.elasticsearch.Version; import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -55,14 +56,11 @@ import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder; public class LocalDiscovery extends AbstractLifecycleComponent implements Discovery { private final TransportService transportService; - private final ClusterService clusterService; - private final DiscoveryNodeService discoveryNodeService; - private AllocationService allocationService; - private final ClusterName clusterName; + private final Version version; private DiscoveryNode localNode; @@ -78,12 +76,13 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem @Inject public LocalDiscovery(Settings settings, ClusterName clusterName, TransportService transportService, ClusterService clusterService, - DiscoveryNodeService discoveryNodeService) { + DiscoveryNodeService discoveryNodeService, Version version) { super(settings); this.clusterName = clusterName; this.clusterService = clusterService; this.transportService = transportService; this.discoveryNodeService = discoveryNodeService; + this.version = version; } @Override @@ -106,7 +105,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem } logger.debug("Connected to cluster [{}]", clusterName); this.localNode = new DiscoveryNode(settings.get("name"), Long.toString(nodeIdGenerator.incrementAndGet()), transportService.boundAddress().publishAddress(), - discoveryNodeService.buildAttributes()); + discoveryNodeService.buildAttributes(), version); clusterGroup.members().add(this); diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 810500fd6e6..3ec6d7033fa 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -23,6 +23,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchIllegalStateException; +import org.elasticsearch.Version; import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -76,26 +77,17 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, DiscoveryNodesProvider { private final ThreadPool threadPool; - private final TransportService transportService; - private final ClusterService clusterService; - private AllocationService allocationService; - private final ClusterName clusterName; - private final DiscoveryNodeService discoveryNodeService; - private final ZenPingService pingService; - private final MasterFaultDetection masterFD; - private final NodesFaultDetection nodesFD; - private final PublishClusterStateAction publishClusterState; - private final MembershipAction membership; + private final Version version; private final TimeValue pingTimeout; @@ -127,7 +119,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen @Inject public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, NodeSettingsService nodeSettingsService, - DiscoveryNodeService discoveryNodeService, ZenPingService pingService) { + DiscoveryNodeService discoveryNodeService, ZenPingService pingService, Version version) { super(settings); this.clusterName = clusterName; this.threadPool = threadPool; @@ -135,6 +127,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen this.transportService = transportService; this.discoveryNodeService = discoveryNodeService; this.pingService = pingService; + this.version = version; // also support direct discovery.zen settings, for cases when it gets extended this.pingTimeout = settings.getAsTime("discovery.zen.ping.timeout", settings.getAsTime("discovery.zen.ping_timeout", componentSettings.getAsTime("ping_timeout", componentSettings.getAsTime("initial_ping_timeout", timeValueSeconds(3))))); @@ -176,7 +169,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen Map nodeAttributes = discoveryNodeService.buildAttributes(); // note, we rely on the fact that its a new id each time we start, see FD and "kill -9" handling String nodeId = UUID.randomBase64UUID(); - localNode = new DiscoveryNode(settings.get("name"), nodeId, transportService.boundAddress().publishAddress(), nodeAttributes); + localNode = new DiscoveryNode(settings.get("name"), nodeId, transportService.boundAddress().publishAddress(), nodeAttributes, version); latestDiscoNodes = new DiscoveryNodes.Builder().put(localNode).localNodeId(localNode.id()).build(); nodesFD.updateNodes(latestDiscoNodes); pingService.start(); diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java b/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java index 7150c258cfa..6a0343188d0 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java @@ -22,6 +22,7 @@ package org.elasticsearch.discovery.zen.ping; import com.google.common.collect.ImmutableList; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchIllegalStateException; +import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; @@ -51,17 +52,22 @@ public class ZenPingService extends AbstractLifecycleComponent implemen private volatile ImmutableList zenPings = ImmutableList.of(); - @Inject + // here for backward comp. with discovery plugins public ZenPingService(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, NetworkService networkService, @Nullable Set unicastHostsProviders) { - super(settings); + this(settings, threadPool, transportService, clusterName, networkService, Version.CURRENT, unicastHostsProviders); + } + @Inject + public ZenPingService(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, NetworkService networkService, + Version version, @Nullable Set unicastHostsProviders) { + super(settings); ImmutableList.Builder zenPingsBuilder = ImmutableList.builder(); if (componentSettings.getAsBoolean("multicast.enabled", true)) { - zenPingsBuilder.add(new MulticastZenPing(settings, threadPool, transportService, clusterName, networkService)); + zenPingsBuilder.add(new MulticastZenPing(settings, threadPool, transportService, clusterName, networkService, version)); } // always add the unicast hosts, so it will be able to receive unicast requests even when working in multicast - zenPingsBuilder.add(new UnicastZenPing(settings, threadPool, transportService, clusterName, unicastHostsProviders)); + zenPingsBuilder.add(new UnicastZenPing(settings, threadPool, transportService, clusterName, version, unicastHostsProviders)); this.zenPings = zenPingsBuilder.build(); } diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java b/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java index 006572f8824..8f9acb18ed5 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java @@ -71,6 +71,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem private final TransportService transportService; private final ClusterName clusterName; private final NetworkService networkService; + private final Version version; private volatile DiscoveryNodesProvider nodesProvider; private final boolean pingEnabled; @@ -87,16 +88,17 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem private final Object sendMutex = new Object(); private final Object receiveMutex = new Object(); - public MulticastZenPing(ThreadPool threadPool, TransportService transportService, ClusterName clusterName) { - this(EMPTY_SETTINGS, threadPool, transportService, clusterName, new NetworkService(EMPTY_SETTINGS)); + public MulticastZenPing(ThreadPool threadPool, TransportService transportService, ClusterName clusterName, Version version) { + this(EMPTY_SETTINGS, threadPool, transportService, clusterName, new NetworkService(EMPTY_SETTINGS), version); } - public MulticastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, NetworkService networkService) { + public MulticastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, NetworkService networkService, Version version) { super(settings); this.threadPool = threadPool; this.transportService = transportService; this.clusterName = clusterName; this.networkService = networkService; + this.version = version; this.address = componentSettings.get("address"); this.port = componentSettings.getAsInt("port", 54328); @@ -256,7 +258,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem BytesStreamOutput bStream = new BytesStreamOutput(); StreamOutput out = new HandlesStreamOutput(bStream); out.writeBytes(INTERNAL_HEADER); - Version.writeVersion(Version.CURRENT, out); + Version.writeVersion(version, out); out.writeInt(id); clusterName.writeTo(out); nodesProvider.nodes().localNode().writeTo(out); @@ -467,7 +469,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem XContentBuilder builder = XContentFactory.contentBuilder(contentType); builder.startObject().startObject("response"); builder.field("cluster_name", MulticastZenPing.this.clusterName.value()); - builder.startObject("version").field("number", Version.CURRENT.number()).field("snapshot_build", Version.CURRENT.snapshot).endObject(); + builder.startObject("version").field("number", version.number()).field("snapshot_build", version.snapshot).endObject(); builder.field("transport_address", localNode.address().toString()); if (nodesProvider.nodeService() != null) { diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index 4e063a3c88e..b261d8496db 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -23,6 +23,7 @@ import com.google.common.collect.Lists; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.ElasticSearchIllegalStateException; +import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -47,7 +48,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import static com.google.common.collect.Lists.newArrayList; -import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS; import static org.elasticsearch.common.unit.TimeValue.readTimeValue; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; import static org.elasticsearch.discovery.zen.ping.ZenPing.PingResponse.readPingResponse; @@ -60,10 +60,9 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen public static final int LIMIT_PORTS_COUNT = 1; private final ThreadPool threadPool; - private final TransportService transportService; - private final ClusterName clusterName; + private final Version version; private final int concurrentConnects; @@ -80,15 +79,12 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen private final CopyOnWriteArrayList hostsProviders = new CopyOnWriteArrayList(); - public UnicastZenPing(ThreadPool threadPool, TransportService transportService, ClusterName clusterName) { - this(EMPTY_SETTINGS, threadPool, transportService, clusterName, null); - } - - public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, @Nullable Set unicastHostsProviders) { + public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, Version version, @Nullable Set unicastHostsProviders) { super(settings); this.threadPool = threadPool; this.transportService = transportService; this.clusterName = clusterName; + this.version = version; if (unicastHostsProviders != null) { for (UnicastHostsProvider unicastHostsProvider : unicastHostsProviders) { @@ -112,7 +108,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen TransportAddress[] addresses = transportService.addressesFromString(host); // we only limit to 1 addresses, makes no sense to ping 100 ports for (int i = 0; (i < addresses.length && i < LIMIT_PORTS_COUNT); i++) { - nodes.add(new DiscoveryNode("#zen_unicast_" + (++idCounter) + "#", addresses[i])); + nodes.add(new DiscoveryNode("#zen_unicast_" + (++idCounter) + "#", addresses[i], version)); } } catch (Exception e) { throw new ElasticSearchIllegalArgumentException("Failed to resolve address for [" + host + "]", e); diff --git a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index 97f48b1785f..9f77e5687a3 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -46,9 +46,7 @@ public class PublishClusterStateAction extends AbstractComponent { } private final TransportService transportService; - private final DiscoveryNodesProvider nodesProvider; - private final NewClusterStateListener listener; public PublishClusterStateAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider, @@ -92,7 +90,7 @@ public class PublishClusterStateAction extends AbstractComponent { } } transportService.sendRequest(node, PublishClusterStateRequestHandler.ACTION, - new PublishClusterStateRequest(bytes), + new PublishClusterStateRequest(bytes, node.version()), TransportRequestOptions.options().withHighType().withCompress(false), // no need to compress, we already compressed the bytes new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @@ -107,13 +105,14 @@ public class PublishClusterStateAction extends AbstractComponent { class PublishClusterStateRequest extends TransportRequest { BytesReference clusterStateInBytes; - Version version = Version.CURRENT; + Version version; - private PublishClusterStateRequest() { + PublishClusterStateRequest() { } - private PublishClusterStateRequest(BytesReference clusterStateInBytes) { + PublishClusterStateRequest(BytesReference clusterStateInBytes, Version version) { this.clusterStateInBytes = clusterStateInBytes; + this.version = version; } @Override diff --git a/src/main/java/org/elasticsearch/node/internal/InternalNode.java b/src/main/java/org/elasticsearch/node/internal/InternalNode.java index 08de00bf105..ae24a58603b 100644 --- a/src/main/java/org/elasticsearch/node/internal/InternalNode.java +++ b/src/main/java/org/elasticsearch/node/internal/InternalNode.java @@ -119,8 +119,10 @@ public final class InternalNode implements Node { public InternalNode(Settings pSettings, boolean loadConfigSettings) throws ElasticSearchException { Tuple tuple = InternalSettingsPerparer.prepareSettings(pSettings, loadConfigSettings); + Version version = Version.CURRENT; + ESLogger logger = Loggers.getLogger(Node.class, tuple.v1().get("name")); - logger.info("version[{}], pid[{}], build[{}/{}]", Version.CURRENT, JvmInfo.jvmInfo().pid(), Build.CURRENT.hashShort(), Build.CURRENT.timestamp()); + logger.info("version[{}], pid[{}], build[{}/{}]", version, JvmInfo.jvmInfo().pid(), Build.CURRENT.hashShort(), Build.CURRENT.timestamp()); logger.info("initializing ..."); @@ -140,6 +142,7 @@ public final class InternalNode implements Node { NodeEnvironment nodeEnvironment = new NodeEnvironment(this.settings, this.environment); ModulesBuilder modules = new ModulesBuilder(); + modules.add(new Version.Module(version)); modules.add(new CacheRecyclerModule(settings)); modules.add(new PluginsModule(settings, pluginsService)); modules.add(new SettingsModule(settings)); diff --git a/src/main/java/org/elasticsearch/node/service/NodeService.java b/src/main/java/org/elasticsearch/node/service/NodeService.java index 9a5f41201f8..bef0f85be75 100644 --- a/src/main/java/org/elasticsearch/node/service/NodeService.java +++ b/src/main/java/org/elasticsearch/node/service/NodeService.java @@ -70,7 +70,7 @@ public class NodeService extends AbstractComponent { @Inject public NodeService(Settings settings, ThreadPool threadPool, MonitorService monitorService, Discovery discovery, ClusterService clusterService, TransportService transportService, IndicesService indicesService, - PluginsService pluginService) { + PluginsService pluginService, Version version) { super(settings); this.threadPool = threadPool; this.monitorService = monitorService; @@ -82,7 +82,7 @@ public class NodeService extends AbstractComponent { if (address != null) { this.hostname = address.getHostName(); } - this.version = Version.CURRENT; + this.version = version; this.pluginService = pluginService; } diff --git a/src/main/java/org/elasticsearch/rest/action/main/RestMainAction.java b/src/main/java/org/elasticsearch/rest/action/main/RestMainAction.java index d3d42740c98..d8411226fd2 100644 --- a/src/main/java/org/elasticsearch/rest/action/main/RestMainAction.java +++ b/src/main/java/org/elasticsearch/rest/action/main/RestMainAction.java @@ -42,10 +42,12 @@ import static org.elasticsearch.rest.RestRequest.Method.HEAD; */ public class RestMainAction extends BaseRestHandler { - @Inject - public RestMainAction(Settings settings, Client client, RestController controller) { - super(settings, client); + private final Version version; + @Inject + public RestMainAction(Settings settings, Version version, Client client, RestController controller) { + super(settings, client); + this.version = version; controller.registerHandler(GET, "/", this); controller.registerHandler(HEAD, "/", this); } @@ -78,10 +80,10 @@ public class RestMainAction extends BaseRestHandler { builder.field("name", settings.get("name")); } builder.startObject("version") - .field("number", Version.CURRENT.number()) + .field("number", version.number()) .field("build_hash", Build.CURRENT.hash()) .field("build_timestamp", Build.CURRENT.timestamp()) - .field("build_snapshot", Version.CURRENT.snapshot) + .field("build_snapshot", version.snapshot) // We use the lucene version from lucene constants since // this includes bugfix release version as well and is already in // the right format. We can also be sure that the format is maitained diff --git a/src/main/java/org/elasticsearch/transport/PlainTransportFuture.java b/src/main/java/org/elasticsearch/transport/PlainTransportFuture.java index 43caa45c5dd..fe472678ba1 100644 --- a/src/main/java/org/elasticsearch/transport/PlainTransportFuture.java +++ b/src/main/java/org/elasticsearch/transport/PlainTransportFuture.java @@ -83,14 +83,21 @@ public class PlainTransportFuture extends BaseFutur @Override public void handleResponse(V response) { - handler.handleResponse(response); - set(response); + try { + handler.handleResponse(response); + set(response); + } catch (Throwable t) { + handleException(new ResponseHandlerFailureTransportException(t)); + } } @Override public void handleException(TransportException exp) { - handler.handleException(exp); - setException(exp); + try { + handler.handleException(exp); + } finally { + setException(exp); + } } @Override diff --git a/src/main/java/org/elasticsearch/transport/TransportService.java b/src/main/java/org/elasticsearch/transport/TransportService.java index a2ba7e45e1e..8a29527949f 100644 --- a/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/src/main/java/org/elasticsearch/transport/TransportService.java @@ -21,6 +21,7 @@ package org.elasticsearch.transport; import com.google.common.collect.ImmutableMap; import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.component.AbstractLifecycleComponent; @@ -173,6 +174,9 @@ public class TransportService extends AbstractLifecycleComponent void sendRequest(final DiscoveryNode node, final String action, final TransportRequest request, final TransportRequestOptions options, final TransportResponseHandler handler) throws TransportException { + if (node == null) { + throw new ElasticSearchIllegalStateException("can't send request to a null node"); + } final long requestId = newRequestId(); TimeoutHandler timeoutHandler = null; try { diff --git a/src/main/java/org/elasticsearch/transport/local/LocalTransport.java b/src/main/java/org/elasticsearch/transport/local/LocalTransport.java index 36518ef78e5..b8f4c15b2a0 100644 --- a/src/main/java/org/elasticsearch/transport/local/LocalTransport.java +++ b/src/main/java/org/elasticsearch/transport/local/LocalTransport.java @@ -20,13 +20,13 @@ package org.elasticsearch.transport.local; import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.ThrowableObjectInputStream; import org.elasticsearch.common.io.stream.*; -import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.LocalTransportAddress; @@ -48,6 +48,7 @@ import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.new public class LocalTransport extends AbstractLifecycleComponent implements Transport { private final ThreadPool threadPool; + private final Version version; private volatile TransportServiceAdapter transportServiceAdapter; private volatile BoundTransportAddress boundAddress; private volatile LocalTransportAddress localAddress; @@ -55,14 +56,11 @@ public class LocalTransport extends AbstractLifecycleComponent implem private static final AtomicLong transportAddressIdGenerator = new AtomicLong(); private final ConcurrentMap connectedNodes = newConcurrentMap(); - public LocalTransport(ThreadPool threadPool) { - this(ImmutableSettings.Builder.EMPTY_SETTINGS, threadPool); - } - @Inject - public LocalTransport(Settings settings, ThreadPool threadPool) { + public LocalTransport(Settings settings, ThreadPool threadPool, Version version) { super(settings); this.threadPool = threadPool; + this.version = version; } @Override @@ -151,8 +149,11 @@ public class LocalTransport extends AbstractLifecycleComponent implem @Override public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { + final Version version = Version.smallest(node.version(), this.version); + BytesStreamOutput bStream = new BytesStreamOutput(); StreamOutput stream = new HandlesStreamOutput(bStream); + stream.setVersion(version); stream.writeLong(requestId); byte status = 0; @@ -176,7 +177,7 @@ public class LocalTransport extends AbstractLifecycleComponent implem threadPool.generic().execute(new Runnable() { @Override public void run() { - targetTransport.messageReceived(data, action, LocalTransport.this, requestId); + targetTransport.messageReceived(data, action, LocalTransport.this, version, requestId); } }); } @@ -185,18 +186,19 @@ public class LocalTransport extends AbstractLifecycleComponent implem return this.threadPool; } - void messageReceived(byte[] data, String action, LocalTransport sourceTransport, @Nullable final Long sendRequestId) { + void messageReceived(byte[] data, String action, LocalTransport sourceTransport, Version version, @Nullable final Long sendRequestId) { try { transportServiceAdapter.received(data.length); StreamInput stream = new BytesStreamInput(data, false); stream = CachedStreamInput.cachedHandles(stream); + stream.setVersion(version); long requestId = stream.readLong(); byte status = stream.readByte(); boolean isRequest = TransportStatus.isRequest(status); if (isRequest) { - handleRequest(stream, requestId, sourceTransport); + handleRequest(stream, requestId, sourceTransport, version); } else { final TransportResponseHandler handler = transportServiceAdapter.remove(requestId); // ignore if its null, the adapter logs it @@ -220,9 +222,9 @@ public class LocalTransport extends AbstractLifecycleComponent implem } } - private void handleRequest(StreamInput stream, long requestId, LocalTransport sourceTransport) throws Exception { + private void handleRequest(StreamInput stream, long requestId, LocalTransport sourceTransport, Version version) throws Exception { final String action = stream.readString(); - final LocalTransportChannel transportChannel = new LocalTransportChannel(this, sourceTransport, action, requestId); + final LocalTransportChannel transportChannel = new LocalTransportChannel(this, sourceTransport, action, requestId, version); try { final TransportRequestHandler handler = transportServiceAdapter.handler(action); if (handler == null) { diff --git a/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java b/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java index 4e52cca1500..5977fb1aa14 100644 --- a/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java +++ b/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport.local; +import org.elasticsearch.Version; import org.elasticsearch.common.io.ThrowableObjectOutputStream; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.HandlesStreamOutput; @@ -35,19 +36,18 @@ import java.io.NotSerializableException; public class LocalTransportChannel implements TransportChannel { private final LocalTransport sourceTransport; - // the transport we will *send to* private final LocalTransport targetTransport; - private final String action; - private final long requestId; + private final Version version; - public LocalTransportChannel(LocalTransport sourceTransport, LocalTransport targetTransport, String action, long requestId) { + public LocalTransportChannel(LocalTransport sourceTransport, LocalTransport targetTransport, String action, long requestId, Version version) { this.sourceTransport = sourceTransport; this.targetTransport = targetTransport; this.action = action; this.requestId = requestId; + this.version = version; } @Override @@ -64,6 +64,7 @@ public class LocalTransportChannel implements TransportChannel { public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException { BytesStreamOutput bStream = new BytesStreamOutput(); StreamOutput stream = new HandlesStreamOutput(bStream); + stream.setVersion(version); stream.writeLong(requestId); byte status = 0; status = TransportStatus.setResponse(status); @@ -74,7 +75,7 @@ public class LocalTransportChannel implements TransportChannel { targetTransport.threadPool().generic().execute(new Runnable() { @Override public void run() { - targetTransport.messageReceived(data, action, sourceTransport, null); + targetTransport.messageReceived(data, action, sourceTransport, version, null); } }); } @@ -100,7 +101,7 @@ public class LocalTransportChannel implements TransportChannel { targetTransport.threadPool().generic().execute(new Runnable() { @Override public void run() { - targetTransport.messageReceived(data, action, sourceTransport, null); + targetTransport.messageReceived(data, action, sourceTransport, version, null); } }); } diff --git a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index 9bbe1d36be8..1ba3775673d 100644 --- a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchIllegalStateException; +import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractLifecycleComponent; @@ -76,7 +77,6 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.elasticsearch.common.network.NetworkService.TcpSettings.*; -import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS; import static org.elasticsearch.common.transport.NetworkExceptionHelper.isCloseConnectionException; import static org.elasticsearch.common.transport.NetworkExceptionHelper.isConnectException; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; @@ -95,6 +95,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem } private final NetworkService networkService; + final Version version; final int workerCount; final int bossCount; @@ -154,19 +155,12 @@ public class NettyTransport extends AbstractLifecycleComponent implem // connections while no connect operations is going on... (this might help with 100% CPU when stopping the transport?) private final ReadWriteLock globalLock = new ReentrantReadWriteLock(); - public NettyTransport(ThreadPool threadPool) { - this(EMPTY_SETTINGS, threadPool, new NetworkService(EMPTY_SETTINGS)); - } - - public NettyTransport(Settings settings, ThreadPool threadPool) { - this(settings, threadPool, new NetworkService(settings)); - } - @Inject - public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService) { + public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, Version version) { super(settings); this.threadPool = threadPool; this.networkService = networkService; + this.version = version; if (settings.getAsBoolean("netty.epollBugWorkaround", false)) { System.setProperty("org.jboss.netty.epollBugWorkaround", "true"); @@ -541,13 +535,18 @@ public class NettyTransport extends AbstractLifecycleComponent implem } stream = new HandlesStreamOutput(stream); - stream.setVersion(node.version()); + // we pick the smallest of the 2, to support both backward and forward compatibility + // note, this is the only place we need to do this, since from here on, we use the serialized version + // as the version to use also when the node receiving this request will send the response with + Version version = Version.smallest(this.version, node.version()); + + stream.setVersion(version); stream.writeString(action); request.writeTo(stream); stream.close(); ChannelBuffer buffer = bStream.bytes().toChannelBuffer(); - NettyHeader.writeHeader(buffer, requestId, status, node.version()); + NettyHeader.writeHeader(buffer, requestId, status, version); targetChannel.write(buffer); // We handle close connection exception in the #exceptionCaught method, which is the main reason we want to add this future diff --git a/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java b/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java index 685cd26ff23..a3ba13449a8 100644 --- a/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java +++ b/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java @@ -39,13 +39,9 @@ import java.io.NotSerializableException; public class NettyTransportChannel implements TransportChannel { private final NettyTransport transport; - private final Version version; - private final String action; - private final Channel channel; - private final long requestId; public NettyTransportChannel(NettyTransport transport, String action, Channel channel, long requestId, Version version) { diff --git a/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java b/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java index f6a4d95c76d..bc7c46bb575 100644 --- a/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java +++ b/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java @@ -19,25 +19,23 @@ package org.elasticsearch.benchmark.transport; -import static org.elasticsearch.transport.TransportRequestOptions.options; - -import java.util.concurrent.CountDownLatch; - import org.apache.lucene.util.BytesRef; +import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.BaseTransportRequestHandler; -import org.elasticsearch.transport.BaseTransportResponseHandler; -import org.elasticsearch.transport.TransportChannel; -import org.elasticsearch.transport.TransportException; -import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.*; import org.elasticsearch.transport.netty.NettyTransport; +import java.util.concurrent.CountDownLatch; + +import static org.elasticsearch.transport.TransportRequestOptions.options; + /** * */ @@ -52,11 +50,13 @@ public class BenchmarkNettyLargeMessages { Settings settings = ImmutableSettings.settingsBuilder() .build(); - final ThreadPool threadPool = new ThreadPool(); - final TransportService transportServiceServer = new TransportService(new NettyTransport(settings, threadPool), threadPool).start(); - final TransportService transportServiceClient = new TransportService(new NettyTransport(settings, threadPool), threadPool).start(); + NetworkService networkService = new NetworkService(settings); - final DiscoveryNode bigNode = new DiscoveryNode("big", new InetSocketTransportAddress("localhost", 9300)); + final ThreadPool threadPool = new ThreadPool(); + final TransportService transportServiceServer = new TransportService(new NettyTransport(settings, threadPool, networkService, Version.CURRENT), threadPool).start(); + final TransportService transportServiceClient = new TransportService(new NettyTransport(settings, threadPool, networkService, Version.CURRENT), threadPool).start(); + + final DiscoveryNode bigNode = new DiscoveryNode("big", new InetSocketTransportAddress("localhost", 9300), Version.CURRENT); // final DiscoveryNode smallNode = new DiscoveryNode("small", new InetSocketTransportAddress("localhost", 9300)); final DiscoveryNode smallNode = bigNode; diff --git a/src/test/java/org/elasticsearch/benchmark/transport/TransportBenchmark.java b/src/test/java/org/elasticsearch/benchmark/transport/TransportBenchmark.java index 071fd43e38d..b6e951d85c1 100644 --- a/src/test/java/org/elasticsearch/benchmark/transport/TransportBenchmark.java +++ b/src/test/java/org/elasticsearch/benchmark/transport/TransportBenchmark.java @@ -19,8 +19,10 @@ package org.elasticsearch.benchmark.transport; +import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.StopWatch; +import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -42,13 +44,13 @@ public class TransportBenchmark { LOCAL { @Override public Transport newTransport(Settings settings, ThreadPool threadPool) { - return new LocalTransport(settings, threadPool); + return new LocalTransport(settings, threadPool, Version.CURRENT); } }, NETTY { @Override public Transport newTransport(Settings settings, ThreadPool threadPool) { - return new NettyTransport(settings, threadPool); + return new NettyTransport(settings, threadPool, new NetworkService(ImmutableSettings.EMPTY), Version.CURRENT); } }; @@ -75,7 +77,7 @@ public class TransportBenchmark { final ThreadPool clientThreadPool = new ThreadPool(); final TransportService clientTransportService = new TransportService(type.newTransport(settings, clientThreadPool), clientThreadPool).start(); - final DiscoveryNode node = new DiscoveryNode("server", serverTransportService.boundAddress().publishAddress()); + final DiscoveryNode node = new DiscoveryNode("server", serverTransportService.boundAddress().publishAddress(), Version.CURRENT); serverTransportService.registerHandler("benchmark", new BaseTransportRequestHandler() { @Override diff --git a/src/test/java/org/elasticsearch/test/unit/cluster/node/DiscoveryNodeFiltersTests.java b/src/test/java/org/elasticsearch/test/unit/cluster/node/DiscoveryNodeFiltersTests.java index ebf594b7366..934b5407922 100644 --- a/src/test/java/org/elasticsearch/test/unit/cluster/node/DiscoveryNodeFiltersTests.java +++ b/src/test/java/org/elasticsearch/test/unit/cluster/node/DiscoveryNodeFiltersTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.test.unit.cluster.node; import com.google.common.collect.ImmutableMap; +import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeFilters; import org.elasticsearch.common.settings.ImmutableSettings; @@ -43,10 +44,10 @@ public class DiscoveryNodeFiltersTests { .build(); DiscoveryNodeFilters filters = DiscoveryNodeFilters.buildFromSettings(OR, "xxx.", settings); - DiscoveryNode node = new DiscoveryNode("name1", "id1", DummyTransportAddress.INSTANCE, ImmutableMap.of()); + DiscoveryNode node = new DiscoveryNode("name1", "id1", DummyTransportAddress.INSTANCE, ImmutableMap.of(), Version.CURRENT); assertThat(filters.match(node), equalTo(true)); - node = new DiscoveryNode("name2", "id2", DummyTransportAddress.INSTANCE, ImmutableMap.of()); + node = new DiscoveryNode("name2", "id2", DummyTransportAddress.INSTANCE, ImmutableMap.of(), Version.CURRENT); assertThat(filters.match(node), equalTo(false)); } @@ -57,10 +58,10 @@ public class DiscoveryNodeFiltersTests { .build(); DiscoveryNodeFilters filters = DiscoveryNodeFilters.buildFromSettings(OR, "xxx.", settings); - DiscoveryNode node = new DiscoveryNode("name1", "id1", DummyTransportAddress.INSTANCE, ImmutableMap.of()); + DiscoveryNode node = new DiscoveryNode("name1", "id1", DummyTransportAddress.INSTANCE, ImmutableMap.of(), Version.CURRENT); assertThat(filters.match(node), equalTo(true)); - node = new DiscoveryNode("name2", "id2", DummyTransportAddress.INSTANCE, ImmutableMap.of()); + node = new DiscoveryNode("name2", "id2", DummyTransportAddress.INSTANCE, ImmutableMap.of(), Version.CURRENT); assertThat(filters.match(node), equalTo(false)); } @@ -72,13 +73,13 @@ public class DiscoveryNodeFiltersTests { .build(); DiscoveryNodeFilters filters = DiscoveryNodeFilters.buildFromSettings(OR, "xxx.", settings); - DiscoveryNode node = new DiscoveryNode("name1", "id1", DummyTransportAddress.INSTANCE, ImmutableMap.of()); + DiscoveryNode node = new DiscoveryNode("name1", "id1", DummyTransportAddress.INSTANCE, ImmutableMap.of(), Version.CURRENT); assertThat(filters.match(node), equalTo(true)); - node = new DiscoveryNode("name2", "id2", DummyTransportAddress.INSTANCE, ImmutableMap.of()); + node = new DiscoveryNode("name2", "id2", DummyTransportAddress.INSTANCE, ImmutableMap.of(), Version.CURRENT); assertThat(filters.match(node), equalTo(true)); - node = new DiscoveryNode("name3", "id3", DummyTransportAddress.INSTANCE, ImmutableMap.of()); + node = new DiscoveryNode("name3", "id3", DummyTransportAddress.INSTANCE, ImmutableMap.of(), Version.CURRENT); assertThat(filters.match(node), equalTo(false)); } @@ -91,18 +92,18 @@ public class DiscoveryNodeFiltersTests { DiscoveryNodeFilters filters = DiscoveryNodeFilters.buildFromSettings(AND, "xxx.", settings); DiscoveryNode node = new DiscoveryNode("name1", "id1", DummyTransportAddress.INSTANCE, - ImmutableMap.of("tag", "A", "group", "B")); + ImmutableMap.of("tag", "A", "group", "B"), Version.CURRENT); assertThat(filters.match(node), equalTo(true)); node = new DiscoveryNode("name2", "id2", DummyTransportAddress.INSTANCE, - ImmutableMap.of("tag", "A", "group", "B", "name", "X")); + ImmutableMap.of("tag", "A", "group", "B", "name", "X"), Version.CURRENT); assertThat(filters.match(node), equalTo(true)); node = new DiscoveryNode("name3", "id3", DummyTransportAddress.INSTANCE, - ImmutableMap.of("tag", "A", "group", "F", "name", "X")); + ImmutableMap.of("tag", "A", "group", "F", "name", "X"), Version.CURRENT); assertThat(filters.match(node), equalTo(false)); - node = new DiscoveryNode("name4", "id4", DummyTransportAddress.INSTANCE, ImmutableMap.of()); + node = new DiscoveryNode("name4", "id4", DummyTransportAddress.INSTANCE, ImmutableMap.of(), Version.CURRENT); assertThat(filters.match(node), equalTo(false)); } @@ -113,7 +114,7 @@ public class DiscoveryNodeFiltersTests { .build(); DiscoveryNodeFilters filters = DiscoveryNodeFilters.buildFromSettings(OR, "xxx.", settings); - DiscoveryNode node = new DiscoveryNode("name1", "id1", DummyTransportAddress.INSTANCE, ImmutableMap.of()); + DiscoveryNode node = new DiscoveryNode("name1", "id1", DummyTransportAddress.INSTANCE, ImmutableMap.of(), Version.CURRENT); assertThat(filters.match(node), equalTo(true)); } } diff --git a/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/RoutingAllocationTests.java b/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/RoutingAllocationTests.java index ac8ead79617..9bcc2e25778 100644 --- a/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/RoutingAllocationTests.java +++ b/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/RoutingAllocationTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.test.unit.cluster.routing.allocation; +import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.transport.DummyTransportAddress; import org.elasticsearch.common.transport.TransportAddress; @@ -30,14 +31,14 @@ import java.util.Map; public class RoutingAllocationTests { public static DiscoveryNode newNode(String nodeId) { - return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE); + return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE, Version.CURRENT); } public static DiscoveryNode newNode(String nodeId, TransportAddress address) { - return new DiscoveryNode(nodeId, address); + return new DiscoveryNode(nodeId, address, Version.CURRENT); } public static DiscoveryNode newNode(String nodeId, Map attributes) { - return new DiscoveryNode("", nodeId, DummyTransportAddress.INSTANCE, attributes); + return new DiscoveryNode("", nodeId, DummyTransportAddress.INSTANCE, attributes, Version.CURRENT); } } \ No newline at end of file diff --git a/src/test/java/org/elasticsearch/test/unit/cluster/serialization/ClusterSerializationTests.java b/src/test/java/org/elasticsearch/test/unit/cluster/serialization/ClusterSerializationTests.java index fdcfe07aad6..3048abc3255 100644 --- a/src/test/java/org/elasticsearch/test/unit/cluster/serialization/ClusterSerializationTests.java +++ b/src/test/java/org/elasticsearch/test/unit/cluster/serialization/ClusterSerializationTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.test.unit.cluster.serialization; +import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -91,6 +92,6 @@ public class ClusterSerializationTests { } private DiscoveryNode newNode(String nodeId) { - return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE); + return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE, Version.CURRENT); } } diff --git a/src/test/java/org/elasticsearch/test/unit/discovery/zen/ping/multicast/MulticastZenPingTests.java b/src/test/java/org/elasticsearch/test/unit/discovery/zen/ping/multicast/MulticastZenPingTests.java index b9de67d4451..5730c72ae61 100644 --- a/src/test/java/org/elasticsearch/test/unit/discovery/zen/ping/multicast/MulticastZenPingTests.java +++ b/src/test/java/org/elasticsearch/test/unit/discovery/zen/ping/multicast/MulticastZenPingTests.java @@ -19,10 +19,13 @@ package org.elasticsearch.test.unit.discovery.zen.ping.multicast; +import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; @@ -49,15 +52,16 @@ public class MulticastZenPingTests { @Test public void testSimplePings() { + Settings settings = ImmutableSettings.EMPTY; ThreadPool threadPool = new ThreadPool(); ClusterName clusterName = new ClusterName("test"); - final TransportService transportServiceA = new TransportService(new LocalTransport(threadPool), threadPool).start(); - final DiscoveryNode nodeA = new DiscoveryNode("A", transportServiceA.boundAddress().publishAddress()); + final TransportService transportServiceA = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT), threadPool).start(); + final DiscoveryNode nodeA = new DiscoveryNode("A", transportServiceA.boundAddress().publishAddress(), Version.CURRENT); - final TransportService transportServiceB = new TransportService(new LocalTransport(threadPool), threadPool).start(); - final DiscoveryNode nodeB = new DiscoveryNode("B", transportServiceA.boundAddress().publishAddress()); + final TransportService transportServiceB = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT), threadPool).start(); + final DiscoveryNode nodeB = new DiscoveryNode("B", transportServiceA.boundAddress().publishAddress(), Version.CURRENT); - MulticastZenPing zenPingA = new MulticastZenPing(threadPool, transportServiceA, clusterName); + MulticastZenPing zenPingA = new MulticastZenPing(threadPool, transportServiceA, clusterName, Version.CURRENT); zenPingA.setNodesProvider(new DiscoveryNodesProvider() { @Override public DiscoveryNodes nodes() { @@ -71,7 +75,7 @@ public class MulticastZenPingTests { }); zenPingA.start(); - MulticastZenPing zenPingB = new MulticastZenPing(threadPool, transportServiceB, clusterName); + MulticastZenPing zenPingB = new MulticastZenPing(threadPool, transportServiceB, clusterName, Version.CURRENT); zenPingB.setNodesProvider(new DiscoveryNodesProvider() { @Override public DiscoveryNodes nodes() { @@ -100,13 +104,13 @@ public class MulticastZenPingTests { @Test public void testExternalPing() throws Exception { + Settings settings = ImmutableSettings.EMPTY; ThreadPool threadPool = new ThreadPool(); - ClusterName clusterName = new ClusterName("test"); - final TransportService transportServiceA = new TransportService(new LocalTransport(threadPool), threadPool).start(); - final DiscoveryNode nodeA = new DiscoveryNode("A", transportServiceA.boundAddress().publishAddress()); + final TransportService transportServiceA = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT), threadPool).start(); + final DiscoveryNode nodeA = new DiscoveryNode("A", transportServiceA.boundAddress().publishAddress(), Version.CURRENT); - MulticastZenPing zenPingA = new MulticastZenPing(threadPool, transportServiceA, clusterName); + MulticastZenPing zenPingA = new MulticastZenPing(threadPool, transportServiceA, clusterName, Version.CURRENT); zenPingA.setNodesProvider(new DiscoveryNodesProvider() { @Override public DiscoveryNodes nodes() { diff --git a/src/test/java/org/elasticsearch/test/unit/discovery/zen/ping/unicast/UnicastZenPingTests.java b/src/test/java/org/elasticsearch/test/unit/discovery/zen/ping/unicast/UnicastZenPingTests.java index a370629ba9c..aad5df948ed 100644 --- a/src/test/java/org/elasticsearch/test/unit/discovery/zen/ping/unicast/UnicastZenPingTests.java +++ b/src/test/java/org/elasticsearch/test/unit/discovery/zen/ping/unicast/UnicastZenPingTests.java @@ -19,9 +19,11 @@ package org.elasticsearch.test.unit.discovery.zen.ping.unicast; +import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; @@ -45,17 +47,20 @@ public class UnicastZenPingTests { @Test public void testSimplePings() { + Settings settings = ImmutableSettings.EMPTY; ThreadPool threadPool = new ThreadPool(); ClusterName clusterName = new ClusterName("test"); - NettyTransport transportA = new NettyTransport(threadPool); + NetworkService networkService = new NetworkService(settings); + + NettyTransport transportA = new NettyTransport(settings, threadPool, networkService, Version.CURRENT); final TransportService transportServiceA = new TransportService(transportA, threadPool).start(); - final DiscoveryNode nodeA = new DiscoveryNode("A", transportServiceA.boundAddress().publishAddress()); + final DiscoveryNode nodeA = new DiscoveryNode("A", transportServiceA.boundAddress().publishAddress(), Version.CURRENT); InetSocketTransportAddress addressA = (InetSocketTransportAddress) transportA.boundAddress().publishAddress(); - NettyTransport transportB = new NettyTransport(threadPool); + NettyTransport transportB = new NettyTransport(settings, threadPool, networkService, Version.CURRENT); final TransportService transportServiceB = new TransportService(transportB, threadPool).start(); - final DiscoveryNode nodeB = new DiscoveryNode("B", transportServiceA.boundAddress().publishAddress()); + final DiscoveryNode nodeB = new DiscoveryNode("B", transportServiceA.boundAddress().publishAddress(), Version.CURRENT); InetSocketTransportAddress addressB = (InetSocketTransportAddress) transportB.boundAddress().publishAddress(); @@ -64,7 +69,7 @@ public class UnicastZenPingTests { addressB.address().getAddress().getHostAddress() + ":" + addressB.address().getPort()) .build(); - UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, transportServiceA, clusterName, null); + UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, transportServiceA, clusterName, Version.CURRENT, null); zenPingA.setNodesProvider(new DiscoveryNodesProvider() { @Override public DiscoveryNodes nodes() { @@ -78,7 +83,7 @@ public class UnicastZenPingTests { }); zenPingA.start(); - UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, transportServiceB, clusterName, null); + UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, transportServiceB, clusterName, Version.CURRENT, null); zenPingB.setNodesProvider(new DiscoveryNodesProvider() { @Override public DiscoveryNodes nodes() { diff --git a/src/test/java/org/elasticsearch/test/unit/transport/AbstractSimpleTransportTests.java b/src/test/java/org/elasticsearch/test/unit/transport/AbstractSimpleTransportTests.java index 27afbfc3ac0..21ab9899e31 100644 --- a/src/test/java/org/elasticsearch/test/unit/transport/AbstractSimpleTransportTests.java +++ b/src/test/java/org/elasticsearch/test/unit/transport/AbstractSimpleTransportTests.java @@ -19,9 +19,13 @@ package org.elasticsearch.test.unit.transport; +import com.google.common.collect.ImmutableMap; +import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; @@ -44,29 +48,37 @@ public abstract class AbstractSimpleTransportTests { protected ThreadPool threadPool; + protected static final Version version0 = Version.fromId(/*0*/99); + protected DiscoveryNode nodeA; protected TransportService serviceA; + + protected static final Version version1 = Version.fromId(199); + protected DiscoveryNode nodeB; protected TransportService serviceB; - protected DiscoveryNode serviceANode; - protected DiscoveryNode serviceBNode; + + protected abstract TransportService build(Settings settings, Version version); @Before public void setUp() { threadPool = new ThreadPool(); - build(); - serviceA.connectToNode(serviceBNode); - serviceB.connectToNode(serviceANode); + serviceA = build(ImmutableSettings.builder().put("name", "A").build(), version0); + nodeA = new DiscoveryNode("A", "A", serviceA.boundAddress().publishAddress(), ImmutableMap.of(), version0); + serviceB = build(ImmutableSettings.builder().put("name", "B").build(), version1); + nodeB = new DiscoveryNode("B", "B", serviceB.boundAddress().publishAddress(), ImmutableMap.of(), version1); + + serviceA.connectToNode(nodeB); + serviceA.connectToNode(nodeA); + serviceB.connectToNode(nodeA); + serviceB.connectToNode(nodeB); } @After public void tearDown() { serviceA.close(); serviceB.close(); - threadPool.shutdown(); } - protected abstract void build(); - @Test public void testHelloWorld() { serviceA.registerHandler("sayHello", new BaseTransportRequestHandler() { @@ -92,7 +104,7 @@ public abstract class AbstractSimpleTransportTests { } }); - TransportFuture res = serviceB.submitRequest(serviceANode, "sayHello", + TransportFuture res = serviceB.submitRequest(nodeA, "sayHello", new StringMessageRequest("moshe"), new BaseTransportResponseHandler() { @Override public StringMessageResponse newInstance() { @@ -123,7 +135,7 @@ public abstract class AbstractSimpleTransportTests { assertThat(e.getMessage(), false, equalTo(true)); } - res = serviceB.submitRequest(serviceANode, "sayHello", + res = serviceB.submitRequest(nodeA, "sayHello", new StringMessageRequest("moshe"), TransportRequestOptions.options().withCompress(true), new BaseTransportResponseHandler() { @Override public StringMessageResponse newInstance() { @@ -181,7 +193,7 @@ public abstract class AbstractSimpleTransportTests { } }); - TransportFuture res = serviceB.submitRequest(serviceANode, "sayHello", + TransportFuture res = serviceB.submitRequest(nodeA, "sayHello", TransportRequest.Empty.INSTANCE, TransportRequestOptions.options().withCompress(true), new BaseTransportResponseHandler() { @Override public TransportResponse.Empty newInstance() { @@ -239,7 +251,7 @@ public abstract class AbstractSimpleTransportTests { } }); - TransportFuture res = serviceB.submitRequest(serviceANode, "sayHello", + TransportFuture res = serviceB.submitRequest(nodeA, "sayHello", new StringMessageRequest("moshe"), TransportRequestOptions.options().withCompress(true), new BaseTransportResponseHandler() { @Override public StringMessageResponse newInstance() { @@ -293,7 +305,7 @@ public abstract class AbstractSimpleTransportTests { } }); - TransportFuture res = serviceB.submitRequest(serviceANode, "sayHelloException", + TransportFuture res = serviceB.submitRequest(nodeA, "sayHelloException", new StringMessageRequest("moshe"), new BaseTransportResponseHandler() { @Override public StringMessageResponse newInstance() { @@ -371,7 +383,7 @@ public abstract class AbstractSimpleTransportTests { } }); - TransportFuture res = serviceB.submitRequest(serviceANode, "sayHelloTimeoutNoResponse", + TransportFuture res = serviceB.submitRequest(nodeA, "sayHelloTimeoutNoResponse", new StringMessageRequest("moshe"), options().withTimeout(100), new BaseTransportResponseHandler() { @Override public StringMessageResponse newInstance() { @@ -434,7 +446,7 @@ public abstract class AbstractSimpleTransportTests { } }); - TransportFuture res = serviceB.submitRequest(serviceANode, "sayHelloTimeoutDelayedResponse", + TransportFuture res = serviceB.submitRequest(nodeA, "sayHelloTimeoutDelayedResponse", new StringMessageRequest("300ms"), options().withTimeout(100), new BaseTransportResponseHandler() { @Override public StringMessageResponse newInstance() { @@ -470,7 +482,7 @@ public abstract class AbstractSimpleTransportTests { for (int i = 0; i < 10; i++) { final int counter = i; // now, try and send another request, this times, with a short timeout - res = serviceB.submitRequest(serviceANode, "sayHelloTimeoutDelayedResponse", + res = serviceB.submitRequest(nodeA, "sayHelloTimeoutDelayedResponse", new StringMessageRequest(counter + "ms"), options().withTimeout(100), new BaseTransportResponseHandler() { @Override public StringMessageResponse newInstance() { @@ -548,4 +560,292 @@ public abstract class AbstractSimpleTransportTests { out.writeString(message); } } + + + static class Version0Request extends TransportRequest { + + int value1; + + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + value1 = in.readInt(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeInt(value1); + } + } + + static class Version1Request extends Version0Request { + + int value2; + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + if (in.getVersion().onOrAfter(version1)) { + value2 = in.readInt(); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + if (out.getVersion().onOrAfter(version1)) { + out.writeInt(value2); + } + } + } + + static class Version0Response extends TransportResponse { + + int value1; + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + value1 = in.readInt(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeInt(value1); + } + } + + static class Version1Response extends Version0Response { + + int value2; + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + if (in.getVersion().onOrAfter(version1)) { + value2 = in.readInt(); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + if (out.getVersion().onOrAfter(version1)) { + out.writeInt(value2); + } + } + } + + @Test + public void testVersion_from0to1() throws Exception { + serviceB.registerHandler("/version", new BaseTransportRequestHandler() { + @Override + public Version1Request newInstance() { + return new Version1Request(); + } + + @Override + public void messageReceived(Version1Request request, TransportChannel channel) throws Exception { + assertThat(request.value1, equalTo(1)); + assertThat(request.value2, equalTo(0)); // not set, coming from service A + Version1Response response = new Version1Response(); + response.value1 = 1; + response.value2 = 2; + channel.sendResponse(response); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }); + + Version0Request version0Request = new Version0Request(); + version0Request.value1 = 1; + Version0Response version0Response = serviceA.submitRequest(nodeB, "/version", version0Request, new BaseTransportResponseHandler() { + @Override + public Version0Response newInstance() { + return new Version0Response(); + } + + @Override + public void handleResponse(Version0Response response) { + assertThat(response.value1, equalTo(1)); + } + + @Override + public void handleException(TransportException exp) { + exp.printStackTrace(); + assert false; + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }).txGet(); + + assertThat(version0Response.value1, equalTo(1)); + } + + @Test + public void testVersion_from1to0() throws Exception { + serviceA.registerHandler("/version", new BaseTransportRequestHandler() { + @Override + public Version0Request newInstance() { + return new Version0Request(); + } + + @Override + public void messageReceived(Version0Request request, TransportChannel channel) throws Exception { + assertThat(request.value1, equalTo(1)); + Version0Response response = new Version0Response(); + response.value1 = 1; + channel.sendResponse(response); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }); + + Version1Request version1Request = new Version1Request(); + version1Request.value1 = 1; + version1Request.value2 = 2; + Version1Response version1Response = serviceB.submitRequest(nodeA, "/version", version1Request, new BaseTransportResponseHandler() { + @Override + public Version1Response newInstance() { + return new Version1Response(); + } + + @Override + public void handleResponse(Version1Response response) { + assertThat(response.value1, equalTo(1)); + assertThat(response.value2, equalTo(0)); // initial values, cause its serialized from version 0 + } + + @Override + public void handleException(TransportException exp) { + exp.printStackTrace(); + assert false; + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }).txGet(); + + assertThat(version1Response.value1, equalTo(1)); + assertThat(version1Response.value2, equalTo(0)); + } + + @Test + public void testVersion_from1to1() throws Exception { + serviceB.registerHandler("/version", new BaseTransportRequestHandler() { + @Override + public Version1Request newInstance() { + return new Version1Request(); + } + + @Override + public void messageReceived(Version1Request request, TransportChannel channel) throws Exception { + assertThat(request.value1, equalTo(1)); + assertThat(request.value2, equalTo(2)); + Version1Response response = new Version1Response(); + response.value1 = 1; + response.value2 = 2; + channel.sendResponse(response); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }); + + Version1Request version1Request = new Version1Request(); + version1Request.value1 = 1; + version1Request.value2 = 2; + Version1Response version1Response = serviceB.submitRequest(nodeB, "/version", version1Request, new BaseTransportResponseHandler() { + @Override + public Version1Response newInstance() { + return new Version1Response(); + } + + @Override + public void handleResponse(Version1Response response) { + assertThat(response.value1, equalTo(1)); + assertThat(response.value2, equalTo(2)); + } + + @Override + public void handleException(TransportException exp) { + exp.printStackTrace(); + assert false; + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }).txGet(); + + assertThat(version1Response.value1, equalTo(1)); + assertThat(version1Response.value2, equalTo(2)); + } + + @Test + public void testVersion_from0to0() throws Exception { + serviceA.registerHandler("/version", new BaseTransportRequestHandler() { + @Override + public Version0Request newInstance() { + return new Version0Request(); + } + + @Override + public void messageReceived(Version0Request request, TransportChannel channel) throws Exception { + assertThat(request.value1, equalTo(1)); + Version0Response response = new Version0Response(); + response.value1 = 1; + channel.sendResponse(response); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }); + + Version0Request version0Request = new Version0Request(); + version0Request.value1 = 1; + Version0Response version0Response = serviceA.submitRequest(nodeA, "/version", version0Request, new BaseTransportResponseHandler() { + @Override + public Version0Response newInstance() { + return new Version0Response(); + } + + @Override + public void handleResponse(Version0Response response) { + assertThat(response.value1, equalTo(1)); + } + + @Override + public void handleException(TransportException exp) { + exp.printStackTrace(); + assert false; + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }).txGet(); + + assertThat(version0Response.value1, equalTo(1)); + } } diff --git a/src/test/java/org/elasticsearch/test/unit/transport/local/SimpleLocalTransportTests.java b/src/test/java/org/elasticsearch/test/unit/transport/local/SimpleLocalTransportTests.java index 2b3541e0069..8eda003ae3d 100644 --- a/src/test/java/org/elasticsearch/test/unit/transport/local/SimpleLocalTransportTests.java +++ b/src/test/java/org/elasticsearch/test/unit/transport/local/SimpleLocalTransportTests.java @@ -19,20 +19,16 @@ package org.elasticsearch.test.unit.transport.local; -import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.Version; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.unit.transport.AbstractSimpleTransportTests; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.local.LocalTransport; -import org.junit.Test; public class SimpleLocalTransportTests extends AbstractSimpleTransportTests { @Override - protected void build() { - serviceA = new TransportService(new LocalTransport(threadPool), threadPool).start(); - serviceANode = new DiscoveryNode("A", serviceA.boundAddress().publishAddress()); - - serviceB = new TransportService(new LocalTransport(threadPool), threadPool).start(); - serviceBNode = new DiscoveryNode("B", serviceB.boundAddress().publishAddress()); + protected TransportService build(Settings settings, Version version) { + return new TransportService(new LocalTransport(settings, threadPool, version), threadPool).start(); } } \ No newline at end of file diff --git a/src/test/java/org/elasticsearch/test/unit/transport/netty/SimpleNettyTransportTests.java b/src/test/java/org/elasticsearch/test/unit/transport/netty/SimpleNettyTransportTests.java index 635306fdc14..4c44fa0e4cb 100644 --- a/src/test/java/org/elasticsearch/test/unit/transport/netty/SimpleNettyTransportTests.java +++ b/src/test/java/org/elasticsearch/test/unit/transport/netty/SimpleNettyTransportTests.java @@ -19,7 +19,10 @@ package org.elasticsearch.test.unit.transport.netty; +import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.network.NetworkService; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.test.unit.transport.AbstractSimpleTransportTests; import org.elasticsearch.transport.ConnectTransportException; @@ -27,23 +30,17 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.netty.NettyTransport; import org.junit.Test; -import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; - public class SimpleNettyTransportTests extends AbstractSimpleTransportTests { @Override - protected void build() { - serviceA = new TransportService(settingsBuilder().put("name", "A").build(), new NettyTransport(settingsBuilder().put("name", "A").build(), threadPool), threadPool).start(); - serviceANode = new DiscoveryNode("A", serviceA.boundAddress().publishAddress()); - - serviceB = new TransportService(settingsBuilder().put("name", "B").build(), new NettyTransport(settingsBuilder().put("name", "B").build(), threadPool), threadPool).start(); - serviceBNode = new DiscoveryNode("B", serviceB.boundAddress().publishAddress()); + protected TransportService build(Settings settings, Version version) { + return new TransportService(settings, new NettyTransport(settings, threadPool, new NetworkService(settings), version), threadPool).start(); } @Test public void testConnectException() { try { - serviceA.connectToNode(new DiscoveryNode("C", new InetSocketTransportAddress("localhost", 9876))); + serviceA.connectToNode(new DiscoveryNode("C", new InetSocketTransportAddress("localhost", 9876), Version.CURRENT)); assert false; } catch (ConnectTransportException e) { // e.printStackTrace();