Fix cross-cluster remote node gateway attributes

Remote nodes in cross-cluster search can be marked as eligible for
acting a gateway node via a remote node attribute setting. For example,
if search.remote.node.attr is set to "gateway", only nodes that have
node.attr.gateway set to "true" can be connected to for cross-cluster
search. Unfortunately, there is a bug in the handling of these
attributes due to the use of a dangerous method
Boolean#getBoolean(String) which obtains the system property with
specified name as a boolean. We are not looking at system properties
here, but node settings. This commit fixes this situation, and adds a
test. A follow-up will ban the use of Boolean#getBoolean.

Relates #23863
This commit is contained in:
Jason Tedor 2017-04-01 13:04:51 -04:00 committed by GitHub
parent ee68e75332
commit 38b3fec885
4 changed files with 139 additions and 22 deletions

View File

@ -516,4 +516,9 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
assert connectHandler.running.availablePermits() == 1;
return true;
}
boolean isNodeConnected(final DiscoveryNode node) {
return connectedNodes.contains(node);
}
}

View File

@ -26,8 +26,10 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.PlainShardIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
@ -136,7 +138,7 @@ public final class RemoteClusterService extends AbstractComponent implements Clo
// nodes can be tagged with node.attr.remote_gateway: true to allow a node to be a gateway node for
// cross cluster search
String attribute = REMOTE_NODE_ATTRIBUTE.get(settings);
nodePredicate = nodePredicate.and((node) -> Boolean.getBoolean(node.getAttributes().getOrDefault(attribute, "false")));
nodePredicate = nodePredicate.and((node) -> Booleans.isTrue(node.getAttributes().getOrDefault(attribute, "false")));
}
remoteClusters.putAll(this.remoteClusters);
for (Map.Entry<String, List<DiscoveryNode>> entry : seeds.entrySet()) {
@ -185,6 +187,10 @@ public final class RemoteClusterService extends AbstractComponent implements Clo
return remoteClusters.isEmpty() == false;
}
boolean isRemoteNodeConnected(final String remoteCluster, final DiscoveryNode node) {
return remoteClusters.get(remoteCluster).isNodeConnected(node);
}
/**
* Groups indices per cluster by splitting remote cluster-alias, index-name pairs on {@link #REMOTE_CLUSTER_INDEX_SEPARATOR}. All
* indices per cluster are collected as a list in the returned map keyed by the cluster alias. Local indices are grouped under
@ -326,13 +332,20 @@ public final class RemoteClusterService extends AbstractComponent implements Clo
}
void updateRemoteCluster(String clusterAlias, List<InetSocketAddress> addresses) {
updateRemoteClusters(Collections.singletonMap(clusterAlias, addresses.stream().map(address -> {
TransportAddress transportAddress = new TransportAddress(address);
return new DiscoveryNode(clusterAlias + "#" + transportAddress.toString(),
transportAddress,
Version.CURRENT.minimumCompatibilityVersion());
}).collect(Collectors.toList())),
ActionListener.wrap((x) -> {}, (x) -> {}) );
updateRemoteCluster(clusterAlias, addresses, ActionListener.wrap((x) -> {}, (x) -> {}));
}
void updateRemoteCluster(
final String clusterAlias,
final List<InetSocketAddress> addresses,
final ActionListener<Void> connectionListener) {
final List<DiscoveryNode> nodes = addresses.stream().map(address -> {
final TransportAddress transportAddress = new TransportAddress(address);
final String id = clusterAlias + "#" + transportAddress.toString();
final Version version = Version.CURRENT.minimumCompatibilityVersion();
return new DiscoveryNode(id, transportAddress, version);
}).collect(Collectors.toList());
updateRemoteClusters(Collections.singletonMap(clusterAlias, nodes), connectionListener);
}
static Map<String, List<DiscoveryNode>> buildRemoteClustersSeeds(Settings settings) {

View File

@ -36,6 +36,7 @@ import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.mocksocket.MockServerSocket;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
@ -81,23 +82,33 @@ public class RemoteClusterConnectionTests extends ESTestCase {
}
public static MockTransportService startTransport(String id, List<DiscoveryNode> knownNodes, Version version, ThreadPool threadPool) {
return startTransport(id, knownNodes, version, threadPool, Settings.EMPTY);
}
public static MockTransportService startTransport(
final String id,
final List<DiscoveryNode> knownNodes,
final Version version,
final ThreadPool threadPool,
final Settings settings) {
boolean success = false;
MockTransportService newService = MockTransportService.createNewService(Settings.EMPTY, version, threadPool, null);
final Settings s = Settings.builder().put(settings).put("node.name", id).build();
MockTransportService newService = MockTransportService.createNewService(s, version, threadPool, null);
try {
newService.registerRequestHandler(ClusterSearchShardsAction.NAME, ClusterSearchShardsRequest::new, ThreadPool.Names.SAME,
(request, channel) -> {
channel.sendResponse(new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0],
knownNodes.toArray(new DiscoveryNode[0]), Collections.emptyMap()));
});
(request, channel) -> {
channel.sendResponse(new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0],
knownNodes.toArray(new DiscoveryNode[0]), Collections.emptyMap()));
});
newService.registerRequestHandler(ClusterStateAction.NAME, ClusterStateRequest::new, ThreadPool.Names.SAME,
(request, channel) -> {
DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
for (DiscoveryNode node : knownNodes) {
builder.add(node);
}
ClusterState build = ClusterState.builder(ClusterName.DEFAULT).nodes(builder.build()).build();
channel.sendResponse(new ClusterStateResponse(ClusterName.DEFAULT, build, 0L));
});
(request, channel) -> {
DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
for (DiscoveryNode node : knownNodes) {
builder.add(node);
}
ClusterState build = ClusterState.builder(ClusterName.DEFAULT).nodes(builder.build()).build();
channel.sendResponse(new ClusterStateResponse(ClusterName.DEFAULT, build, 0L));
});
newService.start();
newService.acceptIncomingRequests();
success = true;

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.search;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -49,6 +50,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class RemoteClusterServiceTests extends ESTestCase {
@ -62,7 +64,16 @@ public class RemoteClusterServiceTests extends ESTestCase {
}
private MockTransportService startTransport(String id, List<DiscoveryNode> knownNodes, Version version) {
return RemoteClusterConnectionTests.startTransport(id, knownNodes, version, threadPool);
return startTransport(id, knownNodes, version, Settings.EMPTY);
}
private MockTransportService startTransport(
final String id,
final List<DiscoveryNode> knownNodes,
final Version version,
final Settings settings) {
return RemoteClusterConnectionTests.startTransport(
id, knownNodes, version, threadPool, settings);
}
public void testSettingsAreRegistered() {
@ -247,4 +258,81 @@ public class RemoteClusterServiceTests extends ESTestCase {
assertEquals(new MatchAllQueryBuilder(), remoteAliases.get("bar_id").getQueryBuilder());
}
}
public void testRemoteNodeAttribute() throws IOException, InterruptedException {
final Settings settings =
Settings.builder().put("search.remote.node.attr", "gateway").build();
final List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
final Settings gateway = Settings.builder().put("node.attr.gateway", true).build();
try (MockTransportService c1N1 =
startTransport("cluster_1_node_1", knownNodes, Version.CURRENT);
MockTransportService c1N2 =
startTransport("cluster_1_node_2", knownNodes, Version.CURRENT, gateway);
MockTransportService c2N1 =
startTransport("cluster_2_node_1", knownNodes, Version.CURRENT);
MockTransportService c2N2 =
startTransport("cluster_2_node_2", knownNodes, Version.CURRENT, gateway)) {
final DiscoveryNode c1N1Node = c1N1.getLocalDiscoNode();
final DiscoveryNode c1N2Node = c1N2.getLocalDiscoNode();
final DiscoveryNode c2N1Node = c2N1.getLocalDiscoNode();
final DiscoveryNode c2N2Node = c2N2.getLocalDiscoNode();
knownNodes.add(c1N1Node);
knownNodes.add(c1N2Node);
knownNodes.add(c2N1Node);
knownNodes.add(c2N2Node);
Collections.shuffle(knownNodes, random());
try (MockTransportService transportService = MockTransportService.createNewService(
settings,
Version.CURRENT,
threadPool,
null)) {
transportService.start();
transportService.acceptIncomingRequests();
final Settings.Builder builder = Settings.builder();
builder.putArray(
"search.remote.cluster_1.seeds", c1N1Node.getAddress().toString());
builder.putArray(
"search.remote.cluster_2.seeds", c2N1Node.getAddress().toString());
try (RemoteClusterService service =
new RemoteClusterService(settings, transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
assertFalse(service.isCrossClusterSearchEnabled());
final InetSocketAddress c1N1Address = c1N1Node.getAddress().address();
final InetSocketAddress c1N2Address = c1N2Node.getAddress().address();
final InetSocketAddress c2N1Address = c2N1Node.getAddress().address();
final InetSocketAddress c2N2Address = c2N2Node.getAddress().address();
final CountDownLatch firstLatch = new CountDownLatch(1);
service.updateRemoteCluster(
"cluster_1",
Arrays.asList(c1N1Address, c1N2Address),
connectionListener(firstLatch));
firstLatch.await();
final CountDownLatch secondLatch = new CountDownLatch(1);
service.updateRemoteCluster(
"cluster_2",
Arrays.asList(c2N1Address, c2N2Address),
connectionListener(secondLatch));
secondLatch.await();
assertTrue(service.isCrossClusterSearchEnabled());
assertTrue(service.isRemoteClusterRegistered("cluster_1"));
assertFalse(service.isRemoteNodeConnected("cluster_1", c1N1Node));
assertTrue(service.isRemoteNodeConnected("cluster_1", c1N2Node));
assertTrue(service.isRemoteClusterRegistered("cluster_2"));
assertFalse(service.isRemoteNodeConnected("cluster_2", c2N1Node));
assertTrue(service.isRemoteNodeConnected("cluster_2", c2N2Node));
}
}
}
}
private ActionListener<Void> connectionListener(final CountDownLatch latch) {
return ActionListener.wrap(x -> latch.countDown(), x -> fail());
}
}