diff --git a/docs/reference/aggregations/metrics/scripted-metric-aggregation.asciidoc b/docs/reference/aggregations/metrics/scripted-metric-aggregation.asciidoc index daa86969e45..1a4d6d4774c 100644 --- a/docs/reference/aggregations/metrics/scripted-metric-aggregation.asciidoc +++ b/docs/reference/aggregations/metrics/scripted-metric-aggregation.asciidoc @@ -1,8 +1,6 @@ [[search-aggregations-metrics-scripted-metric-aggregation]] === Scripted Metric Aggregation -experimental[] - A metric aggregation that executes using scripts to provide a metric output. Example: diff --git a/qa/vagrant/src/test/resources/packaging/utils/utils.bash b/qa/vagrant/src/test/resources/packaging/utils/utils.bash index f8c3005de6c..53662ca9d3c 100644 --- a/qa/vagrant/src/test/resources/packaging/utils/utils.bash +++ b/qa/vagrant/src/test/resources/packaging/utils/utils.bash @@ -463,8 +463,9 @@ debug_collect_logs() { set_debug_logging() { if [ "$ESCONFIG" ] && [ -d "$ESCONFIG" ] && [ -f /etc/os-release ] && (grep -qi suse /etc/os-release); then - echo 'logger.org.elasticsearch.indices: DEBUG' >> "$ESCONFIG/elasticsearch.yml" + echo 'logger.org.elasticsearch.indices: TRACE' >> "$ESCONFIG/elasticsearch.yml" echo 'logger.org.elasticsearch.gateway: TRACE' >> "$ESCONFIG/elasticsearch.yml" + echo 'logger.org.elasticsearch.cluster: DEBUG' >> "$ESCONFIG/elasticsearch.yml" fi } diff --git a/server/src/main/java/org/elasticsearch/Version.java b/server/src/main/java/org/elasticsearch/Version.java index 5111348d363..848af37c70b 100644 --- a/server/src/main/java/org/elasticsearch/Version.java +++ b/server/src/main/java/org/elasticsearch/Version.java @@ -166,10 +166,10 @@ public class Version implements Comparable, ToXContentFragment { public static final Version V_6_2_3 = new Version(V_6_2_3_ID, LUCENE_7_2_1); public static final int V_6_2_4_ID = 6020499; public static final Version V_6_2_4 = new Version(V_6_2_4_ID, LUCENE_7_2_1); - public static final int V_6_2_5_ID = 6020599; - public static final Version V_6_2_5 = new Version(V_6_2_5_ID, LUCENE_7_2_1); public static final int V_6_3_0_ID = 6030099; public static final Version V_6_3_0 = new Version(V_6_3_0_ID, org.apache.lucene.util.Version.LUCENE_7_3_1); + public static final int V_6_3_1_ID = 6030199; + public static final Version V_6_3_1 = new Version(V_6_3_1_ID, org.apache.lucene.util.Version.LUCENE_7_3_1); public static final int V_6_4_0_ID = 6040099; public static final Version V_6_4_0 = new Version(V_6_4_0_ID, org.apache.lucene.util.Version.LUCENE_7_4_0); public static final int V_7_0_0_alpha1_ID = 7000001; @@ -192,10 +192,10 @@ public class Version implements Comparable, ToXContentFragment { return V_7_0_0_alpha1; case V_6_4_0_ID: return V_6_4_0; + case V_6_3_1_ID: + return V_6_3_1; case V_6_3_0_ID: return V_6_3_0; - case V_6_2_5_ID: - return V_6_2_5; case V_6_2_4_ID: return V_6_2_4; case V_6_2_3_ID: diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 52c3be87894..ad3b2efd42f 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -50,7 +50,6 @@ import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -351,8 +350,7 @@ public class TransportSearchAction extends HandledTransportAction mergeShardsIterators(GroupShardsIterator localShardsIterator, OriginalIndices localIndices, List remoteShardIterators) { - List shards = new ArrayList<>(); - shards.addAll(remoteShardIterators); + List shards = new ArrayList<>(remoteShardIterators); for (ShardIterator shardIterator : localShardsIterator) { shards.add(new SearchShardIterator(null, shardIterator.shardId(), shardIterator.getShardRoutings(), localIndices)); } @@ -384,7 +382,7 @@ public class TransportSearchAction extends HandledTransportAction listener) { - final DiscoveryNode node = connectedNodes.get(); + final DiscoveryNode node = connectedNodes.getAny(); transportService.sendRequest(node, ClusterSearchShardsAction.NAME, searchShardsRequest, new TransportResponseHandler() { @@ -217,7 +215,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo request.clear(); request.nodes(true); request.local(true); // run this on the node that gets the request it's as good as any other - final DiscoveryNode node = connectedNodes.get(); + final DiscoveryNode node = connectedNodes.getAny(); transportService.sendRequest(node, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY, new TransportResponseHandler() { @Override @@ -255,40 +253,52 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo } /** - * Returns a connection to the remote cluster. This connection might be a proxy connection that redirects internally to the - * given node. + * Returns a connection to the remote cluster, preferably a direct connection to the provided {@link DiscoveryNode}. + * If such node is not connected, the returned connection will be a proxy connection that redirects to it. */ Transport.Connection getConnection(DiscoveryNode remoteClusterNode) { - DiscoveryNode discoveryNode = connectedNodes.get(); + if (transportService.nodeConnected(remoteClusterNode)) { + return transportService.getConnection(remoteClusterNode); + } + DiscoveryNode discoveryNode = connectedNodes.getAny(); Transport.Connection connection = transportService.getConnection(discoveryNode); - return new Transport.Connection() { - @Override - public DiscoveryNode getNode() { - return remoteClusterNode; - } + return new ProxyConnection(connection, remoteClusterNode); + } - @Override - public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) + static final class ProxyConnection implements Transport.Connection { + private final Transport.Connection proxyConnection; + private final DiscoveryNode targetNode; + + private ProxyConnection(Transport.Connection proxyConnection, DiscoveryNode targetNode) { + this.proxyConnection = proxyConnection; + this.targetNode = targetNode; + } + + @Override + public DiscoveryNode getNode() { + return targetNode; + } + + @Override + public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { - connection.sendRequest(requestId, TransportActionProxy.getProxyAction(action), - TransportActionProxy.wrapRequest(remoteClusterNode, request), options); - } + proxyConnection.sendRequest(requestId, TransportActionProxy.getProxyAction(action), + TransportActionProxy.wrapRequest(targetNode, request), options); + } - @Override - public void close() throws IOException { - assert false: "proxy connections must not be closed"; - } + @Override + public void close() { + assert false: "proxy connections must not be closed"; + } - @Override - public Version getVersion() { - return connection.getVersion(); - } - }; + @Override + public Version getVersion() { + return proxyConnection.getVersion(); + } } Transport.Connection getConnection() { - DiscoveryNode discoveryNode = connectedNodes.get(); - return transportService.getConnection(discoveryNode); + return transportService.getConnection(getAnyConnectedNode()); } @Override @@ -385,7 +395,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo } @Override - protected void doRun() throws Exception { + protected void doRun() { ActionListener listener = ActionListener.wrap((x) -> { synchronized (queue) { running.release(); @@ -590,8 +600,8 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo return connectedNodes.contains(node); } - DiscoveryNode getConnectedNode() { - return connectedNodes.get(); + DiscoveryNode getAnyConnectedNode() { + return connectedNodes.getAny(); } void addConnectedNode(DiscoveryNode node) { @@ -612,7 +622,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo return connectedNodes.size(); } - private static class ConnectedNodes implements Supplier { + private static final class ConnectedNodes { private final Set nodeSet = new HashSet<>(); private final String clusterAlias; @@ -623,8 +633,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo this.clusterAlias = clusterAlias; } - @Override - public synchronized DiscoveryNode get() { + public synchronized DiscoveryNode getAny() { ensureIteratorAvailable(); if (currentIterator.hasNext()) { return currentIterator.next(); @@ -657,15 +666,6 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo return nodeSet.contains(node); } - synchronized Optional getAny() { - ensureIteratorAvailable(); - if (currentIterator.hasNext()) { - return Optional.of(currentIterator.next()); - } else { - return Optional.empty(); - } - } - private synchronized void ensureIteratorAvailable() { if (currentIterator == null) { currentIterator = nodeSet.iterator(); diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/scripted/ScriptedMetricAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/scripted/ScriptedMetricAggregatorTests.java index fdedf1055d6..9417cc092d8 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/scripted/ScriptedMetricAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/scripted/ScriptedMetricAggregatorTests.java @@ -300,6 +300,7 @@ public class ScriptedMetricAggregatorTests extends AggregatorTestCase { } } + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/31307") public void testSelfReferencingAggStateAfterMap() throws IOException { try (Directory directory = newDirectory()) { Integer numDocs = randomInt(100); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 0739ff5633b..ac6f99351e4 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -81,6 +81,7 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.iterableWithSize; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.sameInstance; import static org.hamcrest.Matchers.startsWith; public class RemoteClusterConnectionTests extends ESTestCase { @@ -992,7 +993,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { barrier.await(); for (int j = 0; j < numGetCalls; j++) { try { - DiscoveryNode node = connection.getConnectedNode(); + DiscoveryNode node = connection.getAnyConnectedNode(); assertNotNull(node); } catch (IllegalStateException e) { if (e.getMessage().startsWith("No node available for cluster:") == false) { @@ -1053,10 +1054,10 @@ public class RemoteClusterConnectionTests extends ESTestCase { try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT, threadPool, settings); MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT, threadPool, settings); - MockTransportService otherClusterTransport = startTransport("other_cluster_discoverable_node", otherClusterKnownNodes, - Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build()); - MockTransportService otherClusterDiscoverable= startTransport("other_cluster_discoverable_node", otherClusterKnownNodes, - Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build())) { + MockTransportService otherClusterTransport = startTransport("other_cluster_discoverable_node", otherClusterKnownNodes, + Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build()); + MockTransportService otherClusterDiscoverable= startTransport("other_cluster_discoverable_node", otherClusterKnownNodes, + Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build())) { DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode(); knownNodes.add(seedTransport.getLocalDiscoNode()); @@ -1093,4 +1094,76 @@ public class RemoteClusterConnectionTests extends ESTestCase { } } } + + public void testGetConnection() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); + MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { + + DiscoveryNode connectedNode = seedTransport.getLocalDiscoNode(); + assertThat(connectedNode, notNullValue()); + knownNodes.add(connectedNode); + + DiscoveryNode disconnectedNode = discoverableTransport.getLocalDiscoNode(); + assertThat(disconnectedNode, notNullValue()); + knownNodes.add(disconnectedNode); + + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + Transport.Connection seedConnection = new Transport.Connection() { + @Override + public DiscoveryNode getNode() { + return connectedNode; + } + + @Override + public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) + throws TransportException { + // no-op + } + + @Override + public void close() { + // no-op + } + }; + service.addDelegate(connectedNode.getAddress(), new MockTransportService.DelegateTransport(service.getOriginalTransport()) { + @Override + public Connection getConnection(DiscoveryNode node) { + if (node == connectedNode) { + return seedConnection; + } + return super.getConnection(node); + } + + @Override + public boolean nodeConnected(DiscoveryNode node) { + return node.equals(connectedNode); + } + }); + service.start(); + service.acceptIncomingRequests(); + try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", + Collections.singletonList(connectedNode), service, Integer.MAX_VALUE, n -> true)) { + connection.addConnectedNode(connectedNode); + for (int i = 0; i < 10; i++) { + //always a direct connection as the remote node is already connected + Transport.Connection remoteConnection = connection.getConnection(connectedNode); + assertSame(seedConnection, remoteConnection); + } + for (int i = 0; i < 10; i++) { + //always a direct connection as the remote node is already connected + Transport.Connection remoteConnection = connection.getConnection(service.getLocalNode()); + assertThat(remoteConnection, not(instanceOf(RemoteClusterConnection.ProxyConnection.class))); + assertThat(remoteConnection.getNode(), equalTo(service.getLocalNode())); + } + for (int i = 0; i < 10; i++) { + //always a proxy connection as the target node is not connected + Transport.Connection remoteConnection = connection.getConnection(disconnectedNode); + assertThat(remoteConnection, instanceOf(RemoteClusterConnection.ProxyConnection.class)); + assertThat(remoteConnection.getNode(), sameInstance(disconnectedNode)); + } + } + } + } + } }