Merge branch 'master' into index-lifecycle

This commit is contained in:
Tal Levy 2018-06-13 12:13:05 -07:00
commit 37c4ddd129
7 changed files with 131 additions and 60 deletions

View File

@ -1,8 +1,6 @@
[[search-aggregations-metrics-scripted-metric-aggregation]]
=== Scripted Metric Aggregation
experimental[]
A metric aggregation that executes using scripts to provide a metric output.
Example:

View File

@ -463,8 +463,9 @@ debug_collect_logs() {
set_debug_logging() {
if [ "$ESCONFIG" ] && [ -d "$ESCONFIG" ] && [ -f /etc/os-release ] && (grep -qi suse /etc/os-release); then
echo 'logger.org.elasticsearch.indices: DEBUG' >> "$ESCONFIG/elasticsearch.yml"
echo 'logger.org.elasticsearch.indices: TRACE' >> "$ESCONFIG/elasticsearch.yml"
echo 'logger.org.elasticsearch.gateway: TRACE' >> "$ESCONFIG/elasticsearch.yml"
echo 'logger.org.elasticsearch.cluster: DEBUG' >> "$ESCONFIG/elasticsearch.yml"
fi
}

View File

@ -166,10 +166,10 @@ public class Version implements Comparable<Version>, ToXContentFragment {
public static final Version V_6_2_3 = new Version(V_6_2_3_ID, LUCENE_7_2_1);
public static final int V_6_2_4_ID = 6020499;
public static final Version V_6_2_4 = new Version(V_6_2_4_ID, LUCENE_7_2_1);
public static final int V_6_2_5_ID = 6020599;
public static final Version V_6_2_5 = new Version(V_6_2_5_ID, LUCENE_7_2_1);
public static final int V_6_3_0_ID = 6030099;
public static final Version V_6_3_0 = new Version(V_6_3_0_ID, org.apache.lucene.util.Version.LUCENE_7_3_1);
public static final int V_6_3_1_ID = 6030199;
public static final Version V_6_3_1 = new Version(V_6_3_1_ID, org.apache.lucene.util.Version.LUCENE_7_3_1);
public static final int V_6_4_0_ID = 6040099;
public static final Version V_6_4_0 = new Version(V_6_4_0_ID, org.apache.lucene.util.Version.LUCENE_7_4_0);
public static final int V_7_0_0_alpha1_ID = 7000001;
@ -192,10 +192,10 @@ public class Version implements Comparable<Version>, ToXContentFragment {
return V_7_0_0_alpha1;
case V_6_4_0_ID:
return V_6_4_0;
case V_6_3_1_ID:
return V_6_3_1;
case V_6_3_0_ID:
return V_6_3_0;
case V_6_2_5_ID:
return V_6_2_5;
case V_6_2_4_ID:
return V_6_2_4;
case V_6_2_3_ID:

View File

@ -50,7 +50,6 @@ import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -351,8 +350,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
static GroupShardsIterator<SearchShardIterator> mergeShardsIterators(GroupShardsIterator<ShardIterator> localShardsIterator,
OriginalIndices localIndices,
List<SearchShardIterator> remoteShardIterators) {
List<SearchShardIterator> shards = new ArrayList<>();
shards.addAll(remoteShardIterators);
List<SearchShardIterator> shards = new ArrayList<>(remoteShardIterators);
for (ShardIterator shardIterator : localShardsIterator) {
shards.add(new SearchShardIterator(null, shardIterator.shardId(), shardIterator.getShardRoutings(), localIndices));
}
@ -384,7 +382,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
clusterStateVersion, aliasFilter, concreteIndexBoosts, indexRoutings, listener, false, clusters);
return new SearchPhase(action.getName()) {
@Override
public void run() throws IOException {
public void run() {
action.start();
}
};

View File

@ -20,7 +20,6 @@ package org.elasticsearch.transport;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.core.internal.io.IOUtils;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
@ -40,6 +39,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
@ -50,7 +50,6 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@ -61,7 +60,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
@ -181,7 +179,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
private void fetchShardsInternal(ClusterSearchShardsRequest searchShardsRequest,
final ActionListener<ClusterSearchShardsResponse> listener) {
final DiscoveryNode node = connectedNodes.get();
final DiscoveryNode node = connectedNodes.getAny();
transportService.sendRequest(node, ClusterSearchShardsAction.NAME, searchShardsRequest,
new TransportResponseHandler<ClusterSearchShardsResponse>() {
@ -217,7 +215,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
request.clear();
request.nodes(true);
request.local(true); // run this on the node that gets the request it's as good as any other
final DiscoveryNode node = connectedNodes.get();
final DiscoveryNode node = connectedNodes.getAny();
transportService.sendRequest(node, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY,
new TransportResponseHandler<ClusterStateResponse>() {
@Override
@ -255,40 +253,52 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
}
/**
* Returns a connection to the remote cluster. This connection might be a proxy connection that redirects internally to the
* given node.
* Returns a connection to the remote cluster, preferably a direct connection to the provided {@link DiscoveryNode}.
* If such node is not connected, the returned connection will be a proxy connection that redirects to it.
*/
Transport.Connection getConnection(DiscoveryNode remoteClusterNode) {
DiscoveryNode discoveryNode = connectedNodes.get();
if (transportService.nodeConnected(remoteClusterNode)) {
return transportService.getConnection(remoteClusterNode);
}
DiscoveryNode discoveryNode = connectedNodes.getAny();
Transport.Connection connection = transportService.getConnection(discoveryNode);
return new Transport.Connection() {
return new ProxyConnection(connection, remoteClusterNode);
}
static final class ProxyConnection implements Transport.Connection {
private final Transport.Connection proxyConnection;
private final DiscoveryNode targetNode;
private ProxyConnection(Transport.Connection proxyConnection, DiscoveryNode targetNode) {
this.proxyConnection = proxyConnection;
this.targetNode = targetNode;
}
@Override
public DiscoveryNode getNode() {
return remoteClusterNode;
return targetNode;
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
connection.sendRequest(requestId, TransportActionProxy.getProxyAction(action),
TransportActionProxy.wrapRequest(remoteClusterNode, request), options);
proxyConnection.sendRequest(requestId, TransportActionProxy.getProxyAction(action),
TransportActionProxy.wrapRequest(targetNode, request), options);
}
@Override
public void close() throws IOException {
public void close() {
assert false: "proxy connections must not be closed";
}
@Override
public Version getVersion() {
return connection.getVersion();
return proxyConnection.getVersion();
}
};
}
Transport.Connection getConnection() {
DiscoveryNode discoveryNode = connectedNodes.get();
return transportService.getConnection(discoveryNode);
return transportService.getConnection(getAnyConnectedNode());
}
@Override
@ -385,7 +395,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
}
@Override
protected void doRun() throws Exception {
protected void doRun() {
ActionListener<Void> listener = ActionListener.wrap((x) -> {
synchronized (queue) {
running.release();
@ -590,8 +600,8 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
return connectedNodes.contains(node);
}
DiscoveryNode getConnectedNode() {
return connectedNodes.get();
DiscoveryNode getAnyConnectedNode() {
return connectedNodes.getAny();
}
void addConnectedNode(DiscoveryNode node) {
@ -612,7 +622,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
return connectedNodes.size();
}
private static class ConnectedNodes implements Supplier<DiscoveryNode> {
private static final class ConnectedNodes {
private final Set<DiscoveryNode> nodeSet = new HashSet<>();
private final String clusterAlias;
@ -623,8 +633,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
this.clusterAlias = clusterAlias;
}
@Override
public synchronized DiscoveryNode get() {
public synchronized DiscoveryNode getAny() {
ensureIteratorAvailable();
if (currentIterator.hasNext()) {
return currentIterator.next();
@ -657,15 +666,6 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
return nodeSet.contains(node);
}
synchronized Optional<DiscoveryNode> getAny() {
ensureIteratorAvailable();
if (currentIterator.hasNext()) {
return Optional.of(currentIterator.next());
} else {
return Optional.empty();
}
}
private synchronized void ensureIteratorAvailable() {
if (currentIterator == null) {
currentIterator = nodeSet.iterator();

View File

@ -300,6 +300,7 @@ public class ScriptedMetricAggregatorTests extends AggregatorTestCase {
}
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/31307")
public void testSelfReferencingAggStateAfterMap() throws IOException {
try (Directory directory = newDirectory()) {
Integer numDocs = randomInt(100);

View File

@ -81,6 +81,7 @@ import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.iterableWithSize;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.hamcrest.Matchers.startsWith;
public class RemoteClusterConnectionTests extends ESTestCase {
@ -992,7 +993,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
barrier.await();
for (int j = 0; j < numGetCalls; j++) {
try {
DiscoveryNode node = connection.getConnectedNode();
DiscoveryNode node = connection.getAnyConnectedNode();
assertNotNull(node);
} catch (IllegalStateException e) {
if (e.getMessage().startsWith("No node available for cluster:") == false) {
@ -1093,4 +1094,76 @@ public class RemoteClusterConnectionTests extends ESTestCase {
}
}
}
public void testGetConnection() throws Exception {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
DiscoveryNode connectedNode = seedTransport.getLocalDiscoNode();
assertThat(connectedNode, notNullValue());
knownNodes.add(connectedNode);
DiscoveryNode disconnectedNode = discoverableTransport.getLocalDiscoNode();
assertThat(disconnectedNode, notNullValue());
knownNodes.add(disconnectedNode);
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
Transport.Connection seedConnection = new Transport.Connection() {
@Override
public DiscoveryNode getNode() {
return connectedNode;
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws TransportException {
// no-op
}
@Override
public void close() {
// no-op
}
};
service.addDelegate(connectedNode.getAddress(), new MockTransportService.DelegateTransport(service.getOriginalTransport()) {
@Override
public Connection getConnection(DiscoveryNode node) {
if (node == connectedNode) {
return seedConnection;
}
return super.getConnection(node);
}
@Override
public boolean nodeConnected(DiscoveryNode node) {
return node.equals(connectedNode);
}
});
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Collections.singletonList(connectedNode), service, Integer.MAX_VALUE, n -> true)) {
connection.addConnectedNode(connectedNode);
for (int i = 0; i < 10; i++) {
//always a direct connection as the remote node is already connected
Transport.Connection remoteConnection = connection.getConnection(connectedNode);
assertSame(seedConnection, remoteConnection);
}
for (int i = 0; i < 10; i++) {
//always a direct connection as the remote node is already connected
Transport.Connection remoteConnection = connection.getConnection(service.getLocalNode());
assertThat(remoteConnection, not(instanceOf(RemoteClusterConnection.ProxyConnection.class)));
assertThat(remoteConnection.getNode(), equalTo(service.getLocalNode()));
}
for (int i = 0; i < 10; i++) {
//always a proxy connection as the target node is not connected
Transport.Connection remoteConnection = connection.getConnection(disconnectedNode);
assertThat(remoteConnection, instanceOf(RemoteClusterConnection.ProxyConnection.class));
assertThat(remoteConnection.getNode(), sameInstance(disconnectedNode));
}
}
}
}
}
}