Remove DiscoveryNode#version in favour of existing DiscoveryNode#getVersion

This commit is contained in:
javanna 2016-03-30 14:56:15 +02:00 committed by Luca Cavanna
parent 00f5ca57fa
commit 3942c9e4df
10 changed files with 35 additions and 39 deletions

View File

@ -264,7 +264,7 @@ public class DiscoveryNode implements Writeable<DiscoveryNode>, ToXContent {
return roles;
}
public Version version() {
public Version getVersion() {
return this.version;
}
@ -276,10 +276,6 @@ public class DiscoveryNode implements Writeable<DiscoveryNode>, ToXContent {
return this.hostAddress;
}
public Version getVersion() {
return this.version;
}
@Override
public DiscoveryNode readFrom(StreamInput in) throws IOException {
return new DiscoveryNode(in);

View File

@ -677,16 +677,16 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
for (ObjectObjectCursor<String, DiscoveryNode> nodeEntry : nodes) {
if (nodeEntry.value.isDataNode()) {
dataNodesBuilder.put(nodeEntry.key, nodeEntry.value);
minNonClientNodeVersion = Version.smallest(minNonClientNodeVersion, nodeEntry.value.version());
minNonClientNodeVersion = Version.smallest(minNonClientNodeVersion, nodeEntry.value.getVersion());
}
if (nodeEntry.value.isMasterNode()) {
masterNodesBuilder.put(nodeEntry.key, nodeEntry.value);
minNonClientNodeVersion = Version.smallest(minNonClientNodeVersion, nodeEntry.value.version());
minNonClientNodeVersion = Version.smallest(minNonClientNodeVersion, nodeEntry.value.getVersion());
}
if (nodeEntry.value.isIngestNode()) {
ingestNodesBuilder.put(nodeEntry.key, nodeEntry.value);
}
minNodeVersion = Version.smallest(minNodeVersion, nodeEntry.value.version());
minNodeVersion = Version.smallest(minNodeVersion, nodeEntry.value.getVersion());
}
return new DiscoveryNodes(

View File

@ -73,26 +73,26 @@ public class NodeVersionAllocationDecider extends AllocationDecider {
private Decision isVersionCompatible(final RoutingNodes routingNodes, final String sourceNodeId, final RoutingNode target,
RoutingAllocation allocation) {
final RoutingNode source = routingNodes.node(sourceNodeId);
if (target.node().version().onOrAfter(source.node().version())) {
if (target.node().getVersion().onOrAfter(source.node().getVersion())) {
/* we can allocate if we can recover from a node that is younger or on the same version
* if the primary is already running on a newer version that won't work due to possible
* differences in the lucene index format etc.*/
return allocation.decision(Decision.YES, NAME, "target node version [%s] is the same or newer than source node version [%s]",
target.node().version(), source.node().version());
target.node().getVersion(), source.node().getVersion());
} else {
return allocation.decision(Decision.NO, NAME, "target node version [%s] is older than the source node version [%s]",
target.node().version(), source.node().version());
target.node().getVersion(), source.node().getVersion());
}
}
private Decision isVersionCompatible(RestoreSource restoreSource, final RoutingNode target, RoutingAllocation allocation) {
if (target.node().version().onOrAfter(restoreSource.version())) {
if (target.node().getVersion().onOrAfter(restoreSource.version())) {
/* we can allocate if we can restore from a snapshot that is older or on the same version */
return allocation.decision(Decision.YES, NAME, "target node version [%s] is the same or newer than snapshot version [%s]",
target.node().version(), restoreSource.version());
target.node().getVersion(), restoreSource.version());
} else {
return allocation.decision(Decision.NO, NAME, "target node version [%s] is older than the snapshot version [%s]",
target.node().version(), restoreSource.version());
target.node().getVersion(), restoreSource.version());
}
}
}

View File

@ -212,16 +212,16 @@ public class PublishClusterStateAction extends AbstractComponent {
try {
if (sendFullVersion || !previousState.nodes().nodeExists(node.getId())) {
// will send a full reference
if (serializedStates.containsKey(node.version()) == false) {
serializedStates.put(node.version(), serializeFullClusterState(clusterState, node.version()));
if (serializedStates.containsKey(node.getVersion()) == false) {
serializedStates.put(node.getVersion(), serializeFullClusterState(clusterState, node.getVersion()));
}
} else {
// will send a diff
if (diff == null) {
diff = clusterState.diff(previousState);
}
if (serializedDiffs.containsKey(node.version()) == false) {
serializedDiffs.put(node.version(), serializeDiffClusterState(diff, node.version()));
if (serializedDiffs.containsKey(node.getVersion()) == false) {
serializedDiffs.put(node.getVersion(), serializeDiffClusterState(diff, node.getVersion()));
}
}
} catch (IOException e) {
@ -232,11 +232,11 @@ public class PublishClusterStateAction extends AbstractComponent {
private void sendFullClusterState(ClusterState clusterState, Map<Version, BytesReference> serializedStates,
DiscoveryNode node, TimeValue publishTimeout, SendingController sendingController) {
BytesReference bytes = serializedStates.get(node.version());
BytesReference bytes = serializedStates.get(node.getVersion());
if (bytes == null) {
try {
bytes = serializeFullClusterState(clusterState, node.version());
serializedStates.put(node.version(), bytes);
bytes = serializeFullClusterState(clusterState, node.getVersion());
serializedStates.put(node.getVersion(), bytes);
} catch (Throwable e) {
logger.warn("failed to serialize cluster_state before publishing it to node {}", e, node);
sendingController.onNodeSendFailed(node, e);
@ -249,8 +249,8 @@ public class PublishClusterStateAction extends AbstractComponent {
private void sendClusterStateDiff(ClusterState clusterState,
Map<Version, BytesReference> serializedDiffs, Map<Version, BytesReference> serializedStates,
DiscoveryNode node, TimeValue publishTimeout, SendingController sendingController) {
BytesReference bytes = serializedDiffs.get(node.version());
assert bytes != null : "failed to find serialized diff for node " + node + " of version [" + node.version() + "]";
BytesReference bytes = serializedDiffs.get(node.getVersion());
assert bytes != null : "failed to find serialized diff for node " + node + " of version [" + node.getVersion() + "]";
sendClusterStateToNode(clusterState, bytes, node, publishTimeout, sendingController, true, serializedStates);
}
@ -266,7 +266,7 @@ public class PublishClusterStateAction extends AbstractComponent {
// -> no need to compress, we already compressed the bytes
TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withCompress(false).build();
transportService.sendRequest(node, SEND_ACTION_NAME,
new BytesTransportRequest(bytes, node.version()),
new BytesTransportRequest(bytes, node.getVersion()),
options,
new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {

View File

@ -202,7 +202,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
@Override
public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request,
TransportRequestOptions options) throws IOException, TransportException {
final Version version = Version.smallest(node.version(), this.version);
final Version version = Version.smallest(node.getVersion(), this.version);
try (BytesStreamOutput stream = new BytesStreamOutput()) {
stream.setVersion(version);

View File

@ -887,7 +887,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
// we pick the smallest of the 2, to support both backward and forward compatibility
// note, this is the only place we need to do this, since from here on, we use the serialized version
// as the version to use also when the node receiving this request will send the response with
Version version = Version.smallest(this.version, node.version());
Version version = Version.smallest(this.version, node.getVersion());
stream.setVersion(version);
threadPool.getThreadContext().writeTo(stream);
@ -900,7 +900,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
// more explicit).
if (request instanceof BytesTransportRequest) {
BytesTransportRequest bRequest = (BytesTransportRequest) request;
assert node.version().equals(bRequest.version());
assert node.getVersion().equals(bRequest.version());
bRequest.writeThin(stream);
stream.close();
bytes = bStream.bytes();

View File

@ -367,7 +367,7 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
Map<String, String> tribeAttr = new HashMap<>(tribe.getAttributes());
tribeAttr.put(TRIBE_NAME_SETTING.getKey(), tribeName);
DiscoveryNode discoNode = new DiscoveryNode(tribe.getName(), tribe.getId(), tribe.getHostName(), tribe.getHostAddress(),
tribe.getAddress(), unmodifiableMap(tribeAttr), tribe.getRoles(), tribe.version());
tribe.getAddress(), unmodifiableMap(tribeAttr), tribe.getRoles(), tribe.getVersion());
clusterStateChanged = true;
logger.info("[{}] adding node [{}]", tribeName, discoNode);
nodes.put(discoNode);

View File

@ -139,7 +139,7 @@ public class ClusterStateDiffIT extends ESIntegTestCase {
for (ObjectCursor<String> node : clusterStateFromDiffs.nodes().nodes().keys()) {
DiscoveryNode node1 = clusterState.nodes().get(node.value);
DiscoveryNode node2 = clusterStateFromDiffs.nodes().get(node.value);
assertThat(node1.version(), equalTo(node2.version()));
assertThat(node1.getVersion(), equalTo(node2.getVersion()));
assertThat(node1.getAddress(), equalTo(node2.getAddress()));
assertThat(node1.getAttributes(), equalTo(node2.getAttributes()));
}

View File

@ -415,17 +415,17 @@ public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase {
String fromId = r.currentNodeId();
assertThat(fromId, notNullValue());
assertThat(toId, notNullValue());
logger.trace("From: {} with Version: {} to: {} with Version: {}", fromId, routingNodes.node(fromId).node().version(),
toId, routingNodes.node(toId).node().version());
assertTrue(routingNodes.node(toId).node().version().onOrAfter(routingNodes.node(fromId).node().version()));
logger.trace("From: {} with Version: {} to: {} with Version: {}", fromId, routingNodes.node(fromId).node().getVersion(),
toId, routingNodes.node(toId).node().getVersion());
assertTrue(routingNodes.node(toId).node().getVersion().onOrAfter(routingNodes.node(fromId).node().getVersion()));
} else {
ShardRouting primary = routingNodes.activePrimary(r);
assertThat(primary, notNullValue());
String fromId = primary.currentNodeId();
String toId = r.relocatingNodeId();
logger.trace("From: {} with Version: {} to: {} with Version: {}", fromId, routingNodes.node(fromId).node().version(),
toId, routingNodes.node(toId).node().version());
assertTrue(routingNodes.node(toId).node().version().onOrAfter(routingNodes.node(fromId).node().version()));
logger.trace("From: {} with Version: {} to: {} with Version: {}", fromId, routingNodes.node(fromId).node().getVersion(),
toId, routingNodes.node(toId).node().getVersion());
assertTrue(routingNodes.node(toId).node().getVersion().onOrAfter(routingNodes.node(fromId).node().getVersion()));
}
}
@ -436,9 +436,9 @@ public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase {
assertThat(primary, notNullValue());
String fromId = primary.currentNodeId();
String toId = r.currentNodeId();
logger.trace("From: {} with Version: {} to: {} with Version: {}", fromId, routingNodes.node(fromId).node().version(),
toId, routingNodes.node(toId).node().version());
assertTrue(routingNodes.node(toId).node().version().onOrAfter(routingNodes.node(fromId).node().version()));
logger.trace("From: {} with Version: {} to: {} with Version: {}", fromId, routingNodes.node(fromId).node().getVersion(),
toId, routingNodes.node(toId).node().getVersion());
assertTrue(routingNodes.node(toId).node().getVersion().onOrAfter(routingNodes.node(fromId).node().getVersion()));
}
}

View File

@ -197,7 +197,7 @@ public class ZenDiscoveryIT extends ESIntegTestCase {
emptySet(), Version.CURRENT)).masterNodeId("abc");
ClusterState.Builder builder = ClusterState.builder(state);
builder.nodes(nodes);
BytesReference bytes = PublishClusterStateAction.serializeFullClusterState(builder.build(), node.version());
BytesReference bytes = PublishClusterStateAction.serializeFullClusterState(builder.build(), node.getVersion());
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Exception> reference = new AtomicReference<>();