From 58e9446e00b89270eb099c3cf35e81acbe566d05 Mon Sep 17 00:00:00 2001 From: Colin Goodheart-Smithe Date: Wed, 13 Jun 2018 17:24:32 +0100 Subject: [PATCH 1/5] Removes experimental tag from scripted_metric aggregation (#31298) --- .../aggregations/metrics/scripted-metric-aggregation.asciidoc | 2 -- 1 file changed, 2 deletions(-) diff --git a/docs/reference/aggregations/metrics/scripted-metric-aggregation.asciidoc b/docs/reference/aggregations/metrics/scripted-metric-aggregation.asciidoc index daa86969e45..1a4d6d4774c 100644 --- a/docs/reference/aggregations/metrics/scripted-metric-aggregation.asciidoc +++ b/docs/reference/aggregations/metrics/scripted-metric-aggregation.asciidoc @@ -1,8 +1,6 @@ [[search-aggregations-metrics-scripted-metric-aggregation]] === Scripted Metric Aggregation -experimental[] - A metric aggregation that executes using scripts to provide a metric output. Example: From 73742a4be90b20ffa81755d9ef06cc8d989fba65 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Wed, 13 Jun 2018 17:59:43 +0100 Subject: [PATCH 2/5] Add unreleased version 6.3.1 --- server/src/main/java/org/elasticsearch/Version.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/Version.java b/server/src/main/java/org/elasticsearch/Version.java index 5111348d363..848af37c70b 100644 --- a/server/src/main/java/org/elasticsearch/Version.java +++ b/server/src/main/java/org/elasticsearch/Version.java @@ -166,10 +166,10 @@ public class Version implements Comparable, ToXContentFragment { public static final Version V_6_2_3 = new Version(V_6_2_3_ID, LUCENE_7_2_1); public static final int V_6_2_4_ID = 6020499; public static final Version V_6_2_4 = new Version(V_6_2_4_ID, LUCENE_7_2_1); - public static final int V_6_2_5_ID = 6020599; - public static final Version V_6_2_5 = new Version(V_6_2_5_ID, LUCENE_7_2_1); public static final int V_6_3_0_ID = 6030099; public static final Version V_6_3_0 = new Version(V_6_3_0_ID, org.apache.lucene.util.Version.LUCENE_7_3_1); + public static final int V_6_3_1_ID = 6030199; + public static final Version V_6_3_1 = new Version(V_6_3_1_ID, org.apache.lucene.util.Version.LUCENE_7_3_1); public static final int V_6_4_0_ID = 6040099; public static final Version V_6_4_0 = new Version(V_6_4_0_ID, org.apache.lucene.util.Version.LUCENE_7_4_0); public static final int V_7_0_0_alpha1_ID = 7000001; @@ -192,10 +192,10 @@ public class Version implements Comparable, ToXContentFragment { return V_7_0_0_alpha1; case V_6_4_0_ID: return V_6_4_0; + case V_6_3_1_ID: + return V_6_3_1; case V_6_3_0_ID: return V_6_3_0; - case V_6_2_5_ID: - return V_6_2_5; case V_6_2_4_ID: return V_6_2_4; case V_6_2_3_ID: From 01d64b128b214c2ac5afc1b0b514dd0d45e60c1b Mon Sep 17 00:00:00 2001 From: Andy Bristol Date: Wed, 13 Jun 2018 11:15:04 -0700 Subject: [PATCH 3/5] [test] opensuse packaging turn up debug logging --- qa/vagrant/src/test/resources/packaging/utils/utils.bash | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/qa/vagrant/src/test/resources/packaging/utils/utils.bash b/qa/vagrant/src/test/resources/packaging/utils/utils.bash index f8c3005de6c..53662ca9d3c 100644 --- a/qa/vagrant/src/test/resources/packaging/utils/utils.bash +++ b/qa/vagrant/src/test/resources/packaging/utils/utils.bash @@ -463,8 +463,9 @@ debug_collect_logs() { set_debug_logging() { if [ "$ESCONFIG" ] && [ -d "$ESCONFIG" ] && [ -f /etc/os-release ] && (grep -qi suse /etc/os-release); then - echo 'logger.org.elasticsearch.indices: DEBUG' >> "$ESCONFIG/elasticsearch.yml" + echo 'logger.org.elasticsearch.indices: TRACE' >> "$ESCONFIG/elasticsearch.yml" echo 'logger.org.elasticsearch.gateway: TRACE' >> "$ESCONFIG/elasticsearch.yml" + echo 'logger.org.elasticsearch.cluster: DEBUG' >> "$ESCONFIG/elasticsearch.yml" fi } From 018d3fc81fed01365bd5ac0bf39764fef75b1e4b Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Wed, 13 Jun 2018 14:11:48 -0400 Subject: [PATCH 4/5] Mute ScriptedMetricAggregatorTests testSelfReferencingAggStateAfterMap Tracked by #31307 --- .../metrics/scripted/ScriptedMetricAggregatorTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/scripted/ScriptedMetricAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/scripted/ScriptedMetricAggregatorTests.java index fdedf1055d6..9417cc092d8 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/scripted/ScriptedMetricAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/scripted/ScriptedMetricAggregatorTests.java @@ -300,6 +300,7 @@ public class ScriptedMetricAggregatorTests extends AggregatorTestCase { } } + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/31307") public void testSelfReferencingAggStateAfterMap() throws IOException { try (Directory directory = newDirectory()) { Integer numDocs = randomInt(100); From 664903a70ad00ec6400f9bfccb620517de355e77 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Wed, 13 Jun 2018 20:37:12 +0200 Subject: [PATCH 5/5] CCS: don't proxy requests for already connected node (#31273) Cross-cluster search selects a subset of nodes for each remote cluster and sends requests only to them, which will act as a proxy and properly redirect such requests to the target nodes that hold the relevant data. What happens today is that every time we send a request to a remote cluster, it will be sent to the next node in the proxy list (in round-robin fashion), regardless of whether the target node is already amongst the ones that we are connected to. In case for instance we need to send a shard search request to a data node that's also one of the selected proxy nodes, we may end up sending the request to it through one of the other proxy nodes. This commit optimizes this case to make sure that whenever we are already connected to a remote node, we will send a direct request rather than using the next proxy node. There is a side-effect to this, which is that round-robin will be a bit unbalanced as the data nodes that are also selected as proxies will receive more requests. --- .../action/search/TransportSearchAction.java | 6 +- .../transport/RemoteClusterConnection.java | 88 +++++++++---------- .../RemoteClusterConnectionTests.java | 83 +++++++++++++++-- 3 files changed, 124 insertions(+), 53 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 52c3be87894..ad3b2efd42f 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -50,7 +50,6 @@ import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -351,8 +350,7 @@ public class TransportSearchAction extends HandledTransportAction mergeShardsIterators(GroupShardsIterator localShardsIterator, OriginalIndices localIndices, List remoteShardIterators) { - List shards = new ArrayList<>(); - shards.addAll(remoteShardIterators); + List shards = new ArrayList<>(remoteShardIterators); for (ShardIterator shardIterator : localShardsIterator) { shards.add(new SearchShardIterator(null, shardIterator.shardId(), shardIterator.getShardRoutings(), localIndices)); } @@ -384,7 +382,7 @@ public class TransportSearchAction extends HandledTransportAction listener) { - final DiscoveryNode node = connectedNodes.get(); + final DiscoveryNode node = connectedNodes.getAny(); transportService.sendRequest(node, ClusterSearchShardsAction.NAME, searchShardsRequest, new TransportResponseHandler() { @@ -217,7 +215,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo request.clear(); request.nodes(true); request.local(true); // run this on the node that gets the request it's as good as any other - final DiscoveryNode node = connectedNodes.get(); + final DiscoveryNode node = connectedNodes.getAny(); transportService.sendRequest(node, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY, new TransportResponseHandler() { @Override @@ -255,40 +253,52 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo } /** - * Returns a connection to the remote cluster. This connection might be a proxy connection that redirects internally to the - * given node. + * Returns a connection to the remote cluster, preferably a direct connection to the provided {@link DiscoveryNode}. + * If such node is not connected, the returned connection will be a proxy connection that redirects to it. */ Transport.Connection getConnection(DiscoveryNode remoteClusterNode) { - DiscoveryNode discoveryNode = connectedNodes.get(); + if (transportService.nodeConnected(remoteClusterNode)) { + return transportService.getConnection(remoteClusterNode); + } + DiscoveryNode discoveryNode = connectedNodes.getAny(); Transport.Connection connection = transportService.getConnection(discoveryNode); - return new Transport.Connection() { - @Override - public DiscoveryNode getNode() { - return remoteClusterNode; - } + return new ProxyConnection(connection, remoteClusterNode); + } - @Override - public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) + static final class ProxyConnection implements Transport.Connection { + private final Transport.Connection proxyConnection; + private final DiscoveryNode targetNode; + + private ProxyConnection(Transport.Connection proxyConnection, DiscoveryNode targetNode) { + this.proxyConnection = proxyConnection; + this.targetNode = targetNode; + } + + @Override + public DiscoveryNode getNode() { + return targetNode; + } + + @Override + public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { - connection.sendRequest(requestId, TransportActionProxy.getProxyAction(action), - TransportActionProxy.wrapRequest(remoteClusterNode, request), options); - } + proxyConnection.sendRequest(requestId, TransportActionProxy.getProxyAction(action), + TransportActionProxy.wrapRequest(targetNode, request), options); + } - @Override - public void close() throws IOException { - assert false: "proxy connections must not be closed"; - } + @Override + public void close() { + assert false: "proxy connections must not be closed"; + } - @Override - public Version getVersion() { - return connection.getVersion(); - } - }; + @Override + public Version getVersion() { + return proxyConnection.getVersion(); + } } Transport.Connection getConnection() { - DiscoveryNode discoveryNode = connectedNodes.get(); - return transportService.getConnection(discoveryNode); + return transportService.getConnection(getAnyConnectedNode()); } @Override @@ -385,7 +395,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo } @Override - protected void doRun() throws Exception { + protected void doRun() { ActionListener listener = ActionListener.wrap((x) -> { synchronized (queue) { running.release(); @@ -590,8 +600,8 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo return connectedNodes.contains(node); } - DiscoveryNode getConnectedNode() { - return connectedNodes.get(); + DiscoveryNode getAnyConnectedNode() { + return connectedNodes.getAny(); } void addConnectedNode(DiscoveryNode node) { @@ -612,7 +622,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo return connectedNodes.size(); } - private static class ConnectedNodes implements Supplier { + private static final class ConnectedNodes { private final Set nodeSet = new HashSet<>(); private final String clusterAlias; @@ -623,8 +633,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo this.clusterAlias = clusterAlias; } - @Override - public synchronized DiscoveryNode get() { + public synchronized DiscoveryNode getAny() { ensureIteratorAvailable(); if (currentIterator.hasNext()) { return currentIterator.next(); @@ -657,15 +666,6 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo return nodeSet.contains(node); } - synchronized Optional getAny() { - ensureIteratorAvailable(); - if (currentIterator.hasNext()) { - return Optional.of(currentIterator.next()); - } else { - return Optional.empty(); - } - } - private synchronized void ensureIteratorAvailable() { if (currentIterator == null) { currentIterator = nodeSet.iterator(); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 0739ff5633b..ac6f99351e4 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -81,6 +81,7 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.iterableWithSize; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.sameInstance; import static org.hamcrest.Matchers.startsWith; public class RemoteClusterConnectionTests extends ESTestCase { @@ -992,7 +993,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { barrier.await(); for (int j = 0; j < numGetCalls; j++) { try { - DiscoveryNode node = connection.getConnectedNode(); + DiscoveryNode node = connection.getAnyConnectedNode(); assertNotNull(node); } catch (IllegalStateException e) { if (e.getMessage().startsWith("No node available for cluster:") == false) { @@ -1053,10 +1054,10 @@ public class RemoteClusterConnectionTests extends ESTestCase { try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT, threadPool, settings); MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT, threadPool, settings); - MockTransportService otherClusterTransport = startTransport("other_cluster_discoverable_node", otherClusterKnownNodes, - Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build()); - MockTransportService otherClusterDiscoverable= startTransport("other_cluster_discoverable_node", otherClusterKnownNodes, - Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build())) { + MockTransportService otherClusterTransport = startTransport("other_cluster_discoverable_node", otherClusterKnownNodes, + Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build()); + MockTransportService otherClusterDiscoverable= startTransport("other_cluster_discoverable_node", otherClusterKnownNodes, + Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build())) { DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode(); knownNodes.add(seedTransport.getLocalDiscoNode()); @@ -1093,4 +1094,76 @@ public class RemoteClusterConnectionTests extends ESTestCase { } } } + + public void testGetConnection() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); + MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { + + DiscoveryNode connectedNode = seedTransport.getLocalDiscoNode(); + assertThat(connectedNode, notNullValue()); + knownNodes.add(connectedNode); + + DiscoveryNode disconnectedNode = discoverableTransport.getLocalDiscoNode(); + assertThat(disconnectedNode, notNullValue()); + knownNodes.add(disconnectedNode); + + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + Transport.Connection seedConnection = new Transport.Connection() { + @Override + public DiscoveryNode getNode() { + return connectedNode; + } + + @Override + public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) + throws TransportException { + // no-op + } + + @Override + public void close() { + // no-op + } + }; + service.addDelegate(connectedNode.getAddress(), new MockTransportService.DelegateTransport(service.getOriginalTransport()) { + @Override + public Connection getConnection(DiscoveryNode node) { + if (node == connectedNode) { + return seedConnection; + } + return super.getConnection(node); + } + + @Override + public boolean nodeConnected(DiscoveryNode node) { + return node.equals(connectedNode); + } + }); + service.start(); + service.acceptIncomingRequests(); + try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", + Collections.singletonList(connectedNode), service, Integer.MAX_VALUE, n -> true)) { + connection.addConnectedNode(connectedNode); + for (int i = 0; i < 10; i++) { + //always a direct connection as the remote node is already connected + Transport.Connection remoteConnection = connection.getConnection(connectedNode); + assertSame(seedConnection, remoteConnection); + } + for (int i = 0; i < 10; i++) { + //always a direct connection as the remote node is already connected + Transport.Connection remoteConnection = connection.getConnection(service.getLocalNode()); + assertThat(remoteConnection, not(instanceOf(RemoteClusterConnection.ProxyConnection.class))); + assertThat(remoteConnection.getNode(), equalTo(service.getLocalNode())); + } + for (int i = 0; i < 10; i++) { + //always a proxy connection as the target node is not connected + Transport.Connection remoteConnection = connection.getConnection(disconnectedNode); + assertThat(remoteConnection, instanceOf(RemoteClusterConnection.ProxyConnection.class)); + assertThat(remoteConnection.getNode(), sameInstance(disconnectedNode)); + } + } + } + } + } }