fix transport support for versioning

when starting the request/response cycle, we should use the lowest version out of the current node version, and the target node version to serialize the request, and put it in the header. this will allow to support both backward and forward comp.
in addition, have Version as an injected value to different services, to make different versions more easily testable, compared to using Version#CURRENT
This commit is contained in:
Shay Banon 2013-07-27 22:13:48 +02:00
parent 6b8123d726
commit 28a4ac01e4
30 changed files with 527 additions and 192 deletions

View File

@ -20,6 +20,7 @@
package org.elasticsearch;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.Lucene;
@ -242,6 +243,13 @@ public class Version implements Serializable {
out.writeVInt(version.id);
}
/**
* Returns the smallest version between the 2.
*/
public static Version smallest(Version version1, Version version2) {
return version1.id < version2.id ? version1 : version2;
}
public final int id;
public final byte major;
public final byte minor;
@ -324,4 +332,18 @@ public class Version implements Serializable {
public int hashCode() {
return id;
}
public static class Module extends AbstractModule {
private final Version version;
public Module(Version version) {
this.version = version;
}
@Override
protected void configure() {
bind(Version.class).toInstance(version);
}
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.client.transport;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.*;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
@ -163,9 +164,12 @@ public class TransportClient extends AbstractClient {
this.pluginsService = new PluginsService(settings, tuple.v2());
this.settings = pluginsService.updatedSettings();
Version version = Version.CURRENT;
CompressorFactory.configure(this.settings);
ModulesBuilder modules = new ModulesBuilder();
modules.add(new Version.Module(version));
modules.add(new CacheRecyclerModule(settings));
modules.add(new PluginsModule(this.settings, pluginsService));
modules.add(new EnvironmentModule(environment));

View File

@ -24,6 +24,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
@ -63,6 +64,8 @@ public class TransportClientNodesService extends AbstractComponent {
private final ThreadPool threadPool;
private final Version version;
// nodes that are added to be discovered
private volatile ImmutableList<DiscoveryNode> listedNodes = ImmutableList.of();
@ -83,12 +86,12 @@ public class TransportClientNodesService extends AbstractComponent {
private volatile boolean closed;
@Inject
public TransportClientNodesService(Settings settings, ClusterName clusterName,
TransportService transportService, ThreadPool threadPool) {
public TransportClientNodesService(Settings settings, ClusterName clusterName, TransportService transportService, ThreadPool threadPool, Version version) {
super(settings);
this.clusterName = clusterName;
this.transportService = transportService;
this.threadPool = threadPool;
this.version = version;
this.nodesSamplerInterval = componentSettings.getAsTime("nodes_sampler_interval", timeValueSeconds(5));
this.pingTimeout = componentSettings.getAsTime("ping_timeout", timeValueSeconds(5)).millis();
@ -147,7 +150,7 @@ public class TransportClientNodesService extends AbstractComponent {
ImmutableList.Builder<DiscoveryNode> builder = ImmutableList.builder();
builder.addAll(listedNodes());
for (TransportAddress transportAddress : filtered) {
DiscoveryNode node = new DiscoveryNode("#transport#-" + tempNodeIdGenerator.incrementAndGet(), transportAddress);
DiscoveryNode node = new DiscoveryNode("#transport#-" + tempNodeIdGenerator.incrementAndGet(), transportAddress, version);
logger.debug("adding address [{}]", node);
builder.add(node);
}

View File

@ -85,24 +85,18 @@ import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilde
public class MetaDataCreateIndexService extends AbstractComponent {
private final Environment environment;
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final IndicesService indicesService;
private final AllocationService allocationService;
private final NodeIndexCreatedAction nodeIndexCreatedAction;
private final MetaDataService metaDataService;
private final Version version;
private final String riverIndexName;
@Inject
public MetaDataCreateIndexService(Settings settings, Environment environment, ThreadPool threadPool, ClusterService clusterService, IndicesService indicesService,
AllocationService allocationService, NodeIndexCreatedAction nodeIndexCreatedAction, MetaDataService metaDataService, @RiverIndexName String riverIndexName) {
AllocationService allocationService, NodeIndexCreatedAction nodeIndexCreatedAction, MetaDataService metaDataService, Version version, @RiverIndexName String riverIndexName) {
super(settings);
this.environment = environment;
this.threadPool = threadPool;
@ -111,6 +105,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
this.allocationService = allocationService;
this.nodeIndexCreatedAction = nodeIndexCreatedAction;
this.metaDataService = metaDataService;
this.version = version;
this.riverIndexName = riverIndexName;
}
@ -257,7 +252,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
indexSettingsBuilder.put(SETTING_AUTO_EXPAND_REPLICAS, settings.get(SETTING_AUTO_EXPAND_REPLICAS));
}
indexSettingsBuilder.put(SETTING_VERSION_CREATED, Version.CURRENT);
indexSettingsBuilder.put(SETTING_VERSION_CREATED, version);
Settings actualIndexSettings = indexSettingsBuilder.build();

View File

@ -67,27 +67,21 @@ public class DiscoveryNode implements Streamable, Serializable {
public static final ImmutableList<DiscoveryNode> EMPTY_LIST = ImmutableList.of();
private String nodeName = "".intern();
private String nodeName = "";
private String nodeId;
private TransportAddress address;
private ImmutableMap<String, String> attributes;
private Version version = Version.CURRENT;
private DiscoveryNode() {
DiscoveryNode() {
}
public DiscoveryNode(String nodeId, TransportAddress address) {
this("", nodeId, address, ImmutableMap.<String, String>of());
public DiscoveryNode(String nodeId, TransportAddress address, Version version) {
this("", nodeId, address, ImmutableMap.<String, String>of(), version);
}
public DiscoveryNode(String nodeName, String nodeId, TransportAddress address, Map<String, String> attributes) {
if (nodeName == null) {
this.nodeName = "".intern();
} else {
public DiscoveryNode(String nodeName, String nodeId, TransportAddress address, Map<String, String> attributes, Version version) {
if (nodeName != null) {
this.nodeName = nodeName.intern();
}
ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
@ -97,6 +91,7 @@ public class DiscoveryNode implements Streamable, Serializable {
this.attributes = builder.build();
this.nodeId = nodeId.intern();
this.address = address;
this.version = version;
}
/**

View File

@ -21,6 +21,7 @@ package org.elasticsearch.discovery.local;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -55,14 +56,11 @@ import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder;
public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implements Discovery {
private final TransportService transportService;
private final ClusterService clusterService;
private final DiscoveryNodeService discoveryNodeService;
private AllocationService allocationService;
private final ClusterName clusterName;
private final Version version;
private DiscoveryNode localNode;
@ -78,12 +76,13 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
@Inject
public LocalDiscovery(Settings settings, ClusterName clusterName, TransportService transportService, ClusterService clusterService,
DiscoveryNodeService discoveryNodeService) {
DiscoveryNodeService discoveryNodeService, Version version) {
super(settings);
this.clusterName = clusterName;
this.clusterService = clusterService;
this.transportService = transportService;
this.discoveryNodeService = discoveryNodeService;
this.version = version;
}
@Override
@ -106,7 +105,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
}
logger.debug("Connected to cluster [{}]", clusterName);
this.localNode = new DiscoveryNode(settings.get("name"), Long.toString(nodeIdGenerator.incrementAndGet()), transportService.boundAddress().publishAddress(),
discoveryNodeService.buildAttributes());
discoveryNodeService.buildAttributes(), version);
clusterGroup.members().add(this);

View File

@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -76,26 +77,17 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implements Discovery, DiscoveryNodesProvider {
private final ThreadPool threadPool;
private final TransportService transportService;
private final ClusterService clusterService;
private AllocationService allocationService;
private final ClusterName clusterName;
private final DiscoveryNodeService discoveryNodeService;
private final ZenPingService pingService;
private final MasterFaultDetection masterFD;
private final NodesFaultDetection nodesFD;
private final PublishClusterStateAction publishClusterState;
private final MembershipAction membership;
private final Version version;
private final TimeValue pingTimeout;
@ -127,7 +119,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
@Inject
public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool,
TransportService transportService, ClusterService clusterService, NodeSettingsService nodeSettingsService,
DiscoveryNodeService discoveryNodeService, ZenPingService pingService) {
DiscoveryNodeService discoveryNodeService, ZenPingService pingService, Version version) {
super(settings);
this.clusterName = clusterName;
this.threadPool = threadPool;
@ -135,6 +127,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
this.transportService = transportService;
this.discoveryNodeService = discoveryNodeService;
this.pingService = pingService;
this.version = version;
// also support direct discovery.zen settings, for cases when it gets extended
this.pingTimeout = settings.getAsTime("discovery.zen.ping.timeout", settings.getAsTime("discovery.zen.ping_timeout", componentSettings.getAsTime("ping_timeout", componentSettings.getAsTime("initial_ping_timeout", timeValueSeconds(3)))));
@ -176,7 +169,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
Map<String, String> nodeAttributes = discoveryNodeService.buildAttributes();
// note, we rely on the fact that its a new id each time we start, see FD and "kill -9" handling
String nodeId = UUID.randomBase64UUID();
localNode = new DiscoveryNode(settings.get("name"), nodeId, transportService.boundAddress().publishAddress(), nodeAttributes);
localNode = new DiscoveryNode(settings.get("name"), nodeId, transportService.boundAddress().publishAddress(), nodeAttributes, version);
latestDiscoNodes = new DiscoveryNodes.Builder().put(localNode).localNodeId(localNode.id()).build();
nodesFD.updateNodes(latestDiscoNodes);
pingService.start();

View File

@ -22,6 +22,7 @@ package org.elasticsearch.discovery.zen.ping;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
@ -51,17 +52,22 @@ public class ZenPingService extends AbstractLifecycleComponent<ZenPing> implemen
private volatile ImmutableList<? extends ZenPing> zenPings = ImmutableList.of();
@Inject
// here for backward comp. with discovery plugins
public ZenPingService(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, NetworkService networkService,
@Nullable Set<UnicastHostsProvider> unicastHostsProviders) {
super(settings);
this(settings, threadPool, transportService, clusterName, networkService, Version.CURRENT, unicastHostsProviders);
}
@Inject
public ZenPingService(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, NetworkService networkService,
Version version, @Nullable Set<UnicastHostsProvider> unicastHostsProviders) {
super(settings);
ImmutableList.Builder<ZenPing> zenPingsBuilder = ImmutableList.builder();
if (componentSettings.getAsBoolean("multicast.enabled", true)) {
zenPingsBuilder.add(new MulticastZenPing(settings, threadPool, transportService, clusterName, networkService));
zenPingsBuilder.add(new MulticastZenPing(settings, threadPool, transportService, clusterName, networkService, version));
}
// always add the unicast hosts, so it will be able to receive unicast requests even when working in multicast
zenPingsBuilder.add(new UnicastZenPing(settings, threadPool, transportService, clusterName, unicastHostsProviders));
zenPingsBuilder.add(new UnicastZenPing(settings, threadPool, transportService, clusterName, version, unicastHostsProviders));
this.zenPings = zenPingsBuilder.build();
}

View File

@ -71,6 +71,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
private final TransportService transportService;
private final ClusterName clusterName;
private final NetworkService networkService;
private final Version version;
private volatile DiscoveryNodesProvider nodesProvider;
private final boolean pingEnabled;
@ -87,16 +88,17 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
private final Object sendMutex = new Object();
private final Object receiveMutex = new Object();
public MulticastZenPing(ThreadPool threadPool, TransportService transportService, ClusterName clusterName) {
this(EMPTY_SETTINGS, threadPool, transportService, clusterName, new NetworkService(EMPTY_SETTINGS));
public MulticastZenPing(ThreadPool threadPool, TransportService transportService, ClusterName clusterName, Version version) {
this(EMPTY_SETTINGS, threadPool, transportService, clusterName, new NetworkService(EMPTY_SETTINGS), version);
}
public MulticastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, NetworkService networkService) {
public MulticastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, NetworkService networkService, Version version) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.clusterName = clusterName;
this.networkService = networkService;
this.version = version;
this.address = componentSettings.get("address");
this.port = componentSettings.getAsInt("port", 54328);
@ -256,7 +258,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
BytesStreamOutput bStream = new BytesStreamOutput();
StreamOutput out = new HandlesStreamOutput(bStream);
out.writeBytes(INTERNAL_HEADER);
Version.writeVersion(Version.CURRENT, out);
Version.writeVersion(version, out);
out.writeInt(id);
clusterName.writeTo(out);
nodesProvider.nodes().localNode().writeTo(out);
@ -467,7 +469,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
XContentBuilder builder = XContentFactory.contentBuilder(contentType);
builder.startObject().startObject("response");
builder.field("cluster_name", MulticastZenPing.this.clusterName.value());
builder.startObject("version").field("number", Version.CURRENT.number()).field("snapshot_build", Version.CURRENT.snapshot).endObject();
builder.startObject("version").field("number", version.number()).field("snapshot_build", version.snapshot).endObject();
builder.field("transport_address", localNode.address().toString());
if (nodesProvider.nodeService() != null) {

View File

@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
@ -47,7 +48,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static com.google.common.collect.Lists.newArrayList;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
import static org.elasticsearch.common.unit.TimeValue.readTimeValue;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
import static org.elasticsearch.discovery.zen.ping.ZenPing.PingResponse.readPingResponse;
@ -60,10 +60,9 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
public static final int LIMIT_PORTS_COUNT = 1;
private final ThreadPool threadPool;
private final TransportService transportService;
private final ClusterName clusterName;
private final Version version;
private final int concurrentConnects;
@ -80,15 +79,12 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
private final CopyOnWriteArrayList<UnicastHostsProvider> hostsProviders = new CopyOnWriteArrayList<UnicastHostsProvider>();
public UnicastZenPing(ThreadPool threadPool, TransportService transportService, ClusterName clusterName) {
this(EMPTY_SETTINGS, threadPool, transportService, clusterName, null);
}
public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, @Nullable Set<UnicastHostsProvider> unicastHostsProviders) {
public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, Version version, @Nullable Set<UnicastHostsProvider> unicastHostsProviders) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.clusterName = clusterName;
this.version = version;
if (unicastHostsProviders != null) {
for (UnicastHostsProvider unicastHostsProvider : unicastHostsProviders) {
@ -112,7 +108,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
TransportAddress[] addresses = transportService.addressesFromString(host);
// we only limit to 1 addresses, makes no sense to ping 100 ports
for (int i = 0; (i < addresses.length && i < LIMIT_PORTS_COUNT); i++) {
nodes.add(new DiscoveryNode("#zen_unicast_" + (++idCounter) + "#", addresses[i]));
nodes.add(new DiscoveryNode("#zen_unicast_" + (++idCounter) + "#", addresses[i], version));
}
} catch (Exception e) {
throw new ElasticSearchIllegalArgumentException("Failed to resolve address for [" + host + "]", e);

View File

@ -46,9 +46,7 @@ public class PublishClusterStateAction extends AbstractComponent {
}
private final TransportService transportService;
private final DiscoveryNodesProvider nodesProvider;
private final NewClusterStateListener listener;
public PublishClusterStateAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider,
@ -92,7 +90,7 @@ public class PublishClusterStateAction extends AbstractComponent {
}
}
transportService.sendRequest(node, PublishClusterStateRequestHandler.ACTION,
new PublishClusterStateRequest(bytes),
new PublishClusterStateRequest(bytes, node.version()),
TransportRequestOptions.options().withHighType().withCompress(false), // no need to compress, we already compressed the bytes
new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@ -107,13 +105,14 @@ public class PublishClusterStateAction extends AbstractComponent {
class PublishClusterStateRequest extends TransportRequest {
BytesReference clusterStateInBytes;
Version version = Version.CURRENT;
Version version;
private PublishClusterStateRequest() {
PublishClusterStateRequest() {
}
private PublishClusterStateRequest(BytesReference clusterStateInBytes) {
PublishClusterStateRequest(BytesReference clusterStateInBytes, Version version) {
this.clusterStateInBytes = clusterStateInBytes;
this.version = version;
}
@Override

View File

@ -119,8 +119,10 @@ public final class InternalNode implements Node {
public InternalNode(Settings pSettings, boolean loadConfigSettings) throws ElasticSearchException {
Tuple<Settings, Environment> tuple = InternalSettingsPerparer.prepareSettings(pSettings, loadConfigSettings);
Version version = Version.CURRENT;
ESLogger logger = Loggers.getLogger(Node.class, tuple.v1().get("name"));
logger.info("version[{}], pid[{}], build[{}/{}]", Version.CURRENT, JvmInfo.jvmInfo().pid(), Build.CURRENT.hashShort(), Build.CURRENT.timestamp());
logger.info("version[{}], pid[{}], build[{}/{}]", version, JvmInfo.jvmInfo().pid(), Build.CURRENT.hashShort(), Build.CURRENT.timestamp());
logger.info("initializing ...");
@ -140,6 +142,7 @@ public final class InternalNode implements Node {
NodeEnvironment nodeEnvironment = new NodeEnvironment(this.settings, this.environment);
ModulesBuilder modules = new ModulesBuilder();
modules.add(new Version.Module(version));
modules.add(new CacheRecyclerModule(settings));
modules.add(new PluginsModule(settings, pluginsService));
modules.add(new SettingsModule(settings));

View File

@ -70,7 +70,7 @@ public class NodeService extends AbstractComponent {
@Inject
public NodeService(Settings settings, ThreadPool threadPool, MonitorService monitorService, Discovery discovery,
ClusterService clusterService, TransportService transportService, IndicesService indicesService,
PluginsService pluginService) {
PluginsService pluginService, Version version) {
super(settings);
this.threadPool = threadPool;
this.monitorService = monitorService;
@ -82,7 +82,7 @@ public class NodeService extends AbstractComponent {
if (address != null) {
this.hostname = address.getHostName();
}
this.version = Version.CURRENT;
this.version = version;
this.pluginService = pluginService;
}

View File

@ -42,10 +42,12 @@ import static org.elasticsearch.rest.RestRequest.Method.HEAD;
*/
public class RestMainAction extends BaseRestHandler {
@Inject
public RestMainAction(Settings settings, Client client, RestController controller) {
super(settings, client);
private final Version version;
@Inject
public RestMainAction(Settings settings, Version version, Client client, RestController controller) {
super(settings, client);
this.version = version;
controller.registerHandler(GET, "/", this);
controller.registerHandler(HEAD, "/", this);
}
@ -78,10 +80,10 @@ public class RestMainAction extends BaseRestHandler {
builder.field("name", settings.get("name"));
}
builder.startObject("version")
.field("number", Version.CURRENT.number())
.field("number", version.number())
.field("build_hash", Build.CURRENT.hash())
.field("build_timestamp", Build.CURRENT.timestamp())
.field("build_snapshot", Version.CURRENT.snapshot)
.field("build_snapshot", version.snapshot)
// We use the lucene version from lucene constants since
// this includes bugfix release version as well and is already in
// the right format. We can also be sure that the format is maitained

View File

@ -83,14 +83,21 @@ public class PlainTransportFuture<V extends TransportResponse> extends BaseFutur
@Override
public void handleResponse(V response) {
handler.handleResponse(response);
set(response);
try {
handler.handleResponse(response);
set(response);
} catch (Throwable t) {
handleException(new ResponseHandlerFailureTransportException(t));
}
}
@Override
public void handleException(TransportException exp) {
handler.handleException(exp);
setException(exp);
try {
handler.handleException(exp);
} finally {
setException(exp);
}
}
@Override

View File

@ -21,6 +21,7 @@ package org.elasticsearch.transport;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
@ -173,6 +174,9 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
public <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action, final TransportRequest request,
final TransportRequestOptions options, final TransportResponseHandler<T> handler) throws TransportException {
if (node == null) {
throw new ElasticSearchIllegalStateException("can't send request to a null node");
}
final long requestId = newRequestId();
TimeoutHandler timeoutHandler = null;
try {

View File

@ -20,13 +20,13 @@
package org.elasticsearch.transport.local;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.ThrowableObjectInputStream;
import org.elasticsearch.common.io.stream.*;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.LocalTransportAddress;
@ -48,6 +48,7 @@ import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.new
public class LocalTransport extends AbstractLifecycleComponent<Transport> implements Transport {
private final ThreadPool threadPool;
private final Version version;
private volatile TransportServiceAdapter transportServiceAdapter;
private volatile BoundTransportAddress boundAddress;
private volatile LocalTransportAddress localAddress;
@ -55,14 +56,11 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
private static final AtomicLong transportAddressIdGenerator = new AtomicLong();
private final ConcurrentMap<DiscoveryNode, LocalTransport> connectedNodes = newConcurrentMap();
public LocalTransport(ThreadPool threadPool) {
this(ImmutableSettings.Builder.EMPTY_SETTINGS, threadPool);
}
@Inject
public LocalTransport(Settings settings, ThreadPool threadPool) {
public LocalTransport(Settings settings, ThreadPool threadPool, Version version) {
super(settings);
this.threadPool = threadPool;
this.version = version;
}
@Override
@ -151,8 +149,11 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
@Override
public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
final Version version = Version.smallest(node.version(), this.version);
BytesStreamOutput bStream = new BytesStreamOutput();
StreamOutput stream = new HandlesStreamOutput(bStream);
stream.setVersion(version);
stream.writeLong(requestId);
byte status = 0;
@ -176,7 +177,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
targetTransport.messageReceived(data, action, LocalTransport.this, requestId);
targetTransport.messageReceived(data, action, LocalTransport.this, version, requestId);
}
});
}
@ -185,18 +186,19 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
return this.threadPool;
}
void messageReceived(byte[] data, String action, LocalTransport sourceTransport, @Nullable final Long sendRequestId) {
void messageReceived(byte[] data, String action, LocalTransport sourceTransport, Version version, @Nullable final Long sendRequestId) {
try {
transportServiceAdapter.received(data.length);
StreamInput stream = new BytesStreamInput(data, false);
stream = CachedStreamInput.cachedHandles(stream);
stream.setVersion(version);
long requestId = stream.readLong();
byte status = stream.readByte();
boolean isRequest = TransportStatus.isRequest(status);
if (isRequest) {
handleRequest(stream, requestId, sourceTransport);
handleRequest(stream, requestId, sourceTransport, version);
} else {
final TransportResponseHandler handler = transportServiceAdapter.remove(requestId);
// ignore if its null, the adapter logs it
@ -220,9 +222,9 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
}
}
private void handleRequest(StreamInput stream, long requestId, LocalTransport sourceTransport) throws Exception {
private void handleRequest(StreamInput stream, long requestId, LocalTransport sourceTransport, Version version) throws Exception {
final String action = stream.readString();
final LocalTransportChannel transportChannel = new LocalTransportChannel(this, sourceTransport, action, requestId);
final LocalTransportChannel transportChannel = new LocalTransportChannel(this, sourceTransport, action, requestId, version);
try {
final TransportRequestHandler handler = transportServiceAdapter.handler(action);
if (handler == null) {

View File

@ -19,6 +19,7 @@
package org.elasticsearch.transport.local;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.ThrowableObjectOutputStream;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.HandlesStreamOutput;
@ -35,19 +36,18 @@ import java.io.NotSerializableException;
public class LocalTransportChannel implements TransportChannel {
private final LocalTransport sourceTransport;
// the transport we will *send to*
private final LocalTransport targetTransport;
private final String action;
private final long requestId;
private final Version version;
public LocalTransportChannel(LocalTransport sourceTransport, LocalTransport targetTransport, String action, long requestId) {
public LocalTransportChannel(LocalTransport sourceTransport, LocalTransport targetTransport, String action, long requestId, Version version) {
this.sourceTransport = sourceTransport;
this.targetTransport = targetTransport;
this.action = action;
this.requestId = requestId;
this.version = version;
}
@Override
@ -64,6 +64,7 @@ public class LocalTransportChannel implements TransportChannel {
public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
BytesStreamOutput bStream = new BytesStreamOutput();
StreamOutput stream = new HandlesStreamOutput(bStream);
stream.setVersion(version);
stream.writeLong(requestId);
byte status = 0;
status = TransportStatus.setResponse(status);
@ -74,7 +75,7 @@ public class LocalTransportChannel implements TransportChannel {
targetTransport.threadPool().generic().execute(new Runnable() {
@Override
public void run() {
targetTransport.messageReceived(data, action, sourceTransport, null);
targetTransport.messageReceived(data, action, sourceTransport, version, null);
}
});
}
@ -100,7 +101,7 @@ public class LocalTransportChannel implements TransportChannel {
targetTransport.threadPool().generic().execute(new Runnable() {
@Override
public void run() {
targetTransport.messageReceived(data, action, sourceTransport, null);
targetTransport.messageReceived(data, action, sourceTransport, version, null);
}
});
}

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
@ -76,7 +77,6 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.*;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
import static org.elasticsearch.common.transport.NetworkExceptionHelper.isCloseConnectionException;
import static org.elasticsearch.common.transport.NetworkExceptionHelper.isConnectException;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
@ -95,6 +95,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
private final NetworkService networkService;
final Version version;
final int workerCount;
final int bossCount;
@ -154,19 +155,12 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
// connections while no connect operations is going on... (this might help with 100% CPU when stopping the transport?)
private final ReadWriteLock globalLock = new ReentrantReadWriteLock();
public NettyTransport(ThreadPool threadPool) {
this(EMPTY_SETTINGS, threadPool, new NetworkService(EMPTY_SETTINGS));
}
public NettyTransport(Settings settings, ThreadPool threadPool) {
this(settings, threadPool, new NetworkService(settings));
}
@Inject
public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService) {
public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, Version version) {
super(settings);
this.threadPool = threadPool;
this.networkService = networkService;
this.version = version;
if (settings.getAsBoolean("netty.epollBugWorkaround", false)) {
System.setProperty("org.jboss.netty.epollBugWorkaround", "true");
@ -541,13 +535,18 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
stream = new HandlesStreamOutput(stream);
stream.setVersion(node.version());
// we pick the smallest of the 2, to support both backward and forward compatibility
// note, this is the only place we need to do this, since from here on, we use the serialized version
// as the version to use also when the node receiving this request will send the response with
Version version = Version.smallest(this.version, node.version());
stream.setVersion(version);
stream.writeString(action);
request.writeTo(stream);
stream.close();
ChannelBuffer buffer = bStream.bytes().toChannelBuffer();
NettyHeader.writeHeader(buffer, requestId, status, node.version());
NettyHeader.writeHeader(buffer, requestId, status, version);
targetChannel.write(buffer);
// We handle close connection exception in the #exceptionCaught method, which is the main reason we want to add this future

View File

@ -39,13 +39,9 @@ import java.io.NotSerializableException;
public class NettyTransportChannel implements TransportChannel {
private final NettyTransport transport;
private final Version version;
private final String action;
private final Channel channel;
private final long requestId;
public NettyTransportChannel(NettyTransport transport, String action, Channel channel, long requestId, Version version) {

View File

@ -19,25 +19,23 @@
package org.elasticsearch.benchmark.transport;
import static org.elasticsearch.transport.TransportRequestOptions.options;
import java.util.concurrent.CountDownLatch;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.BaseTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.netty.NettyTransport;
import java.util.concurrent.CountDownLatch;
import static org.elasticsearch.transport.TransportRequestOptions.options;
/**
*
*/
@ -52,11 +50,13 @@ public class BenchmarkNettyLargeMessages {
Settings settings = ImmutableSettings.settingsBuilder()
.build();
final ThreadPool threadPool = new ThreadPool();
final TransportService transportServiceServer = new TransportService(new NettyTransport(settings, threadPool), threadPool).start();
final TransportService transportServiceClient = new TransportService(new NettyTransport(settings, threadPool), threadPool).start();
NetworkService networkService = new NetworkService(settings);
final DiscoveryNode bigNode = new DiscoveryNode("big", new InetSocketTransportAddress("localhost", 9300));
final ThreadPool threadPool = new ThreadPool();
final TransportService transportServiceServer = new TransportService(new NettyTransport(settings, threadPool, networkService, Version.CURRENT), threadPool).start();
final TransportService transportServiceClient = new TransportService(new NettyTransport(settings, threadPool, networkService, Version.CURRENT), threadPool).start();
final DiscoveryNode bigNode = new DiscoveryNode("big", new InetSocketTransportAddress("localhost", 9300), Version.CURRENT);
// final DiscoveryNode smallNode = new DiscoveryNode("small", new InetSocketTransportAddress("localhost", 9300));
final DiscoveryNode smallNode = bigNode;

View File

@ -19,8 +19,10 @@
package org.elasticsearch.benchmark.transport;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
@ -42,13 +44,13 @@ public class TransportBenchmark {
LOCAL {
@Override
public Transport newTransport(Settings settings, ThreadPool threadPool) {
return new LocalTransport(settings, threadPool);
return new LocalTransport(settings, threadPool, Version.CURRENT);
}
},
NETTY {
@Override
public Transport newTransport(Settings settings, ThreadPool threadPool) {
return new NettyTransport(settings, threadPool);
return new NettyTransport(settings, threadPool, new NetworkService(ImmutableSettings.EMPTY), Version.CURRENT);
}
};
@ -75,7 +77,7 @@ public class TransportBenchmark {
final ThreadPool clientThreadPool = new ThreadPool();
final TransportService clientTransportService = new TransportService(type.newTransport(settings, clientThreadPool), clientThreadPool).start();
final DiscoveryNode node = new DiscoveryNode("server", serverTransportService.boundAddress().publishAddress());
final DiscoveryNode node = new DiscoveryNode("server", serverTransportService.boundAddress().publishAddress(), Version.CURRENT);
serverTransportService.registerHandler("benchmark", new BaseTransportRequestHandler<BenchmarkMessageRequest>() {
@Override

View File

@ -20,6 +20,7 @@
package org.elasticsearch.test.unit.cluster.node;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeFilters;
import org.elasticsearch.common.settings.ImmutableSettings;
@ -43,10 +44,10 @@ public class DiscoveryNodeFiltersTests {
.build();
DiscoveryNodeFilters filters = DiscoveryNodeFilters.buildFromSettings(OR, "xxx.", settings);
DiscoveryNode node = new DiscoveryNode("name1", "id1", DummyTransportAddress.INSTANCE, ImmutableMap.<String, String>of());
DiscoveryNode node = new DiscoveryNode("name1", "id1", DummyTransportAddress.INSTANCE, ImmutableMap.<String, String>of(), Version.CURRENT);
assertThat(filters.match(node), equalTo(true));
node = new DiscoveryNode("name2", "id2", DummyTransportAddress.INSTANCE, ImmutableMap.<String, String>of());
node = new DiscoveryNode("name2", "id2", DummyTransportAddress.INSTANCE, ImmutableMap.<String, String>of(), Version.CURRENT);
assertThat(filters.match(node), equalTo(false));
}
@ -57,10 +58,10 @@ public class DiscoveryNodeFiltersTests {
.build();
DiscoveryNodeFilters filters = DiscoveryNodeFilters.buildFromSettings(OR, "xxx.", settings);
DiscoveryNode node = new DiscoveryNode("name1", "id1", DummyTransportAddress.INSTANCE, ImmutableMap.<String, String>of());
DiscoveryNode node = new DiscoveryNode("name1", "id1", DummyTransportAddress.INSTANCE, ImmutableMap.<String, String>of(), Version.CURRENT);
assertThat(filters.match(node), equalTo(true));
node = new DiscoveryNode("name2", "id2", DummyTransportAddress.INSTANCE, ImmutableMap.<String, String>of());
node = new DiscoveryNode("name2", "id2", DummyTransportAddress.INSTANCE, ImmutableMap.<String, String>of(), Version.CURRENT);
assertThat(filters.match(node), equalTo(false));
}
@ -72,13 +73,13 @@ public class DiscoveryNodeFiltersTests {
.build();
DiscoveryNodeFilters filters = DiscoveryNodeFilters.buildFromSettings(OR, "xxx.", settings);
DiscoveryNode node = new DiscoveryNode("name1", "id1", DummyTransportAddress.INSTANCE, ImmutableMap.<String, String>of());
DiscoveryNode node = new DiscoveryNode("name1", "id1", DummyTransportAddress.INSTANCE, ImmutableMap.<String, String>of(), Version.CURRENT);
assertThat(filters.match(node), equalTo(true));
node = new DiscoveryNode("name2", "id2", DummyTransportAddress.INSTANCE, ImmutableMap.<String, String>of());
node = new DiscoveryNode("name2", "id2", DummyTransportAddress.INSTANCE, ImmutableMap.<String, String>of(), Version.CURRENT);
assertThat(filters.match(node), equalTo(true));
node = new DiscoveryNode("name3", "id3", DummyTransportAddress.INSTANCE, ImmutableMap.<String, String>of());
node = new DiscoveryNode("name3", "id3", DummyTransportAddress.INSTANCE, ImmutableMap.<String, String>of(), Version.CURRENT);
assertThat(filters.match(node), equalTo(false));
}
@ -91,18 +92,18 @@ public class DiscoveryNodeFiltersTests {
DiscoveryNodeFilters filters = DiscoveryNodeFilters.buildFromSettings(AND, "xxx.", settings);
DiscoveryNode node = new DiscoveryNode("name1", "id1", DummyTransportAddress.INSTANCE,
ImmutableMap.<String, String>of("tag", "A", "group", "B"));
ImmutableMap.<String, String>of("tag", "A", "group", "B"), Version.CURRENT);
assertThat(filters.match(node), equalTo(true));
node = new DiscoveryNode("name2", "id2", DummyTransportAddress.INSTANCE,
ImmutableMap.<String, String>of("tag", "A", "group", "B", "name", "X"));
ImmutableMap.<String, String>of("tag", "A", "group", "B", "name", "X"), Version.CURRENT);
assertThat(filters.match(node), equalTo(true));
node = new DiscoveryNode("name3", "id3", DummyTransportAddress.INSTANCE,
ImmutableMap.<String, String>of("tag", "A", "group", "F", "name", "X"));
ImmutableMap.<String, String>of("tag", "A", "group", "F", "name", "X"), Version.CURRENT);
assertThat(filters.match(node), equalTo(false));
node = new DiscoveryNode("name4", "id4", DummyTransportAddress.INSTANCE, ImmutableMap.<String, String>of());
node = new DiscoveryNode("name4", "id4", DummyTransportAddress.INSTANCE, ImmutableMap.<String, String>of(), Version.CURRENT);
assertThat(filters.match(node), equalTo(false));
}
@ -113,7 +114,7 @@ public class DiscoveryNodeFiltersTests {
.build();
DiscoveryNodeFilters filters = DiscoveryNodeFilters.buildFromSettings(OR, "xxx.", settings);
DiscoveryNode node = new DiscoveryNode("name1", "id1", DummyTransportAddress.INSTANCE, ImmutableMap.<String, String>of());
DiscoveryNode node = new DiscoveryNode("name1", "id1", DummyTransportAddress.INSTANCE, ImmutableMap.<String, String>of(), Version.CURRENT);
assertThat(filters.match(node), equalTo(true));
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.test.unit.cluster.routing.allocation;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
@ -30,14 +31,14 @@ import java.util.Map;
public class RoutingAllocationTests {
public static DiscoveryNode newNode(String nodeId) {
return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE);
return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE, Version.CURRENT);
}
public static DiscoveryNode newNode(String nodeId, TransportAddress address) {
return new DiscoveryNode(nodeId, address);
return new DiscoveryNode(nodeId, address, Version.CURRENT);
}
public static DiscoveryNode newNode(String nodeId, Map<String, String> attributes) {
return new DiscoveryNode("", nodeId, DummyTransportAddress.INSTANCE, attributes);
return new DiscoveryNode("", nodeId, DummyTransportAddress.INSTANCE, attributes, Version.CURRENT);
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.test.unit.cluster.serialization;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -91,6 +92,6 @@ public class ClusterSerializationTests {
}
private DiscoveryNode newNode(String nodeId) {
return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE);
return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE, Version.CURRENT);
}
}

View File

@ -19,10 +19,13 @@
package org.elasticsearch.test.unit.discovery.zen.ping.multicast;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
@ -49,15 +52,16 @@ public class MulticastZenPingTests {
@Test
public void testSimplePings() {
Settings settings = ImmutableSettings.EMPTY;
ThreadPool threadPool = new ThreadPool();
ClusterName clusterName = new ClusterName("test");
final TransportService transportServiceA = new TransportService(new LocalTransport(threadPool), threadPool).start();
final DiscoveryNode nodeA = new DiscoveryNode("A", transportServiceA.boundAddress().publishAddress());
final TransportService transportServiceA = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT), threadPool).start();
final DiscoveryNode nodeA = new DiscoveryNode("A", transportServiceA.boundAddress().publishAddress(), Version.CURRENT);
final TransportService transportServiceB = new TransportService(new LocalTransport(threadPool), threadPool).start();
final DiscoveryNode nodeB = new DiscoveryNode("B", transportServiceA.boundAddress().publishAddress());
final TransportService transportServiceB = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT), threadPool).start();
final DiscoveryNode nodeB = new DiscoveryNode("B", transportServiceA.boundAddress().publishAddress(), Version.CURRENT);
MulticastZenPing zenPingA = new MulticastZenPing(threadPool, transportServiceA, clusterName);
MulticastZenPing zenPingA = new MulticastZenPing(threadPool, transportServiceA, clusterName, Version.CURRENT);
zenPingA.setNodesProvider(new DiscoveryNodesProvider() {
@Override
public DiscoveryNodes nodes() {
@ -71,7 +75,7 @@ public class MulticastZenPingTests {
});
zenPingA.start();
MulticastZenPing zenPingB = new MulticastZenPing(threadPool, transportServiceB, clusterName);
MulticastZenPing zenPingB = new MulticastZenPing(threadPool, transportServiceB, clusterName, Version.CURRENT);
zenPingB.setNodesProvider(new DiscoveryNodesProvider() {
@Override
public DiscoveryNodes nodes() {
@ -100,13 +104,13 @@ public class MulticastZenPingTests {
@Test
public void testExternalPing() throws Exception {
Settings settings = ImmutableSettings.EMPTY;
ThreadPool threadPool = new ThreadPool();
ClusterName clusterName = new ClusterName("test");
final TransportService transportServiceA = new TransportService(new LocalTransport(threadPool), threadPool).start();
final DiscoveryNode nodeA = new DiscoveryNode("A", transportServiceA.boundAddress().publishAddress());
final TransportService transportServiceA = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT), threadPool).start();
final DiscoveryNode nodeA = new DiscoveryNode("A", transportServiceA.boundAddress().publishAddress(), Version.CURRENT);
MulticastZenPing zenPingA = new MulticastZenPing(threadPool, transportServiceA, clusterName);
MulticastZenPing zenPingA = new MulticastZenPing(threadPool, transportServiceA, clusterName, Version.CURRENT);
zenPingA.setNodesProvider(new DiscoveryNodesProvider() {
@Override
public DiscoveryNodes nodes() {

View File

@ -19,9 +19,11 @@
package org.elasticsearch.test.unit.discovery.zen.ping.unicast;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
@ -45,17 +47,20 @@ public class UnicastZenPingTests {
@Test
public void testSimplePings() {
Settings settings = ImmutableSettings.EMPTY;
ThreadPool threadPool = new ThreadPool();
ClusterName clusterName = new ClusterName("test");
NettyTransport transportA = new NettyTransport(threadPool);
NetworkService networkService = new NetworkService(settings);
NettyTransport transportA = new NettyTransport(settings, threadPool, networkService, Version.CURRENT);
final TransportService transportServiceA = new TransportService(transportA, threadPool).start();
final DiscoveryNode nodeA = new DiscoveryNode("A", transportServiceA.boundAddress().publishAddress());
final DiscoveryNode nodeA = new DiscoveryNode("A", transportServiceA.boundAddress().publishAddress(), Version.CURRENT);
InetSocketTransportAddress addressA = (InetSocketTransportAddress) transportA.boundAddress().publishAddress();
NettyTransport transportB = new NettyTransport(threadPool);
NettyTransport transportB = new NettyTransport(settings, threadPool, networkService, Version.CURRENT);
final TransportService transportServiceB = new TransportService(transportB, threadPool).start();
final DiscoveryNode nodeB = new DiscoveryNode("B", transportServiceA.boundAddress().publishAddress());
final DiscoveryNode nodeB = new DiscoveryNode("B", transportServiceA.boundAddress().publishAddress(), Version.CURRENT);
InetSocketTransportAddress addressB = (InetSocketTransportAddress) transportB.boundAddress().publishAddress();
@ -64,7 +69,7 @@ public class UnicastZenPingTests {
addressB.address().getAddress().getHostAddress() + ":" + addressB.address().getPort())
.build();
UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, transportServiceA, clusterName, null);
UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, transportServiceA, clusterName, Version.CURRENT, null);
zenPingA.setNodesProvider(new DiscoveryNodesProvider() {
@Override
public DiscoveryNodes nodes() {
@ -78,7 +83,7 @@ public class UnicastZenPingTests {
});
zenPingA.start();
UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, transportServiceB, clusterName, null);
UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, transportServiceB, clusterName, Version.CURRENT, null);
zenPingB.setNodesProvider(new DiscoveryNodesProvider() {
@Override
public DiscoveryNodes nodes() {

View File

@ -19,9 +19,13 @@
package org.elasticsearch.test.unit.transport;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
@ -44,29 +48,37 @@ public abstract class AbstractSimpleTransportTests {
protected ThreadPool threadPool;
protected static final Version version0 = Version.fromId(/*0*/99);
protected DiscoveryNode nodeA;
protected TransportService serviceA;
protected static final Version version1 = Version.fromId(199);
protected DiscoveryNode nodeB;
protected TransportService serviceB;
protected DiscoveryNode serviceANode;
protected DiscoveryNode serviceBNode;
protected abstract TransportService build(Settings settings, Version version);
@Before
public void setUp() {
threadPool = new ThreadPool();
build();
serviceA.connectToNode(serviceBNode);
serviceB.connectToNode(serviceANode);
serviceA = build(ImmutableSettings.builder().put("name", "A").build(), version0);
nodeA = new DiscoveryNode("A", "A", serviceA.boundAddress().publishAddress(), ImmutableMap.<String, String>of(), version0);
serviceB = build(ImmutableSettings.builder().put("name", "B").build(), version1);
nodeB = new DiscoveryNode("B", "B", serviceB.boundAddress().publishAddress(), ImmutableMap.<String, String>of(), version1);
serviceA.connectToNode(nodeB);
serviceA.connectToNode(nodeA);
serviceB.connectToNode(nodeA);
serviceB.connectToNode(nodeB);
}
@After
public void tearDown() {
serviceA.close();
serviceB.close();
threadPool.shutdown();
}
protected abstract void build();
@Test
public void testHelloWorld() {
serviceA.registerHandler("sayHello", new BaseTransportRequestHandler<StringMessageRequest>() {
@ -92,7 +104,7 @@ public abstract class AbstractSimpleTransportTests {
}
});
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(serviceANode, "sayHello",
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHello",
new StringMessageRequest("moshe"), new BaseTransportResponseHandler<StringMessageResponse>() {
@Override
public StringMessageResponse newInstance() {
@ -123,7 +135,7 @@ public abstract class AbstractSimpleTransportTests {
assertThat(e.getMessage(), false, equalTo(true));
}
res = serviceB.submitRequest(serviceANode, "sayHello",
res = serviceB.submitRequest(nodeA, "sayHello",
new StringMessageRequest("moshe"), TransportRequestOptions.options().withCompress(true), new BaseTransportResponseHandler<StringMessageResponse>() {
@Override
public StringMessageResponse newInstance() {
@ -181,7 +193,7 @@ public abstract class AbstractSimpleTransportTests {
}
});
TransportFuture<TransportResponse.Empty> res = serviceB.submitRequest(serviceANode, "sayHello",
TransportFuture<TransportResponse.Empty> res = serviceB.submitRequest(nodeA, "sayHello",
TransportRequest.Empty.INSTANCE, TransportRequestOptions.options().withCompress(true), new BaseTransportResponseHandler<TransportResponse.Empty>() {
@Override
public TransportResponse.Empty newInstance() {
@ -239,7 +251,7 @@ public abstract class AbstractSimpleTransportTests {
}
});
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(serviceANode, "sayHello",
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHello",
new StringMessageRequest("moshe"), TransportRequestOptions.options().withCompress(true), new BaseTransportResponseHandler<StringMessageResponse>() {
@Override
public StringMessageResponse newInstance() {
@ -293,7 +305,7 @@ public abstract class AbstractSimpleTransportTests {
}
});
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(serviceANode, "sayHelloException",
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHelloException",
new StringMessageRequest("moshe"), new BaseTransportResponseHandler<StringMessageResponse>() {
@Override
public StringMessageResponse newInstance() {
@ -371,7 +383,7 @@ public abstract class AbstractSimpleTransportTests {
}
});
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(serviceANode, "sayHelloTimeoutNoResponse",
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHelloTimeoutNoResponse",
new StringMessageRequest("moshe"), options().withTimeout(100), new BaseTransportResponseHandler<StringMessageResponse>() {
@Override
public StringMessageResponse newInstance() {
@ -434,7 +446,7 @@ public abstract class AbstractSimpleTransportTests {
}
});
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(serviceANode, "sayHelloTimeoutDelayedResponse",
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHelloTimeoutDelayedResponse",
new StringMessageRequest("300ms"), options().withTimeout(100), new BaseTransportResponseHandler<StringMessageResponse>() {
@Override
public StringMessageResponse newInstance() {
@ -470,7 +482,7 @@ public abstract class AbstractSimpleTransportTests {
for (int i = 0; i < 10; i++) {
final int counter = i;
// now, try and send another request, this times, with a short timeout
res = serviceB.submitRequest(serviceANode, "sayHelloTimeoutDelayedResponse",
res = serviceB.submitRequest(nodeA, "sayHelloTimeoutDelayedResponse",
new StringMessageRequest(counter + "ms"), options().withTimeout(100), new BaseTransportResponseHandler<StringMessageResponse>() {
@Override
public StringMessageResponse newInstance() {
@ -548,4 +560,292 @@ public abstract class AbstractSimpleTransportTests {
out.writeString(message);
}
}
static class Version0Request extends TransportRequest {
int value1;
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
value1 = in.readInt();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeInt(value1);
}
}
static class Version1Request extends Version0Request {
int value2;
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
if (in.getVersion().onOrAfter(version1)) {
value2 = in.readInt();
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (out.getVersion().onOrAfter(version1)) {
out.writeInt(value2);
}
}
}
static class Version0Response extends TransportResponse {
int value1;
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
value1 = in.readInt();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeInt(value1);
}
}
static class Version1Response extends Version0Response {
int value2;
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
if (in.getVersion().onOrAfter(version1)) {
value2 = in.readInt();
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (out.getVersion().onOrAfter(version1)) {
out.writeInt(value2);
}
}
}
@Test
public void testVersion_from0to1() throws Exception {
serviceB.registerHandler("/version", new BaseTransportRequestHandler<Version1Request>() {
@Override
public Version1Request newInstance() {
return new Version1Request();
}
@Override
public void messageReceived(Version1Request request, TransportChannel channel) throws Exception {
assertThat(request.value1, equalTo(1));
assertThat(request.value2, equalTo(0)); // not set, coming from service A
Version1Response response = new Version1Response();
response.value1 = 1;
response.value2 = 2;
channel.sendResponse(response);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
Version0Request version0Request = new Version0Request();
version0Request.value1 = 1;
Version0Response version0Response = serviceA.submitRequest(nodeB, "/version", version0Request, new BaseTransportResponseHandler<Version0Response>() {
@Override
public Version0Response newInstance() {
return new Version0Response();
}
@Override
public void handleResponse(Version0Response response) {
assertThat(response.value1, equalTo(1));
}
@Override
public void handleException(TransportException exp) {
exp.printStackTrace();
assert false;
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}).txGet();
assertThat(version0Response.value1, equalTo(1));
}
@Test
public void testVersion_from1to0() throws Exception {
serviceA.registerHandler("/version", new BaseTransportRequestHandler<Version0Request>() {
@Override
public Version0Request newInstance() {
return new Version0Request();
}
@Override
public void messageReceived(Version0Request request, TransportChannel channel) throws Exception {
assertThat(request.value1, equalTo(1));
Version0Response response = new Version0Response();
response.value1 = 1;
channel.sendResponse(response);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
Version1Request version1Request = new Version1Request();
version1Request.value1 = 1;
version1Request.value2 = 2;
Version1Response version1Response = serviceB.submitRequest(nodeA, "/version", version1Request, new BaseTransportResponseHandler<Version1Response>() {
@Override
public Version1Response newInstance() {
return new Version1Response();
}
@Override
public void handleResponse(Version1Response response) {
assertThat(response.value1, equalTo(1));
assertThat(response.value2, equalTo(0)); // initial values, cause its serialized from version 0
}
@Override
public void handleException(TransportException exp) {
exp.printStackTrace();
assert false;
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}).txGet();
assertThat(version1Response.value1, equalTo(1));
assertThat(version1Response.value2, equalTo(0));
}
@Test
public void testVersion_from1to1() throws Exception {
serviceB.registerHandler("/version", new BaseTransportRequestHandler<Version1Request>() {
@Override
public Version1Request newInstance() {
return new Version1Request();
}
@Override
public void messageReceived(Version1Request request, TransportChannel channel) throws Exception {
assertThat(request.value1, equalTo(1));
assertThat(request.value2, equalTo(2));
Version1Response response = new Version1Response();
response.value1 = 1;
response.value2 = 2;
channel.sendResponse(response);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
Version1Request version1Request = new Version1Request();
version1Request.value1 = 1;
version1Request.value2 = 2;
Version1Response version1Response = serviceB.submitRequest(nodeB, "/version", version1Request, new BaseTransportResponseHandler<Version1Response>() {
@Override
public Version1Response newInstance() {
return new Version1Response();
}
@Override
public void handleResponse(Version1Response response) {
assertThat(response.value1, equalTo(1));
assertThat(response.value2, equalTo(2));
}
@Override
public void handleException(TransportException exp) {
exp.printStackTrace();
assert false;
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}).txGet();
assertThat(version1Response.value1, equalTo(1));
assertThat(version1Response.value2, equalTo(2));
}
@Test
public void testVersion_from0to0() throws Exception {
serviceA.registerHandler("/version", new BaseTransportRequestHandler<Version0Request>() {
@Override
public Version0Request newInstance() {
return new Version0Request();
}
@Override
public void messageReceived(Version0Request request, TransportChannel channel) throws Exception {
assertThat(request.value1, equalTo(1));
Version0Response response = new Version0Response();
response.value1 = 1;
channel.sendResponse(response);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
Version0Request version0Request = new Version0Request();
version0Request.value1 = 1;
Version0Response version0Response = serviceA.submitRequest(nodeA, "/version", version0Request, new BaseTransportResponseHandler<Version0Response>() {
@Override
public Version0Response newInstance() {
return new Version0Response();
}
@Override
public void handleResponse(Version0Response response) {
assertThat(response.value1, equalTo(1));
}
@Override
public void handleException(TransportException exp) {
exp.printStackTrace();
assert false;
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}).txGet();
assertThat(version0Response.value1, equalTo(1));
}
}

View File

@ -19,20 +19,16 @@
package org.elasticsearch.test.unit.transport.local;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.Version;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.unit.transport.AbstractSimpleTransportTests;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.local.LocalTransport;
import org.junit.Test;
public class SimpleLocalTransportTests extends AbstractSimpleTransportTests {
@Override
protected void build() {
serviceA = new TransportService(new LocalTransport(threadPool), threadPool).start();
serviceANode = new DiscoveryNode("A", serviceA.boundAddress().publishAddress());
serviceB = new TransportService(new LocalTransport(threadPool), threadPool).start();
serviceBNode = new DiscoveryNode("B", serviceB.boundAddress().publishAddress());
protected TransportService build(Settings settings, Version version) {
return new TransportService(new LocalTransport(settings, threadPool, version), threadPool).start();
}
}

View File

@ -19,7 +19,10 @@
package org.elasticsearch.test.unit.transport.netty;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.test.unit.transport.AbstractSimpleTransportTests;
import org.elasticsearch.transport.ConnectTransportException;
@ -27,23 +30,17 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.netty.NettyTransport;
import org.junit.Test;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
public class SimpleNettyTransportTests extends AbstractSimpleTransportTests {
@Override
protected void build() {
serviceA = new TransportService(settingsBuilder().put("name", "A").build(), new NettyTransport(settingsBuilder().put("name", "A").build(), threadPool), threadPool).start();
serviceANode = new DiscoveryNode("A", serviceA.boundAddress().publishAddress());
serviceB = new TransportService(settingsBuilder().put("name", "B").build(), new NettyTransport(settingsBuilder().put("name", "B").build(), threadPool), threadPool).start();
serviceBNode = new DiscoveryNode("B", serviceB.boundAddress().publishAddress());
protected TransportService build(Settings settings, Version version) {
return new TransportService(settings, new NettyTransport(settings, threadPool, new NetworkService(settings), version), threadPool).start();
}
@Test
public void testConnectException() {
try {
serviceA.connectToNode(new DiscoveryNode("C", new InetSocketTransportAddress("localhost", 9876)));
serviceA.connectToNode(new DiscoveryNode("C", new InetSocketTransportAddress("localhost", 9876), Version.CURRENT));
assert false;
} catch (ConnectTransportException e) {
// e.printStackTrace();