Merge branch 'master' into feature/http_client

This commit is contained in:
javanna 2016-06-21 15:53:37 +02:00 committed by Luca Cavanna
commit 886cb37efb
595 changed files with 7610 additions and 4141 deletions

View File

@ -196,15 +196,15 @@ In order to play with the distributed nature of Elasticsearch, simply bring more
h3. Where to go from here?
We have just covered a very small portion of what Elasticsearch is all about. For more information, please refer to the "elastic.co":http://www.elastic.co/products/elasticsearch website.
We have just covered a very small portion of what Elasticsearch is all about. For more information, please refer to the "elastic.co":http://www.elastic.co/products/elasticsearch website. General questions can be asked on the "Elastic Discourse forum":https://discuss.elastic.co or on IRC on Freenode at "#elasticsearch":https://webchat.freenode.net/#elasticsearch. The Elasticsearch GitHub repository is reserved for bug reports and feature requests only.
h3. Building from Source
Elasticsearch uses "Gradle":http://gradle.org for its build system. You'll need to have a modern version of Gradle installed - 2.8 should do.
Elasticsearch uses "Gradle":https://gradle.org for its build system. You'll need to have a modern version of Gradle installed - 2.13 should do.
In order to create a distribution, simply run the @gradle build@ command in the cloned directory.
In order to create a distribution, simply run the @gradle assemble@ command in the cloned directory.
The distribution for each project will be created under the @target/releases@ directory in that project.
The distribution for each project will be created under the @build/distributions@ directory in that project.
See the "TESTING":TESTING.asciidoc file for more information about
running the Elasticsearch test suite.

View File

@ -147,7 +147,8 @@ public class AllocationBenchmark {
for (int i = 1; i <= numNodes; i++) {
nb.put(Allocators.newNode("node" + i, Collections.singletonMap("tag", "tag_" + (i % numTags))));
}
initialClusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).nodes
initialClusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metaData(metaData).routingTable(routingTable).nodes
(nb).build();
}

View File

@ -84,7 +84,7 @@ dependencies {
compile 'com.netflix.nebula:gradle-info-plugin:3.0.3'
compile 'org.eclipse.jgit:org.eclipse.jgit:3.2.0.201312181205-r'
compile 'com.perforce:p4java:2012.3.551082' // THIS IS SUPPOSED TO BE OPTIONAL IN THE FUTURE....
compile 'de.thetaphi:forbiddenapis:2.1'
compile 'de.thetaphi:forbiddenapis:2.2'
compile 'com.bmuschko:gradle-nexus-plugin:2.3.1'
compile 'org.apache.rat:apache-rat:0.11'
compile 'ru.vyarus:gradle-animalsniffer-plugin:1.0.1'

View File

@ -127,7 +127,7 @@ public class RestTestsFromSnippetsTask extends SnippetsTask {
current.println(setup)
}
body(test)
body(test, false)
}
private void response(Snippet response) {
@ -136,7 +136,7 @@ public class RestTestsFromSnippetsTask extends SnippetsTask {
}
void emitDo(String method, String pathAndQuery,
String body, String catchPart) {
String body, String catchPart, boolean inSetup) {
def (String path, String query) = pathAndQuery.tokenize('?')
current.println(" - do:")
if (catchPart != null) {
@ -160,6 +160,19 @@ public class RestTestsFromSnippetsTask extends SnippetsTask {
current.println(" body: |")
body.eachLine { current.println(" $it") }
}
/* Catch any shard failures. These only cause a non-200 response if
* no shard succeeds. But we need to fail the tests on all of these
* because they mean invalid syntax or broken queries or something
* else that we don't want to teach people to do. The REST test
* framework doesn't allow us to has assertions in the setup
* section so we have to skip it there. We also have to skip _cat
* actions because they don't return json so we can't is_false
* them. That is ok because they don't have this
* partial-success-is-success thing.
*/
if (false == inSetup && false == path.startsWith('_cat')) {
current.println(" - is_false: _shards.failures")
}
}
private void setup(Snippet setup) {
@ -169,7 +182,7 @@ public class RestTestsFromSnippetsTask extends SnippetsTask {
setupCurrent(setup)
current.println('---')
current.println("setup:")
body(setup)
body(setup, true)
// always wait for yellow before anything is executed
current.println(
" - do:\n" +
@ -179,7 +192,7 @@ public class RestTestsFromSnippetsTask extends SnippetsTask {
" wait_for_status: \"yellow\"")
}
private void body(Snippet snippet) {
private void body(Snippet snippet, boolean inSetup) {
parse("$snippet", snippet.contents, SYNTAX) { matcher, last ->
if (matcher.group("comment") != null) {
// Comment
@ -193,7 +206,7 @@ public class RestTestsFromSnippetsTask extends SnippetsTask {
// Leading '/'s break the generated paths
pathAndQuery = pathAndQuery.substring(1)
}
emitDo(method, pathAndQuery, body, catchPart)
emitDo(method, pathAndQuery, body, catchPart, inSetup)
}
}

View File

@ -495,11 +495,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]store[/\\]IndicesStore.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]store[/\\]TransportNodesListShardStoreMetaData.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]ttl[/\\]IndicesTTLService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]ingest[/\\]PipelineExecutionService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]ingest[/\\]PipelineStore.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]ingest[/\\]core[/\\]CompoundProcessor.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]ingest[/\\]core[/\\]IngestDocument.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]ingest[/\\]core[/\\]Pipeline.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]monitor[/\\]jvm[/\\]GcNames.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]monitor[/\\]jvm[/\\]HotThreads.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]monitor[/\\]jvm[/\\]JvmStats.java" checks="LineLength" />
@ -712,7 +707,6 @@
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]admin[/\\]cluster[/\\]stats[/\\]ClusterStatsIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]admin[/\\]indices[/\\]TransportAnalyzeActionTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]admin[/\\]indices[/\\]cache[/\\]clear[/\\]ClearIndicesCacheBlocksIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]admin[/\\]indices[/\\]create[/\\]CreateIndexIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]admin[/\\]indices[/\\]flush[/\\]SyncedFlushUnitTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]admin[/\\]indices[/\\]get[/\\]GetIndexIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]admin[/\\]indices[/\\]shards[/\\]IndicesShardStoreRequestIT.java" checks="LineLength" />
@ -754,7 +748,6 @@
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]ClusterHealthIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]ClusterInfoServiceIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]ClusterStateDiffIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]ClusterStateTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]DiskUsageTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]MinimumMasterNodesIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]NoMasterNodeIT.java" checks="LineLength" />
@ -784,13 +777,11 @@
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]AllocationIdTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]DelayedAllocationIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]PrimaryAllocationIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]RoutingBackwardCompatibilityTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]RoutingServiceTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]RoutingTableTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]ShardRoutingHelper.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]ShardRoutingTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]UnassignedInfoTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]allocation[/\\]ActiveAllocationIdTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]allocation[/\\]AddIncrementallyTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]allocation[/\\]AllocationCommandsTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]allocation[/\\]AllocationPriorityTests.java" checks="LineLength" />
@ -862,7 +853,6 @@
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]ZenUnicastDiscoveryIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]NodeJoinControllerTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]ZenDiscoveryUnitTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]ping[/\\]unicast[/\\]UnicastZenPingIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]publish[/\\]PublishClusterStateActionTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]document[/\\]DocumentActionsIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]env[/\\]EnvironmentTests.java" checks="LineLength" />
@ -1026,16 +1016,10 @@
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]state[/\\]CloseIndexDisableCloseAllIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]state[/\\]OpenCloseIndexIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]state[/\\]RareClusterStateIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]state[/\\]SimpleIndexStateIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]stats[/\\]IndexStatsIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]store[/\\]IndicesStoreIntegrationIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]store[/\\]IndicesStoreTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]template[/\\]SimpleIndexTemplateIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]ingest[/\\]PipelineExecutionServiceTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]ingest[/\\]PipelineStoreTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]ingest[/\\]core[/\\]CompoundProcessorTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]ingest[/\\]core[/\\]PipelineFactoryTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]ingest[/\\]core[/\\]ValueSourceTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]mget[/\\]SimpleMgetIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]monitor[/\\]jvm[/\\]JvmGcMonitorServiceSettingsTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]monitor[/\\]os[/\\]OsProbeTests.java" checks="LineLength" />
@ -1211,9 +1195,6 @@
<suppress files="plugins[/\\]discovery-ec2[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cloud[/\\]aws[/\\]AbstractAwsTestCase.java" checks="LineLength" />
<suppress files="plugins[/\\]discovery-ec2[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]ec2[/\\]AmazonEC2Mock.java" checks="LineLength" />
<suppress files="plugins[/\\]discovery-gce[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]gce[/\\]GceNetworkTests.java" checks="LineLength" />
<suppress files="plugins[/\\]ingest-geoip[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]ingest[/\\]geoip[/\\]GeoIpProcessor.java" checks="LineLength" />
<suppress files="plugins[/\\]ingest-geoip[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]ingest[/\\]geoip[/\\]GeoIpProcessorFactoryTests.java" checks="LineLength" />
<suppress files="plugins[/\\]ingest-geoip[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]ingest[/\\]geoip[/\\]GeoIpProcessorTests.java" checks="LineLength" />
<suppress files="plugins[/\\]lang-javascript[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]plugin[/\\]javascript[/\\]JavaScriptPlugin.java" checks="LineLength" />
<suppress files="plugins[/\\]lang-javascript[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]script[/\\]javascript[/\\]JavaScriptScriptEngineService.java" checks="LineLength" />
<suppress files="plugins[/\\]lang-javascript[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]script[/\\]javascript[/\\]JavaScriptScriptEngineTests.java" checks="LineLength" />

View File

@ -32,7 +32,7 @@ org.apache.lucene.index.IndexReader#getCombinedCoreAndDeletesKey()
@defaultMessage Soon to be removed
org.apache.lucene.document.FieldType#numericType()
@defaultMessage Don't use MethodHandles in slow ways, dont be lenient in tests.
# unfortunately, invoke() cannot be banned, because forbidden apis does not support signature polymorphic methods
@defaultMessage Don't use MethodHandles in slow ways, don't be lenient in tests.
java.lang.invoke.MethodHandle#invoke(java.lang.Object[])
java.lang.invoke.MethodHandle#invokeWithArguments(java.lang.Object[])
java.lang.invoke.MethodHandle#invokeWithArguments(java.util.List)

View File

@ -22,7 +22,6 @@ package org.elasticsearch;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
@ -329,18 +328,4 @@ public class Version {
public boolean isRC() {
return build > 50 && build < 99;
}
public static class Module extends AbstractModule {
private final Version version;
public Module(Version version) {
this.version = version;
}
@Override
protected void configure() {
bind(Version.class).toInstance(version);
}
}
}

View File

@ -109,6 +109,8 @@ import org.elasticsearch.action.admin.indices.recovery.RecoveryAction;
import org.elasticsearch.action.admin.indices.recovery.TransportRecoveryAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.TransportRefreshAction;
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
import org.elasticsearch.action.admin.indices.rollover.TransportRolloverAction;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsAction;
import org.elasticsearch.action.admin.indices.segments.TransportIndicesSegmentsAction;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction;
@ -292,6 +294,7 @@ public class ActionModule extends AbstractModule {
registerAction(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class);
registerAction(CreateIndexAction.INSTANCE, TransportCreateIndexAction.class);
registerAction(ShrinkAction.INSTANCE, TransportShrinkAction.class);
registerAction(RolloverAction.INSTANCE, TransportRolloverAction.class);
registerAction(DeleteIndexAction.INSTANCE, TransportDeleteIndexAction.class);
registerAction(GetIndexAction.INSTANCE, TransportGetIndexAction.class);
registerAction(OpenIndexAction.INSTANCE, TransportOpenIndexAction.class);

View File

@ -23,7 +23,6 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
@ -47,16 +46,14 @@ import org.elasticsearch.transport.TransportService;
*/
public class TransportClusterHealthAction extends TransportMasterNodeReadAction<ClusterHealthRequest, ClusterHealthResponse> {
private final ClusterName clusterName;
private final GatewayAllocator gatewayAllocator;
@Inject
public TransportClusterHealthAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ClusterName clusterName, ActionFilters actionFilters,
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, GatewayAllocator gatewayAllocator) {
super(settings, ClusterHealthAction.NAME, false, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, ClusterHealthRequest::new);
this.clusterName = clusterName;
this.gatewayAllocator = gatewayAllocator;
}
@ -284,14 +281,14 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, request);
} catch (IndexNotFoundException e) {
// one of the specified indices is not there - treat it as RED.
ClusterHealthResponse response = new ClusterHealthResponse(clusterName.value(), Strings.EMPTY_ARRAY, clusterState,
ClusterHealthResponse response = new ClusterHealthResponse(clusterState.getClusterName().value(), Strings.EMPTY_ARRAY, clusterState,
numberOfPendingTasks, numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(clusterState),
pendingTaskTimeInQueue);
response.setStatus(ClusterHealthStatus.RED);
return response;
}
return new ClusterHealthResponse(clusterName.value(), concreteIndices, clusterState, numberOfPendingTasks,
return new ClusterHealthResponse(clusterState.getClusterName().value(), concreteIndices, clusterState, numberOfPendingTasks,
numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(clusterState), pendingTaskTimeInQueue);
}
}

View File

@ -47,17 +47,17 @@ public class TransportNodesHotThreadsAction extends TransportNodesAction<NodesHo
NodeHotThreads> {
@Inject
public TransportNodesHotThreadsAction(Settings settings, ClusterName clusterName, ThreadPool threadPool,
public TransportNodesHotThreadsAction(Settings settings, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, NodesHotThreadsAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters,
super(settings, NodesHotThreadsAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, NodesHotThreadsRequest::new, NodeRequest::new, ThreadPool.Names.GENERIC, NodeHotThreads.class);
}
@Override
protected NodesHotThreadsResponse newResponse(NodesHotThreadsRequest request,
List<NodeHotThreads> responses, List<FailedNodeException> failures) {
return new NodesHotThreadsResponse(clusterName, responses, failures);
return new NodesHotThreadsResponse(clusterService.getClusterName(), responses, failures);
}
@Override

View File

@ -28,7 +28,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.http.HttpInfo;
import org.elasticsearch.ingest.core.IngestInfo;
import org.elasticsearch.ingest.IngestInfo;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.monitor.os.OsInfo;
import org.elasticsearch.monitor.process.ProcessInfo;

View File

@ -23,7 +23,6 @@ import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
@ -48,11 +47,11 @@ public class TransportNodesInfoAction extends TransportNodesAction<NodesInfoRequ
private final NodeService nodeService;
@Inject
public TransportNodesInfoAction(Settings settings, ClusterName clusterName, ThreadPool threadPool,
public TransportNodesInfoAction(Settings settings, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
NodeService nodeService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, NodesInfoAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters,
super(settings, NodesInfoAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, NodesInfoRequest::new, NodeInfoRequest::new, ThreadPool.Names.MANAGEMENT, NodeInfo.class);
this.nodeService = nodeService;
}
@ -60,7 +59,7 @@ public class TransportNodesInfoAction extends TransportNodesAction<NodesInfoRequ
@Override
protected NodesInfoResponse newResponse(NodesInfoRequest nodesInfoRequest,
List<NodeInfo> responses, List<FailedNodeException> failures) {
return new NodesInfoResponse(clusterName, responses, failures);
return new NodesInfoResponse(clusterService.getClusterName(), responses, failures);
}
@Override

View File

@ -47,7 +47,7 @@ public final class LivenessResponse extends ActionResponse {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
clusterName = ClusterName.readClusterName(in);
clusterName = new ClusterName(in);
node = in.readOptionalWriteable(DiscoveryNode::new);
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.action.admin.cluster.node.liveness;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.threadpool.ThreadPool;
@ -30,20 +29,17 @@ import org.elasticsearch.transport.TransportService;
public final class TransportLivenessAction implements TransportRequestHandler<LivenessRequest> {
private final ClusterService clusterService;
private final ClusterName clusterName;
public static final String NAME = "cluster:monitor/nodes/liveness";
@Inject
public TransportLivenessAction(ClusterName clusterName,
ClusterService clusterService, TransportService transportService) {
public TransportLivenessAction(ClusterService clusterService, TransportService transportService) {
this.clusterService = clusterService;
this.clusterName = clusterName;
transportService.registerRequestHandler(NAME, LivenessRequest::new, ThreadPool.Names.SAME,
false, false /*can not trip circuit breaker*/, this);
}
@Override
public void messageReceived(LivenessRequest request, TransportChannel channel) throws Exception {
channel.sendResponse(new LivenessResponse(clusterName, clusterService.localNode()));
channel.sendResponse(new LivenessResponse(clusterService.getClusterName(), clusterService.localNode()));
}
}

View File

@ -23,7 +23,6 @@ import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
@ -48,18 +47,18 @@ public class TransportNodesStatsAction extends TransportNodesAction<NodesStatsRe
private final NodeService nodeService;
@Inject
public TransportNodesStatsAction(Settings settings, ClusterName clusterName, ThreadPool threadPool,
public TransportNodesStatsAction(Settings settings, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
NodeService nodeService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, NodesStatsAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters,
super(settings, NodesStatsAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, NodesStatsRequest::new, NodeStatsRequest::new, ThreadPool.Names.MANAGEMENT, NodeStats.class);
this.nodeService = nodeService;
}
@Override
protected NodesStatsResponse newResponse(NodesStatsRequest request, List<NodeStats> responses, List<FailedNodeException> failures) {
return new NodesStatsResponse(clusterName, responses, failures);
return new NodesStatsResponse(clusterService.getClusterName(), responses, failures);
}
@Override

View File

@ -24,7 +24,6 @@ import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -63,10 +62,10 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
public static final String BAN_PARENT_ACTION_NAME = "internal:admin/tasks/ban";
@Inject
public TransportCancelTasksAction(Settings settings, ClusterName clusterName, ThreadPool threadPool, ClusterService clusterService,
public TransportCancelTasksAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver
indexNameExpressionResolver) {
super(settings, CancelTasksAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters,
super(settings, CancelTasksAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, CancelTasksRequest::new, CancelTasksResponse::new, ThreadPool.Names.MANAGEMENT);
transportService.registerRequestHandler(BAN_PARENT_ACTION_NAME, BanParentTaskRequest::new, ThreadPool.Names.SAME, new
BanParentRequestHandler());

View File

@ -23,7 +23,6 @@ import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
@ -55,9 +54,9 @@ public class TransportListTasksAction extends TransportTasksAction<Task, ListTas
private static final TimeValue DEFAULT_WAIT_FOR_COMPLETION_TIMEOUT = timeValueSeconds(30);
@Inject
public TransportListTasksAction(Settings settings, ClusterName clusterName, ThreadPool threadPool, ClusterService clusterService,
public TransportListTasksAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ListTasksAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters,
super(settings, ListTasksAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, ListTasksRequest::new, ListTasksResponse::new, ThreadPool.Names.MANAGEMENT);
}

View File

@ -22,7 +22,6 @@ package org.elasticsearch.action.admin.cluster.repositories.verify;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@ -42,15 +41,13 @@ public class TransportVerifyRepositoryAction extends TransportMasterNodeAction<V
private final RepositoriesService repositoriesService;
protected final ClusterName clusterName;
@Inject
public TransportVerifyRepositoryAction(Settings settings, ClusterName clusterName, TransportService transportService, ClusterService clusterService,
public TransportVerifyRepositoryAction(Settings settings, TransportService transportService, ClusterService clusterService,
RepositoriesService repositoriesService, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, VerifyRepositoryAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, VerifyRepositoryRequest::new);
this.repositoriesService = repositoriesService;
this.clusterName = clusterName;
}
@Override
@ -76,7 +73,7 @@ public class TransportVerifyRepositoryAction extends TransportMasterNodeAction<V
if (verifyResponse.failed()) {
listener.onFailure(new RepositoryVerificationException(request.name(), verifyResponse.failureDescription()));
} else {
listener.onResponse(new VerifyRepositoryResponse(clusterName, verifyResponse.nodes()));
listener.onResponse(new VerifyRepositoryResponse(clusterService.getClusterName(), verifyResponse.nodes()));
}
}

View File

@ -51,7 +51,7 @@ public class VerifyRepositoryResponse extends ActionResponse implements ToXConte
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
clusterName = ClusterName.readClusterName(in);
clusterName = new ClusterName(in);
nodes = new DiscoveryNode[in.readVInt()];
for (int i=0; i<nodes.length; i++){
nodes[i] = new DiscoveryNode(in);

View File

@ -63,11 +63,11 @@ public class TransportNodesSnapshotsStatus extends TransportNodesAction<Transpor
private final SnapshotShardsService snapshotShardsService;
@Inject
public TransportNodesSnapshotsStatus(Settings settings, ClusterName clusterName, ThreadPool threadPool,
public TransportNodesSnapshotsStatus(Settings settings, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
SnapshotShardsService snapshotShardsService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ACTION_NAME, clusterName, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
super(settings, ACTION_NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
Request::new, NodeRequest::new, ThreadPool.Names.GENERIC, NodeSnapshotStatus.class);
this.snapshotShardsService = snapshotShardsService;
}
@ -89,7 +89,7 @@ public class TransportNodesSnapshotsStatus extends TransportNodesAction<Transpor
@Override
protected NodesSnapshotStatus newResponse(Request request, List<NodeSnapshotStatus> responses, List<FailedNodeException> failures) {
return new NodesSnapshotStatus(clusterName, responses, failures);
return new NodesSnapshotStatus(clusterService.getClusterName(), responses, failures);
}
@Override
@ -158,9 +158,6 @@ public class TransportNodesSnapshotsStatus extends TransportNodesAction<Transpor
public static class NodesSnapshotStatus extends BaseNodesResponse<NodeSnapshotStatus> {
NodesSnapshotStatus() {
}
public NodesSnapshotStatus(ClusterName clusterName, List<NodeSnapshotStatus> nodes, List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
}

View File

@ -54,7 +54,7 @@ public class ClusterStateResponse extends ActionResponse {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
clusterName = ClusterName.readClusterName(in);
clusterName = new ClusterName(in);
clusterState = ClusterState.Builder.readFrom(in, null);
}

View File

@ -23,7 +23,6 @@ import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -42,13 +41,11 @@ import org.elasticsearch.transport.TransportService;
*/
public class TransportClusterStateAction extends TransportMasterNodeReadAction<ClusterStateRequest, ClusterStateResponse> {
private final ClusterName clusterName;
@Inject
public TransportClusterStateAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ClusterName clusterName, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ClusterStateAction.NAME, false, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, ClusterStateRequest::new);
this.clusterName = clusterName;
}
@Override
@ -127,7 +124,7 @@ public class TransportClusterStateAction extends TransportMasterNodeReadAction<C
if (request.customs()) {
builder.customs(currentState.customs());
}
listener.onResponse(new ClusterStateResponse(clusterName, builder.build()));
listener.onResponse(new ClusterStateResponse(currentState.getClusterName(), builder.build()));
}

View File

@ -28,7 +28,6 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterStateHealth;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@ -62,11 +61,11 @@ public class TransportClusterStatsAction extends TransportNodesAction<ClusterSta
@Inject
public TransportClusterStatsAction(Settings settings, ClusterName clusterName, ThreadPool threadPool,
public TransportClusterStatsAction(Settings settings, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
NodeService nodeService, IndicesService indicesService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ClusterStatsAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters,
super(settings, ClusterStatsAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, ClusterStatsRequest::new, ClusterStatsNodeRequest::new, ThreadPool.Names.MANAGEMENT,
ClusterStatsNodeResponse.class);
this.nodeService = nodeService;
@ -76,8 +75,8 @@ public class TransportClusterStatsAction extends TransportNodesAction<ClusterSta
@Override
protected ClusterStatsResponse newResponse(ClusterStatsRequest request,
List<ClusterStatsNodeResponse> responses, List<FailedNodeException> failures) {
return new ClusterStatsResponse(System.currentTimeMillis(), clusterName, clusterService.state().metaData().clusterUUID(),
responses, failures);
return new ClusterStatsResponse(System.currentTimeMillis(), clusterService.getClusterName(),
clusterService.state().metaData().clusterUUID(), responses, failures);
}
@Override

View File

@ -28,7 +28,7 @@ public class IndicesAliasesClusterStateUpdateRequest extends ClusterStateUpdateR
AliasAction[] actions;
IndicesAliasesClusterStateUpdateRequest() {
public IndicesAliasesClusterStateUpdateRequest() {
}

View File

@ -0,0 +1,84 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.rollover;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcherSupplier;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import java.util.Set;
/**
* Base class for rollover request conditions
*/
public abstract class Condition<T> implements NamedWriteable {
public static ObjectParser<Set<Condition>, ParseFieldMatcherSupplier> PARSER =
new ObjectParser<>("conditions", null);
static {
PARSER.declareString((conditions, s) ->
conditions.add(new MaxAgeCondition(TimeValue.parseTimeValue(s, MaxAgeCondition.NAME))),
new ParseField(MaxAgeCondition.NAME));
PARSER.declareLong((conditions, value) ->
conditions.add(new MaxDocsCondition(value)), new ParseField(MaxDocsCondition.NAME));
}
protected T value;
protected final String name;
protected Condition(String name) {
this.name = name;
}
public abstract Result evaluate(final Stats stats);
@Override
public final String toString() {
return "[" + name + ": " + value + "]";
}
/**
* Holder for index stats used to evaluate conditions
*/
public static class Stats {
public final long numDocs;
public final long indexCreated;
public Stats(long numDocs, long indexCreated) {
this.numDocs = numDocs;
this.indexCreated = indexCreated;
}
}
/**
* Holder for evaluated condition result
*/
public static class Result {
public final Condition condition;
public final boolean matched;
protected Result(Condition condition, boolean matched) {
this.condition = condition;
this.matched = matched;
}
}
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.rollover;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException;
/**
* Condition for index maximum age. Evaluates to <code>true</code>
* when the index is at least {@link #value} old
*/
public class MaxAgeCondition extends Condition<TimeValue> {
public final static String NAME = "max_age";
public MaxAgeCondition(TimeValue value) {
super(NAME);
this.value = value;
}
public MaxAgeCondition(StreamInput in) throws IOException {
super(NAME);
this.value = TimeValue.timeValueMillis(in.readLong());
}
@Override
public Result evaluate(final Stats stats) {
long indexAge = System.currentTimeMillis() - stats.indexCreated;
return new Result(this, this.value.getMillis() <= indexAge);
}
@Override
public String getWriteableName() {
return NAME;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(value.getMillis());
}
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.rollover;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
* Condition for maximum index docs. Evaluates to <code>true</code>
* when the index has at least {@link #value} docs
*/
public class MaxDocsCondition extends Condition<Long> {
public final static String NAME = "max_docs";
public MaxDocsCondition(Long value) {
super(NAME);
this.value = value;
}
public MaxDocsCondition(StreamInput in) throws IOException {
super(NAME);
this.value = in.readLong();
}
@Override
public Result evaluate(final Stats stats) {
return new Result(this, this.value <= stats.numDocs);
}
@Override
public String getWriteableName() {
return NAME;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(value);
}
}

View File

@ -17,32 +17,29 @@
* under the License.
*/
package org.elasticsearch.env;
package org.elasticsearch.action.admin.indices.rollover;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;
/**
*
*/
public class NodeEnvironmentModule extends AbstractModule {
public class RolloverAction extends Action<RolloverRequest, RolloverResponse, RolloverRequestBuilder> {
private final NodeEnvironment nodeEnvironment;
public static final RolloverAction INSTANCE = new RolloverAction();
public static final String NAME = "indices:admin/rollover";
public NodeEnvironmentModule() {
this(null);
}
public NodeEnvironmentModule(@Nullable NodeEnvironment nodeEnvironment) {
this.nodeEnvironment = nodeEnvironment;
private RolloverAction() {
super(NAME);
}
@Override
protected void configure() {
if (nodeEnvironment != null) {
bind(NodeEnvironment.class).toInstance(nodeEnvironment);
} else {
bind(NodeEnvironment.class).asEagerSingleton();
}
public RolloverResponse newResponse() {
return new RolloverResponse();
}
}
@Override
public RolloverRequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RolloverRequestBuilder(client, this);
}
}

View File

@ -0,0 +1,209 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.rollover;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.ParseFieldMatcherSupplier;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import static org.elasticsearch.action.ValidateActions.addValidationError;
/**
* Request class to swap index under an alias upon satisfying conditions
*/
public class RolloverRequest extends AcknowledgedRequest<RolloverRequest> implements IndicesRequest {
public static ObjectParser<RolloverRequest, ParseFieldMatcherSupplier> PARSER =
new ObjectParser<>("conditions", null);
static {
PARSER.declareField((parser, request, parseFieldMatcherSupplier) ->
Condition.PARSER.parse(parser, request.conditions, parseFieldMatcherSupplier),
new ParseField("conditions"), ObjectParser.ValueType.OBJECT);
PARSER.declareField((parser, request, parseFieldMatcherSupplier) ->
request.createIndexRequest.settings(parser.map()),
new ParseField("settings"), ObjectParser.ValueType.OBJECT);
PARSER.declareField((parser, request, parseFieldMatcherSupplier) -> {
for (Map.Entry<String, Object> mappingsEntry : parser.map().entrySet()) {
request.createIndexRequest.mapping(mappingsEntry.getKey(),
(Map<String, Object>) mappingsEntry.getValue());
}
}, new ParseField("mappings"), ObjectParser.ValueType.OBJECT);
PARSER.declareField((parser, request, parseFieldMatcherSupplier) ->
request.createIndexRequest.aliases(parser.map()),
new ParseField("aliases"), ObjectParser.ValueType.OBJECT);
}
private String alias;
private String newIndexName;
private boolean dryRun;
private Set<Condition> conditions = new HashSet<>(2);
private CreateIndexRequest createIndexRequest = new CreateIndexRequest("_na_");
RolloverRequest() {}
public RolloverRequest(String alias, String newIndexName) {
this.alias = alias;
this.newIndexName = newIndexName;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = createIndexRequest == null ? null : createIndexRequest.validate();
if (alias == null) {
validationException = addValidationError("index alias is missing", validationException);
}
if (createIndexRequest == null) {
validationException = addValidationError("create index request is missing", validationException);
}
return validationException;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
alias = in.readString();
newIndexName = in.readOptionalString();
dryRun = in.readBoolean();
int size = in.readVInt();
for (int i = 0; i < size; i++) {
this.conditions.add(in.readNamedWriteable(Condition.class));
}
createIndexRequest = new CreateIndexRequest();
createIndexRequest.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(alias);
out.writeOptionalString(newIndexName);
out.writeBoolean(dryRun);
out.writeVInt(conditions.size());
for (Condition condition : conditions) {
out.writeNamedWriteable(condition);
}
createIndexRequest.writeTo(out);
}
@Override
public String[] indices() {
return new String[] {alias};
}
@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
}
/**
* Sets the alias to rollover to another index
*/
public void setAlias(String alias) {
this.alias = alias;
}
/**
* Sets the alias to rollover to another index
*/
public void setNewIndexName(String newIndexName) {
this.newIndexName = newIndexName;
}
/**
* Sets if the rollover should not be executed when conditions are met
*/
public void dryRun(boolean dryRun) {
this.dryRun = dryRun;
}
/**
* Adds condition to check if the index is at least <code>age</code> old
*/
public void addMaxIndexAgeCondition(TimeValue age) {
this.conditions.add(new MaxAgeCondition(age));
}
/**
* Adds condition to check if the index has at least <code>numDocs</code>
*/
public void addMaxIndexDocsCondition(long numDocs) {
this.conditions.add(new MaxDocsCondition(numDocs));
}
/**
* Sets rollover index creation request to override index settings when
* the rolled over index has to be created
*/
public void setCreateIndexRequest(CreateIndexRequest createIndexRequest) {
this.createIndexRequest = Objects.requireNonNull(createIndexRequest, "create index request must not be null");;
}
boolean isDryRun() {
return dryRun;
}
Set<Condition> getConditions() {
return conditions;
}
String getAlias() {
return alias;
}
String getNewIndexName() {
return newIndexName;
}
CreateIndexRequest getCreateIndexRequest() {
return createIndexRequest;
}
public void source(BytesReference source) {
XContentType xContentType = XContentFactory.xContentType(source);
if (xContentType != null) {
try (XContentParser parser = XContentFactory.xContent(xContentType).createParser(source)) {
PARSER.parse(parser, this, () -> ParseFieldMatcher.EMPTY);
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse source for rollover index", e);
}
} else {
throw new ElasticsearchParseException("failed to parse content type for rollover index source");
}
}
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.rollover;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
public class RolloverRequestBuilder extends MasterNodeOperationRequestBuilder<RolloverRequest, RolloverResponse,
RolloverRequestBuilder> {
public RolloverRequestBuilder(ElasticsearchClient client, RolloverAction action) {
super(client, action, new RolloverRequest());
}
public RolloverRequestBuilder setAlias(String alias) {
this.request.setAlias(alias);
return this;
}
public RolloverRequestBuilder setNewIndexName(String newIndexName) {
this.request.setNewIndexName(newIndexName);
return this;
}
public RolloverRequestBuilder addMaxIndexAgeCondition(TimeValue age) {
this.request.addMaxIndexAgeCondition(age);
return this;
}
public RolloverRequestBuilder addMaxIndexDocsCondition(long docs) {
this.request.addMaxIndexDocsCondition(docs);
return this;
}
public RolloverRequestBuilder dryRun(boolean dryRun) {
this.request.dryRun(dryRun);
return this;
}
public RolloverRequestBuilder settings(Settings settings) {
this.request.getCreateIndexRequest().settings(settings);
return this;
}
public RolloverRequestBuilder alias(Alias alias) {
this.request.getCreateIndexRequest().alias(alias);
return this;
}
public RolloverRequestBuilder mapping(String type, String source) {
this.request.getCreateIndexRequest().mapping(type, source);
return this;
}
}

View File

@ -0,0 +1,142 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.rollover;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public final class RolloverResponse extends ActionResponse implements ToXContent {
private static final String NEW_INDEX = "new_index";
private static final String OLD_INDEX = "old_index";
private static final String DRY_RUN = "dry_run";
private static final String ROLLED_OVER = "rolled_over";
private static final String CONDITIONS = "conditions";
private String oldIndex;
private String newIndex;
private Set<Map.Entry<String, Boolean>> conditionStatus;
private boolean dryRun;
private boolean rolledOver;
RolloverResponse() {
}
RolloverResponse(String oldIndex, String newIndex, Set<Condition.Result> conditionResults,
boolean dryRun, boolean rolledOver) {
this.oldIndex = oldIndex;
this.newIndex = newIndex;
this.dryRun = dryRun;
this.rolledOver = rolledOver;
this.conditionStatus = conditionResults.stream()
.map(result -> new AbstractMap.SimpleEntry<>(result.condition.toString(), result.matched))
.collect(Collectors.toSet());
}
/**
* Returns the name of the index that the request alias was pointing to
*/
public String getOldIndex() {
return oldIndex;
}
/**
* Returns the name of the index that the request alias currently points to
*/
public String getNewIndex() {
return newIndex;
}
/**
* Returns the statuses of all the request conditions
*/
public Set<Map.Entry<String, Boolean>> getConditionStatus() {
return conditionStatus;
}
/**
* Returns if the rollover execution was skipped even when conditions were met
*/
public boolean isDryRun() {
return dryRun;
}
/**
* Returns if the rollover was not simulated and the conditions were met
*/
public boolean isRolledOver() {
return rolledOver;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
oldIndex = in.readString();
newIndex = in.readString();
int conditionSize = in.readVInt();
Set<Map.Entry<String, Boolean>> conditions = new HashSet<>(conditionSize);
for (int i = 0; i < conditionSize; i++) {
String condition = in.readString();
boolean satisfied = in.readBoolean();
conditions.add(new AbstractMap.SimpleEntry<>(condition, satisfied));
}
conditionStatus = conditions;
dryRun = in.readBoolean();
rolledOver = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(oldIndex);
out.writeString(newIndex);
out.writeVInt(conditionStatus.size());
for (Map.Entry<String, Boolean> entry : conditionStatus) {
out.writeString(entry.getKey());
out.writeBoolean(entry.getValue());
}
out.writeBoolean(dryRun);
out.writeBoolean(rolledOver);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(OLD_INDEX, oldIndex);
builder.field(NEW_INDEX, newIndex);
builder.field(ROLLED_OVER, rolledOver);
builder.field(DRY_RUN, dryRun);
builder.startObject(CONDITIONS);
for (Map.Entry<String, Boolean> entry : conditionStatus) {
builder.field(entry.getKey(), entry.getValue());
}
builder.endObject();
return builder;
}
}

View File

@ -0,0 +1,223 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.rollover;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.AliasAction;
import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.metadata.MetaDataIndexAliasesService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* Main class to swap the index pointed to by an alias, given some conditions
*/
public class TransportRolloverAction extends TransportMasterNodeAction<RolloverRequest, RolloverResponse> {
private static final Pattern INDEX_NAME_PATTERN = Pattern.compile("^.*-(\\d)+$");
private final MetaDataCreateIndexService createIndexService;
private final MetaDataIndexAliasesService indexAliasesService;
private final Client client;
@Inject
public TransportRolloverAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, MetaDataCreateIndexService createIndexService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
MetaDataIndexAliasesService indexAliasesService, Client client) {
super(settings, RolloverAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,
RolloverRequest::new);
this.createIndexService = createIndexService;
this.indexAliasesService = indexAliasesService;
this.client = client;
}
@Override
protected String executor() {
// we go async right away
return ThreadPool.Names.SAME;
}
@Override
protected RolloverResponse newResponse() {
return new RolloverResponse();
}
@Override
protected ClusterBlockException checkBlock(RolloverRequest request, ClusterState state) {
IndicesOptions indicesOptions = IndicesOptions.fromOptions(true, true,
request.indicesOptions().expandWildcardsOpen(), request.indicesOptions().expandWildcardsClosed());
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE,
indexNameExpressionResolver.concreteIndexNames(state, indicesOptions, request.indices()));
}
@Override
protected void masterOperation(final RolloverRequest rolloverRequest, final ClusterState state,
final ActionListener<RolloverResponse> listener) {
final MetaData metaData = state.metaData();
validate(metaData, rolloverRequest);
final AliasOrIndex aliasOrIndex = metaData.getAliasAndIndexLookup().get(rolloverRequest.getAlias());
final IndexMetaData indexMetaData = aliasOrIndex.getIndices().get(0);
final String sourceIndexName = indexMetaData.getIndex().getName();
client.admin().indices().prepareStats(sourceIndexName).clear().setDocs(true).execute(
new ActionListener<IndicesStatsResponse>() {
@Override
public void onResponse(IndicesStatsResponse statsResponse) {
final Set<Condition.Result> conditionResults = evaluateConditions(rolloverRequest.getConditions(),
statsResponse.getTotal().getDocs(), metaData.index(sourceIndexName));
final String rolloverIndexName = (rolloverRequest.getNewIndexName() != null)
? rolloverRequest.getNewIndexName()
: generateRolloverIndexName(sourceIndexName);
if (rolloverRequest.isDryRun()) {
listener.onResponse(
new RolloverResponse(sourceIndexName, rolloverIndexName, conditionResults, true, false));
return;
}
if (conditionResults.size() == 0 || conditionResults.stream().anyMatch(result -> result.matched)) {
createIndexService.createIndex(prepareCreateIndexRequest(rolloverIndexName, rolloverRequest),
new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse response) {
// switch the alias to point to the newly created index
indexAliasesService.indicesAliases(
prepareRolloverAliasesUpdateRequest(sourceIndexName, rolloverIndexName,
rolloverRequest),
new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) {
listener.onResponse(
new RolloverResponse(sourceIndexName, rolloverIndexName,
conditionResults, false, true));
}
@Override
public void onFailure(Throwable e) {
listener.onFailure(e);
}
});
}
@Override
public void onFailure(Throwable t) {
listener.onFailure(t);
}
});
} else {
// conditions not met
listener.onResponse(
new RolloverResponse(sourceIndexName, sourceIndexName, conditionResults, false, false)
);
}
}
@Override
public void onFailure(Throwable e) {
listener.onFailure(e);
}
}
);
}
static IndicesAliasesClusterStateUpdateRequest prepareRolloverAliasesUpdateRequest(String oldIndex, String newIndex,
RolloverRequest request) {
final IndicesAliasesClusterStateUpdateRequest updateRequest = new IndicesAliasesClusterStateUpdateRequest()
.ackTimeout(request.ackTimeout())
.masterNodeTimeout(request.masterNodeTimeout());
AliasAction[] actions = new AliasAction[2];
actions[0] = new AliasAction(AliasAction.Type.ADD, newIndex, request.getAlias());
actions[1] = new AliasAction(AliasAction.Type.REMOVE, oldIndex, request.getAlias());
updateRequest.actions(actions);
return updateRequest;
}
static String generateRolloverIndexName(String sourceIndexName) {
if (INDEX_NAME_PATTERN.matcher(sourceIndexName).matches()) {
int numberIndex = sourceIndexName.lastIndexOf("-");
assert numberIndex != -1 : "no separator '-' found";
int counter = Integer.parseInt(sourceIndexName.substring(numberIndex + 1));
return String.join("-", sourceIndexName.substring(0, numberIndex), String.valueOf(++counter));
} else {
throw new IllegalArgumentException("index name [" + sourceIndexName + "] does not match pattern '^.*-(\\d)+$'");
}
}
static Set<Condition.Result> evaluateConditions(final Set<Condition> conditions,
final DocsStats docsStats, final IndexMetaData metaData) {
final long numDocs = docsStats == null ? 0 : docsStats.getCount();
final Condition.Stats stats = new Condition.Stats(numDocs, metaData.getCreationDate());
return conditions.stream()
.map(condition -> condition.evaluate(stats))
.collect(Collectors.toSet());
}
static void validate(MetaData metaData, RolloverRequest request) {
final AliasOrIndex aliasOrIndex = metaData.getAliasAndIndexLookup().get(request.getAlias());
if (aliasOrIndex == null) {
throw new IllegalArgumentException("source alias does not exist");
}
if (aliasOrIndex.isAlias() == false) {
throw new IllegalArgumentException("source alias is a concrete index");
}
if (aliasOrIndex.getIndices().size() != 1) {
throw new IllegalArgumentException("source alias maps to multiple indices");
}
}
static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(final String targetIndexName,
final RolloverRequest rolloverRequest) {
final CreateIndexRequest createIndexRequest = rolloverRequest.getCreateIndexRequest();
createIndexRequest.cause("rollover_index");
createIndexRequest.index(targetIndexName);
return new CreateIndexClusterStateUpdateRequest(createIndexRequest,
"rollover_index", targetIndexName, true)
.ackTimeout(createIndexRequest.timeout())
.masterNodeTimeout(createIndexRequest.masterNodeTimeout())
.settings(createIndexRequest.settings())
.aliases(createIndexRequest.aliases())
.mappings(createIndexRequest.mappings());
}
}

View File

@ -35,7 +35,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.ingest.core.IngestInfo;
import org.elasticsearch.ingest.IngestInfo;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

View File

@ -22,7 +22,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.IngestDocument;
import java.io.IOException;

View File

@ -21,9 +21,9 @@ package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.ingest.core.CompoundProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;

View File

@ -24,9 +24,9 @@ import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.PipelineStore;
import java.io.IOException;
@ -35,7 +35,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.ingest.core.IngestDocument.MetaData;
import static org.elasticsearch.ingest.IngestDocument.MetaData;
public class SimulatePipelineRequest extends ActionRequest<SimulatePipelineRequest> {

View File

@ -24,8 +24,8 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.IngestDocument;
import java.io.IOException;

View File

@ -19,9 +19,9 @@
package org.elasticsearch.action.ingest;
import org.elasticsearch.ingest.core.CompoundProcessor;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import java.util.ArrayList;
import java.util.List;

View File

@ -24,7 +24,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.IngestDocument;
import java.io.IOException;
import java.util.Map;

View File

@ -84,7 +84,7 @@ public class MainResponse extends ActionResponse implements ToXContent {
super.readFrom(in);
nodeName = in.readString();
version = Version.readVersion(in);
clusterName = ClusterName.readClusterName(in);
clusterName = new ClusterName(in);
build = Build.readBuild(in);
available = in.readBoolean();
}

View File

@ -37,15 +37,13 @@ import org.elasticsearch.transport.TransportService;
public class TransportMainAction extends HandledTransportAction<MainRequest, MainResponse> {
private final ClusterService clusterService;
private final Version version;
@Inject
public TransportMainAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService, Version version) {
ClusterService clusterService) {
super(settings, MainAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, MainRequest::new);
this.clusterService = clusterService;
this.version = version;
}
@Override
@ -54,6 +52,7 @@ public class TransportMainAction extends HandledTransportAction<MainRequest, Mai
assert Node.NODE_NAME_SETTING.exists(settings);
final boolean available = clusterState.getBlocks().hasGlobalBlock(RestStatus.SERVICE_UNAVAILABLE) == false;
listener.onResponse(
new MainResponse(Node.NODE_NAME_SETTING.get(settings), version, clusterState.getClusterName(), Build.CURRENT, available));
new MainResponse(Node.NODE_NAME_SETTING.get(settings), Version.CURRENT, clusterState.getClusterName(), Build.CURRENT,
available));
}
}

View File

@ -29,7 +29,7 @@ import org.elasticsearch.script.Script;
import org.elasticsearch.script.Template;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.PipelineAggregatorBuilder;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import org.elasticsearch.search.slice.SliceBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.highlight.HighlightBuilder;
@ -387,7 +387,7 @@ public class SearchRequestBuilder extends ActionRequestBuilder<SearchRequest, Se
/**
* Adds an aggregation to the search operation.
*/
public SearchRequestBuilder addAggregation(PipelineAggregatorBuilder aggregation) {
public SearchRequestBuilder addAggregation(PipelineAggregationBuilder aggregation) {
sourceBuilder().aggregation(aggregation);
return this;
}

View File

@ -106,7 +106,7 @@ public abstract class BaseNodesResponse<TNodeResponse extends BaseNodeResponse>
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
clusterName = ClusterName.readClusterName(in);
clusterName = new ClusterName(in);
nodes = readNodesFrom(in);
failures = in.readList(FailedNodeException::new);
}

View File

@ -24,13 +24,11 @@ import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.NoSuchNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
@ -59,21 +57,19 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
NodeResponse extends BaseNodeResponse>
extends HandledTransportAction<NodesRequest, NodesResponse> {
protected final ClusterName clusterName;
protected final ClusterService clusterService;
protected final TransportService transportService;
protected final Class<NodeResponse> nodeResponseClass;
final String transportNodeAction;
protected TransportNodesAction(Settings settings, String actionName, ClusterName clusterName, ThreadPool threadPool,
protected TransportNodesAction(Settings settings, String actionName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<NodesRequest> request, Supplier<NodeRequest> nodeRequest,
String nodeExecutor,
Class<NodeResponse> nodeResponseClass) {
super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, request);
this.clusterName = Objects.requireNonNull(clusterName);
this.clusterService = Objects.requireNonNull(clusterService);
this.transportService = Objects.requireNonNull(transportService);
this.nodeResponseClass = Objects.requireNonNull(nodeResponseClass);

View File

@ -78,11 +78,11 @@ public class ReplicationOperation<
private final List<ReplicationResponse.ShardInfo.Failure> shardReplicaFailures = Collections.synchronizedList(new ArrayList<>());
ReplicationOperation(Request request, Primary<Request, ReplicaRequest, PrimaryResultT> primary,
ActionListener<PrimaryResultT> listener,
boolean executeOnReplicas, boolean checkWriteConsistency,
Replicas<ReplicaRequest> replicas,
Supplier<ClusterState> clusterStateSupplier, ESLogger logger, String opType) {
public ReplicationOperation(Request request, Primary<Request, ReplicaRequest, PrimaryResultT> primary,
ActionListener<PrimaryResultT> listener,
boolean executeOnReplicas, boolean checkWriteConsistency,
Replicas<ReplicaRequest> replicas,
Supplier<ClusterState> clusterStateSupplier, ESLogger logger, String opType) {
this.checkWriteConsistency = checkWriteConsistency;
this.executeOnReplicas = executeOnReplicas;
this.replicasProxy = replicas;
@ -94,7 +94,7 @@ public class ReplicationOperation<
this.opType = opType;
}
void execute() throws Exception {
public void execute() throws Exception {
final String writeConsistencyFailure = checkWriteConsistency ? checkWriteConsistency() : null;
final ShardRouting primaryRouting = primary.routingEntry();
final ShardId primaryId = primaryRouting.shardId();
@ -294,7 +294,7 @@ public class ReplicationOperation<
}
interface Primary<
public interface Primary<
Request extends ReplicationRequest<Request>,
ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
PrimaryResultT extends PrimaryResult<ReplicaRequest>
@ -322,7 +322,7 @@ public class ReplicationOperation<
}
interface Replicas<ReplicaRequest extends ReplicationRequest<ReplicaRequest>> {
public interface Replicas<ReplicaRequest extends ReplicationRequest<ReplicaRequest>> {
/**
* performs the the given request on the specified replica
@ -366,7 +366,7 @@ public class ReplicationOperation<
}
}
interface PrimaryResult<R extends ReplicationRequest<R>> {
public interface PrimaryResult<R extends ReplicationRequest<R>> {
R replicaRequest();

View File

@ -26,7 +26,6 @@ import org.elasticsearch.action.NoSuchNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -67,7 +66,6 @@ public abstract class TransportTasksAction<
TaskResponse extends Writeable
> extends HandledTransportAction<TasksRequest, TasksResponse> {
protected final ClusterName clusterName;
protected final ClusterService clusterService;
protected final TransportService transportService;
protected final Supplier<TasksRequest> requestSupplier;
@ -75,13 +73,12 @@ public abstract class TransportTasksAction<
protected final String transportNodeAction;
protected TransportTasksAction(Settings settings, String actionName, ClusterName clusterName, ThreadPool threadPool,
protected TransportTasksAction(Settings settings, String actionName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<TasksRequest> requestSupplier,
Supplier<TasksResponse> responseSupplier,
String nodeExecutor) {
super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, requestSupplier);
this.clusterName = clusterName;
this.clusterService = clusterService;
this.transportService = transportService;
this.transportNodeAction = actionName + "[n]";

View File

@ -24,6 +24,8 @@ import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
/**
@ -42,7 +44,15 @@ public abstract class FilterClient extends AbstractClient {
* @see #in()
*/
public FilterClient(Client in) {
super(in.settings(), in.threadPool());
this(in.settings(), in.threadPool(), in);
}
/**
* A Constructor that allows to pass settings and threadpool separately. This is useful if the
* client is a proxy and not yet fully constructed ie. both dependencies are not available yet.
*/
protected FilterClient(Settings settings, ThreadPool threadPool, Client in) {
super(settings, threadPool);
this.in = in;
}

View File

@ -80,6 +80,9 @@ import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequestBuilder;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequestBuilder;
import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequestBuilder;
@ -800,4 +803,19 @@ public interface IndicesAdminClient extends ElasticsearchClient {
*/
void shrinkIndex(ShrinkRequest request, ActionListener<ShrinkResponse> listener);
/**
* Swaps the index pointed to by an alias given all provided conditions are satisfied
*/
RolloverRequestBuilder prepareRolloverIndex(String sourceAlias);
/**
* Swaps the index pointed to by an alias given all provided conditions are satisfied
*/
ActionFuture<RolloverResponse> rolloversIndex(RolloverRequest request);
/**
* Swaps the index pointed to by an alias given all provided conditions are satisfied
*/
void rolloverIndex(RolloverRequest request, ActionListener<RolloverResponse> listener);
}

View File

@ -212,6 +212,10 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequestBuilder;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequestBuilder;
import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsAction;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest;
@ -1728,6 +1732,21 @@ public abstract class AbstractClient extends AbstractComponent implements Client
execute(ShrinkAction.INSTANCE, request, listener);
}
@Override
public RolloverRequestBuilder prepareRolloverIndex(String alias) {
return new RolloverRequestBuilder(this, RolloverAction.INSTANCE).setAlias(alias);
}
@Override
public ActionFuture<RolloverResponse> rolloversIndex(RolloverRequest request) {
return execute(RolloverAction.INSTANCE, request);
}
@Override
public void rolloverIndex(RolloverRequest request, ActionListener<RolloverResponse> listener) {
execute(RolloverAction.INSTANCE, request, listener);
}
@Override
public ActionFuture<GetSettingsResponse> getSettings(GetSettingsRequest request) {
return execute(GetSettingsAction.INSTANCE, request);

View File

@ -19,7 +19,7 @@
package org.elasticsearch.client.transport;
import org.elasticsearch.Version;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionModule;
@ -28,7 +28,6 @@ import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.client.transport.support.TransportProxyClient;
import org.elasticsearch.cluster.ClusterNameModule;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Injector;
@ -43,11 +42,9 @@ import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsModule;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.threadpool.ExecutorBuilder;
@ -55,6 +52,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.netty.NettyTransport;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -121,23 +119,18 @@ public class TransportClient extends AbstractClient {
public TransportClient build() {
final PluginsService pluginsService = newPluginService(providedSettings);
final Settings settings = pluginsService.updatedSettings();
Version version = Version.CURRENT;
final List<Closeable> resourcesToClose = new ArrayList<>();
final ThreadPool threadPool = new ThreadPool(settings);
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
final NetworkService networkService = new NetworkService(settings);
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
boolean success = false;
try {
ModulesBuilder modules = new ModulesBuilder();
modules.add(new Version.Module(version));
// plugin modules must be added here, before others or we can get crazy injection errors...
for (Module pluginModule : pluginsService.nodeModules()) {
modules.add(pluginModule);
}
modules.add(new PluginsModule(pluginsService));
modules.add(new NetworkModule(networkService, settings, true, namedWriteableRegistry));
modules.add(new ClusterNameModule(settings));
modules.add(b -> b.bind(ThreadPool.class).toInstance(threadPool));
modules.add(new SearchModule(settings, namedWriteableRegistry) {
@Override
@ -158,21 +151,25 @@ public class TransportClient extends AbstractClient {
SettingsModule settingsModule = new SettingsModule(settings, additionalSettings, additionalSettingsFilter);
CircuitBreakerService circuitBreakerService = Node.createCircuitBreakerService(settingsModule.getSettings(),
settingsModule.getClusterSettings());
resourcesToClose.add(circuitBreakerService);
BigArrays bigArrays = new BigArrays(settings, circuitBreakerService);
resourcesToClose.add(bigArrays);
modules.add(settingsModule);
modules.add((b -> b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService)));
modules.add((b -> {
b.bind(BigArrays.class).toInstance(bigArrays);
b.bind(PluginsService.class).toInstance(pluginsService);
b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
}));
Injector injector = modules.createInjector();
final TransportService transportService = injector.getInstance(TransportService.class);
transportService.start();
transportService.acceptIncomingRequests();
TransportClient transportClient = new TransportClient(injector);
success = true;
resourcesToClose.clear();
return transportClient;
} finally {
if (!success) {
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
IOUtils.closeWhileHandlingException(resourcesToClose);
}
}
}
@ -267,24 +264,16 @@ public class TransportClient extends AbstractClient {
*/
@Override
public void close() {
injector.getInstance(TransportClientNodesService.class).close();
injector.getInstance(TransportService.class).close();
try {
injector.getInstance(MonitorService.class).close();
} catch (Exception e) {
// ignore, might not be bounded
}
List<Closeable> closeables = new ArrayList<>();
closeables.add(injector.getInstance(TransportClientNodesService.class));
closeables.add(injector.getInstance(TransportService.class));
for (Class<? extends LifecycleComponent> plugin : injector.getInstance(PluginsService.class).nodeServices()) {
injector.getInstance(plugin).close();
closeables.add(injector.getInstance(plugin));
}
try {
ThreadPool.terminate(injector.getInstance(ThreadPool.class), 10, TimeUnit.SECONDS);
} catch (Exception e) {
// ignore
}
injector.getInstance(BigArrays.class).close();
closeables.add(() -> ThreadPool.terminate(injector.getInstance(ThreadPool.class), 10, TimeUnit.SECONDS));
closeables.add(injector.getInstance(BigArrays.class));
IOUtils.closeWhileHandlingException(closeables);
}
@Override

View File

@ -48,6 +48,7 @@ import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@ -66,7 +67,7 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
/**
*
*/
public class TransportClientNodesService extends AbstractComponent {
public class TransportClientNodesService extends AbstractComponent implements Closeable {
private final TimeValue nodesSamplerInterval;
@ -111,13 +112,13 @@ public class TransportClientNodesService extends AbstractComponent {
Setting.boolSetting("client.transport.sniff", false, Property.NodeScope);
@Inject
public TransportClientNodesService(Settings settings, ClusterName clusterName, TransportService transportService,
ThreadPool threadPool, Version version) {
public TransportClientNodesService(Settings settings,TransportService transportService,
ThreadPool threadPool) {
super(settings);
this.clusterName = clusterName;
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
this.transportService = transportService;
this.threadPool = threadPool;
this.minCompatibilityVersion = version.minimumCompatibilityVersion();
this.minCompatibilityVersion = Version.CURRENT.minimumCompatibilityVersion();
this.nodesSamplerInterval = CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL.get(this.settings);
this.pingTimeout = CLIENT_TRANSPORT_PING_TIMEOUT.get(this.settings).millis();

View File

@ -100,17 +100,19 @@ public class ClusterModule extends AbstractModule {
private final ExtensionPoint.SelectedType<ShardsAllocator> shardsAllocators = new ExtensionPoint.SelectedType<>("shards_allocator", ShardsAllocator.class);
private final ExtensionPoint.ClassSet<AllocationDecider> allocationDeciders = new ExtensionPoint.ClassSet<>("allocation_decider", AllocationDecider.class, AllocationDeciders.class);
private final ExtensionPoint.ClassSet<IndexTemplateFilter> indexTemplateFilters = new ExtensionPoint.ClassSet<>("index_template_filter", IndexTemplateFilter.class);
private final ClusterService clusterService;
// pkg private so tests can mock
Class<? extends ClusterInfoService> clusterInfoServiceImpl = InternalClusterInfoService.class;
public ClusterModule(Settings settings) {
public ClusterModule(Settings settings, ClusterService clusterService) {
this.settings = settings;
for (Class<? extends AllocationDecider> decider : ClusterModule.DEFAULT_ALLOCATION_DECIDERS) {
registerAllocationDecider(decider);
}
registerShardsAllocator(ClusterModule.BALANCED_ALLOCATOR, BalancedShardsAllocator.class);
registerShardsAllocator(ClusterModule.EVEN_SHARD_COUNT_ALLOCATOR, BalancedShardsAllocator.class);
this.clusterService = clusterService;
}
public void registerAllocationDecider(Class<? extends AllocationDecider> allocationDecider) {
@ -140,9 +142,8 @@ public class ClusterModule extends AbstractModule {
bind(GatewayAllocator.class).asEagerSingleton();
bind(AllocationService.class).asEagerSingleton();
bind(DiscoveryNodeService.class).asEagerSingleton();
bind(ClusterService.class).asEagerSingleton();
bind(ClusterService.class).toInstance(clusterService);
bind(NodeConnectionsService.class).asEagerSingleton();
bind(OperationRouting.class).asEagerSingleton();
bind(MetaDataCreateIndexService.class).asEagerSingleton();
bind(MetaDataDeleteIndexService.class).asEagerSingleton();
bind(MetaDataIndexStateService.class).asEagerSingleton();

View File

@ -21,37 +21,29 @@ package org.elasticsearch.cluster;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import java.io.IOException;
import java.util.Objects;
/**
*
*/
public class ClusterName implements Streamable {
public class ClusterName implements Writeable {
public static final Setting<String> CLUSTER_NAME_SETTING = new Setting<>("cluster.name", "elasticsearch", (s) -> {
public static final Setting<ClusterName> CLUSTER_NAME_SETTING = new Setting<>("cluster.name", "elasticsearch", (s) -> {
if (s.isEmpty()) {
throw new IllegalArgumentException("[cluster.name] must not be empty");
}
return s;
}, Property.NodeScope);
return new ClusterName(s);
}, Setting.Property.NodeScope);
public static final ClusterName DEFAULT = CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY);
public static final ClusterName DEFAULT = new ClusterName(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY).intern());
private final String value;
private String value;
public static ClusterName clusterNameFromSettings(Settings settings) {
return new ClusterName(CLUSTER_NAME_SETTING.get(settings));
public ClusterName(StreamInput input) throws IOException {
this(input.readString());
}
private ClusterName() {
}
public ClusterName(String value) {
this.value = value.intern();
}
@ -60,17 +52,6 @@ public class ClusterName implements Streamable {
return this.value;
}
public static ClusterName readClusterName(StreamInput in) throws IOException {
ClusterName clusterName = new ClusterName();
clusterName.readFrom(in);
return clusterName;
}
@Override
public void readFrom(StreamInput in) throws IOException {
value = in.readString().intern();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(value);
@ -90,7 +71,7 @@ public class ClusterName implements Streamable {
@Override
public int hashCode() {
return value != null ? value.hashCode() : 0;
return Objects.hash(value);
}
@Override

View File

@ -90,7 +90,7 @@ import java.util.Set;
*/
public class ClusterState implements ToXContent, Diffable<ClusterState> {
public static final ClusterState PROTO = builder(ClusterName.DEFAULT).build();
public static final ClusterState PROTO = builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build();
public static enum ClusterStateStatus {
UNKNOWN((byte) 0),
@ -734,7 +734,7 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
}
public ClusterState readFrom(StreamInput in, DiscoveryNode localNode) throws IOException {
ClusterName clusterName = ClusterName.readClusterName(in);
ClusterName clusterName = new ClusterName(in);
Builder builder = new Builder(clusterName);
builder.version = in.readLong();
builder.uuid = in.readString();
@ -805,7 +805,7 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
}
public ClusterStateDiff(StreamInput in, ClusterState proto) throws IOException {
clusterName = ClusterName.readClusterName(in);
clusterName = new ClusterName(in);
fromUuid = in.readString();
toUuid = in.readString();
toVersion = in.readLong();

View File

@ -18,12 +18,9 @@
*/
package org.elasticsearch.cluster.health;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -50,16 +47,6 @@ public final class ClusterStateHealth implements Iterable<ClusterIndexHealth>, W
private final ClusterHealthStatus status;
private final Map<String, ClusterIndexHealth> indices = new HashMap<>();
/**
* Creates a new <code>ClusterStateHealth</code> instance based on cluster meta data and its routing table as a convenience.
*
* @param clusterMetaData Current cluster meta data. Must not be null.
* @param routingTables Current routing table. Must not be null.
*/
public ClusterStateHealth(final MetaData clusterMetaData, final RoutingTable routingTables) {
this(ClusterState.builder(ClusterName.DEFAULT).metaData(clusterMetaData).routingTable(routingTables).build());
}
/**
* Creates a new <code>ClusterStateHealth</code> instance considering the current cluster state and all indices in the cluster.
*

View File

@ -103,7 +103,6 @@ public class MetaDataCreateIndexService extends AbstractComponent {
private final ClusterService clusterService;
private final IndicesService indicesService;
private final AllocationService allocationService;
private final Version version;
private final AliasValidator aliasValidator;
private final IndexTemplateFilter indexTemplateFilter;
private final Environment env;
@ -114,13 +113,12 @@ public class MetaDataCreateIndexService extends AbstractComponent {
@Inject
public MetaDataCreateIndexService(Settings settings, ClusterService clusterService,
IndicesService indicesService, AllocationService allocationService,
Version version, AliasValidator aliasValidator,
AliasValidator aliasValidator,
Set<IndexTemplateFilter> indexTemplateFilters, Environment env, NodeServicesProvider nodeServicesProvider, IndexScopedSettings indexScopedSettings) {
super(settings);
this.clusterService = clusterService;
this.indicesService = indicesService;
this.allocationService = allocationService;
this.version = version;
this.aliasValidator = aliasValidator;
this.env = env;
this.nodeServicesProvider = nodeServicesProvider;
@ -287,7 +285,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
if (indexSettingsBuilder.get(SETTING_VERSION_CREATED) == null) {
DiscoveryNodes nodes = currentState.nodes();
final Version createdVersion = Version.smallest(version, nodes.getSmallestNonClientNodeVersion());
final Version createdVersion = Version.smallest(Version.CURRENT, nodes.getSmallestNonClientNodeVersion());
indexSettingsBuilder.put(SETTING_VERSION_CREATED, createdVersion);
}

View File

@ -299,7 +299,7 @@ public class MetaDataMappingService extends AbstractComponent {
assert mappingType != null;
if (!MapperService.DEFAULT_MAPPING.equals(mappingType) && mappingType.charAt(0) == '_') {
throw new InvalidTypeNameException("Document mapping type name can't start with '_'");
throw new InvalidTypeNameException("Document mapping type name can't start with '_', found: [" + mappingType + "]");
}
MetaData.Builder builder = MetaData.builder(metaData);
for (Tuple<IndexService, IndexMetaData> toUpdate : updateList) {

View File

@ -46,12 +46,10 @@ public class DiscoveryNodeService extends AbstractComponent {
// don't use node.id.seed so it won't be seen as an attribute
Setting.longSetting("node_id.seed", 0L, Long.MIN_VALUE, Property.NodeScope);
private final List<CustomAttributesProvider> customAttributesProviders = new CopyOnWriteArrayList<>();
private final Version version;
@Inject
public DiscoveryNodeService(Settings settings, Version version) {
public DiscoveryNodeService(Settings settings) {
super(settings);
this.version = version;
}
public static String generateNodeId(Settings settings) {
@ -93,7 +91,7 @@ public class DiscoveryNodeService extends AbstractComponent {
}
}
return new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), nodeId, publishAddress, attributes,
roles, version);
roles, Version.CURRENT);
}
public interface CustomAttributesProvider {

View File

@ -37,6 +37,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
@ -369,8 +370,13 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
}
}
if (ordered.isEmpty()) {
throw new IllegalArgumentException("no data nodes with critera(s) " +
Strings.arrayToCommaDelimitedString(nodeAttributes) + "] found for shard:" + shardId());
final String message = String.format(
Locale.ROOT,
"no data nodes with %s [%s] found for shard: %s",
nodeAttributes.length == 1 ? "criteria" : "criterion",
String.join(",", nodeAttributes),
shardId());
throw new IllegalArgumentException(message);
}
return new PlainShardIterator(shardId, ordered);
}

View File

@ -26,7 +26,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationD
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardId;
@ -42,12 +42,17 @@ import java.util.stream.Collectors;
public class OperationRouting extends AbstractComponent {
private final AwarenessAllocationDecider awarenessAllocationDecider;
private String[] awarenessAttributes;
@Inject
public OperationRouting(Settings settings, AwarenessAllocationDecider awarenessAllocationDecider) {
public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
super(settings);
this.awarenessAllocationDecider = awarenessAllocationDecider;
this.awarenessAttributes = AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING,
this::setAwarenessAttributes);
}
private void setAwarenessAttributes(String[] awarenessAttributes) {
this.awarenessAttributes = awarenessAttributes;
}
public ShardIterator indexShards(ClusterState clusterState, String index, String id, @Nullable String routing) {
@ -111,7 +116,6 @@ public class OperationRouting extends AbstractComponent {
private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable indexShard, String localNodeId, DiscoveryNodes nodes, @Nullable String preference) {
if (preference == null || preference.isEmpty()) {
String[] awarenessAttributes = awarenessAllocationDecider.awarenessAttributes();
if (awarenessAttributes.length == 0) {
return indexShard.activeInitializingShardsRandomIt();
} else {
@ -143,7 +147,6 @@ public class OperationRouting extends AbstractComponent {
}
// no more preference
if (index == -1 || index == preference.length() - 1) {
String[] awarenessAttributes = awarenessAllocationDecider.awarenessAttributes();
if (awarenessAttributes.length == 0) {
return indexShard.activeInitializingShardsRandomIt();
} else {
@ -174,10 +177,6 @@ public class OperationRouting extends AbstractComponent {
return indexShard.replicaFirstActiveInitializingShardsIt();
case ONLY_LOCAL:
return indexShard.onlyNodeActiveInitializingShardsIt(localNodeId);
case ONLY_NODE:
String nodeId = preference.substring(Preference.ONLY_NODE.type().length() + 1);
ensureNodeIdExists(nodes, nodeId);
return indexShard.onlyNodeActiveInitializingShardsIt(nodeId);
case ONLY_NODES:
String nodeAttributes = preference.substring(Preference.ONLY_NODES.type().length() + 1);
return indexShard.onlyNodeSelectorActiveInitializingShardsIt(nodeAttributes.split(","), nodes);
@ -186,7 +185,6 @@ public class OperationRouting extends AbstractComponent {
}
}
// if not, then use it as the index
String[] awarenessAttributes = awarenessAllocationDecider.awarenessAttributes();
if (awarenessAttributes.length == 0) {
return indexShard.activeInitializingShardsIt(Murmur3HashFunction.hash(preference));
} else {

View File

@ -64,11 +64,6 @@ public enum Preference {
*/
ONLY_LOCAL("_only_local"),
/**
* Route to specific node only
*/
ONLY_NODE("_only_node"),
/**
* Route to only node with attribute
*/
@ -100,8 +95,6 @@ public enum Preference {
return SHARDS;
case "_prefer_nodes":
return PREFER_NODES;
case "_only_node":
return ONLY_NODE;
case "_local":
return LOCAL;
case "_primary":

View File

@ -72,10 +72,6 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> {
protected void doClose() {
}
public AllocationService getAllocationService() {
return this.allocationService;
}
/**
* Initiates a reroute.
*/

View File

@ -20,6 +20,7 @@
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterStateHealth;
@ -68,6 +69,7 @@ public class AllocationService extends AbstractComponent {
private final GatewayAllocator gatewayAllocator;
private final ShardsAllocator shardsAllocator;
private final ClusterInfoService clusterInfoService;
private final ClusterName clusterName;
@Inject
public AllocationService(Settings settings, AllocationDeciders allocationDeciders, GatewayAllocator gatewayAllocator,
@ -77,6 +79,7 @@ public class AllocationService extends AbstractComponent {
this.gatewayAllocator = gatewayAllocator;
this.shardsAllocator = shardsAllocator;
this.clusterInfoService = clusterInfoService;
clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
}
/**
@ -118,8 +121,10 @@ public class AllocationService extends AbstractComponent {
MetaData newMetaData = updateMetaDataWithRoutingTable(oldMetaData, oldRoutingTable, newRoutingTable);
assert newRoutingTable.validate(newMetaData); // validates the routing table is coherent with the cluster state metadata
logClusterHealthStateChange(
new ClusterStateHealth(allocation.metaData(), allocation.routingTable()),
new ClusterStateHealth(newMetaData, newRoutingTable),
new ClusterStateHealth(ClusterState.builder(clusterName).
metaData(allocation.metaData()).routingTable(allocation.routingTable()).build()),
new ClusterStateHealth(ClusterState.builder(clusterName).
metaData(newMetaData).routingTable(newRoutingTable).build()),
reason
);
return new RoutingAllocation.Result(true, newRoutingTable, newMetaData, explanations);

View File

@ -203,7 +203,6 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
final float weightIndex = node.numShards(index) + numAdditionalShards - balancer.avgShardsPerNode(index);
return theta0 * weightShard + theta1 * weightIndex;
}
}
/**

View File

@ -126,15 +126,6 @@ public class AwarenessAllocationDecider extends AllocationDecider {
this.forcedAwarenessAttributes = forcedAwarenessAttributes;
}
/**
* Get the attributes defined by this instance
*
* @return attributes defined by this instance
*/
public String[] awarenessAttributes() {
return this.awarenessAttributes;
}
private void setAwarenessAttributes(String[] awarenessAttributes) {
this.awarenessAttributes = awarenessAttributes;
}

View File

@ -44,7 +44,6 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ClusterSettings;
@ -97,6 +96,7 @@ public class ClusterService extends AbstractLifecycleComponent<ClusterService> {
public static final String UPDATE_THREAD_NAME = "clusterService#updateTask";
private final ThreadPool threadPool;
private final ClusterName clusterName;
private BiConsumer<ClusterChangedEvent, Discovery.AckListener> clusterStatePublisher;
@ -130,14 +130,13 @@ public class ClusterService extends AbstractLifecycleComponent<ClusterService> {
private NodeConnectionsService nodeConnectionsService;
@Inject
public ClusterService(Settings settings, OperationRouting operationRouting,
ClusterSettings clusterSettings, ThreadPool threadPool, ClusterName clusterName) {
public ClusterService(Settings settings,
ClusterSettings clusterSettings, ThreadPool threadPool) {
super(settings);
this.operationRouting = operationRouting;
this.operationRouting = new OperationRouting(settings, clusterSettings);
this.threadPool = threadPool;
this.clusterSettings = clusterSettings;
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
// will be replaced on doStart.
this.clusterState = ClusterState.builder(clusterName).build();
@ -238,7 +237,11 @@ public class ClusterService extends AbstractLifecycleComponent<ClusterService> {
* The local node.
*/
public DiscoveryNode localNode() {
return clusterState.getNodes().getLocalNode();
DiscoveryNode localNode = clusterState.getNodes().getLocalNode();
if (localNode == null) {
throw new IllegalStateException("No local node found. Is the node started?");
}
return localNode;
}
public OperationRouting operationRouting() {
@ -490,6 +493,10 @@ public class ClusterService extends AbstractLifecycleComponent<ClusterService> {
return true;
}
public ClusterName getClusterName() {
return clusterName;
}
static abstract class SourcePrioritizedRunnable extends PrioritizedRunnable {
protected final String source;
@ -1039,4 +1046,8 @@ public class ClusterService extends AbstractLifecycleComponent<ClusterService> {
public ClusterSettings getClusterSettings() {
return clusterSettings;
}
public Settings getSettings() {
return settings;
}
}

View File

@ -73,6 +73,7 @@ import org.elasticsearch.rest.action.admin.cluster.storedscripts.RestGetStoredSc
import org.elasticsearch.rest.action.admin.cluster.storedscripts.RestPutSearchTemplateAction;
import org.elasticsearch.rest.action.admin.cluster.storedscripts.RestPutStoredScriptAction;
import org.elasticsearch.rest.action.admin.cluster.tasks.RestPendingClusterTasksAction;
import org.elasticsearch.rest.action.admin.indices.RestRolloverIndexAction;
import org.elasticsearch.rest.action.admin.indices.RestShrinkIndexAction;
import org.elasticsearch.rest.action.admin.indices.alias.RestIndicesAliasesAction;
import org.elasticsearch.rest.action.admin.indices.alias.delete.RestIndexDeleteAliasesAction;
@ -168,11 +169,11 @@ public class NetworkModule extends AbstractModule {
public static final String LOCAL_TRANSPORT = "local";
public static final String NETTY_TRANSPORT = "netty";
public static final Setting<String> HTTP_TYPE_SETTING = Setting.simpleString("http.type", Property.NodeScope);
public static final Setting<String> HTTP_TYPE_SETTING = Setting.simpleString(HTTP_TYPE_KEY, Property.NodeScope);
public static final Setting<Boolean> HTTP_ENABLED = Setting.boolSetting("http.enabled", true, Property.NodeScope);
public static final Setting<String> TRANSPORT_SERVICE_TYPE_SETTING =
Setting.simpleString("transport.service.type", Property.NodeScope);
public static final Setting<String> TRANSPORT_TYPE_SETTING = Setting.simpleString("transport.type", Property.NodeScope);
Setting.simpleString(TRANSPORT_SERVICE_TYPE_KEY, Property.NodeScope);
public static final Setting<String> TRANSPORT_TYPE_SETTING = Setting.simpleString(TRANSPORT_TYPE_KEY, Property.NodeScope);
@ -213,6 +214,7 @@ public class NetworkModule extends AbstractModule {
RestIndicesAliasesAction.class,
RestCreateIndexAction.class,
RestShrinkIndexAction.class,
RestRolloverIndexAction.class,
RestDeleteIndexAction.class,
RestCloseIndexAction.class,
RestOpenIndexAction.class,

View File

@ -263,6 +263,9 @@ public abstract class AbstractScopedSettings extends AbstractComponent {
List<String> keys = scoredKeys.stream().map((a) -> a.v2()).collect(Collectors.toList());
if (keys.isEmpty() == false) {
msg += " did you mean " + (keys.size() == 1 ? "[" + keys.get(0) + "]": "any of " + keys.toString()) + "?";
} else {
msg += " please check that any required plugins are installed, or check the breaking changes documentation for removed " +
"settings";
}
throw new IllegalArgumentException(msg);
}

View File

@ -24,11 +24,11 @@ import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.bootstrap.BootstrapSettings;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClientNodesService;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;

View File

@ -134,6 +134,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
MapperService.INDEX_MAPPING_DEPTH_LIMIT_SETTING,
BitsetFilterCache.INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING,
IndexModule.INDEX_STORE_TYPE_SETTING,
IndexModule.INDEX_STORE_PRE_LOAD_SETTING,
IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING,
IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING,
PrimaryShardAllocator.INDEX_RECOVERY_INITIAL_SHARDS_SETTING,

View File

@ -373,12 +373,11 @@ public class BigArrays implements Releasable {
final boolean checkBreaker;
private final BigArrays circuitBreakingInstance;
@Inject
public BigArrays(Settings settings, @Nullable final CircuitBreakerService breakerService) {
// Checking the breaker is disabled if not specified
this(new PageCacheRecycler(settings), breakerService, false);
}
// public for tests
public BigArrays(PageCacheRecycler recycler, @Nullable final CircuitBreakerService breakerService, boolean checkBreaker) {
this.checkBreaker = checkBreaker;
this.recycler = recycler;

View File

@ -22,7 +22,7 @@ package org.elasticsearch.discovery;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
@ -44,7 +44,7 @@ public interface Discovery extends LifecycleComponent<Discovery> {
* Another hack to solve dep injection problem..., note, this will be called before
* any start is called.
*/
void setRoutingService(RoutingService routingService);
void setAllocationService(AllocationService allocationService);
/**
* Publish all the changes to the cluster from the master (can be called just by the master). The publish

View File

@ -28,7 +28,7 @@ import org.elasticsearch.cluster.IncompatibleClusterStateVersionException;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
@ -61,7 +61,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
private static final LocalDiscovery[] NO_MEMBERS = new LocalDiscovery[0];
private final ClusterService clusterService;
private RoutingService routingService;
private AllocationService allocationService;
private final ClusterName clusterName;
private final DiscoverySettings discoverySettings;
@ -75,16 +75,16 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
private volatile ClusterState lastProcessedClusterState;
@Inject
public LocalDiscovery(Settings settings, ClusterName clusterName, ClusterService clusterService, ClusterSettings clusterSettings) {
public LocalDiscovery(Settings settings, ClusterService clusterService, ClusterSettings clusterSettings) {
super(settings);
this.clusterName = clusterName;
this.clusterName = clusterService.getClusterName();
this.clusterService = clusterService;
this.discoverySettings = new DiscoverySettings(settings, clusterSettings);
}
@Override
public void setRoutingService(RoutingService routingService) {
this.routingService = routingService;
public void setAllocationService(AllocationService allocationService) {
this.allocationService = allocationService;
}
@Override
@ -156,7 +156,12 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
nodesBuilder.put(discovery.localNode());
}
nodesBuilder.localNodeId(master.localNode().getId()).masterNodeId(master.localNode().getId());
return ClusterState.builder(currentState).nodes(nodesBuilder).build();
currentState = ClusterState.builder(currentState).nodes(nodesBuilder).build();
RoutingAllocation.Result result = master.allocationService.reroute(currentState, "node_add");
if (result.changed()) {
currentState = ClusterState.builder(currentState).routingResult(result).build();
}
return currentState;
}
@Override
@ -164,13 +169,6 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
logger.error("unexpected failure during [{}]", t, source);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
// we reroute not in the same cluster state update since in certain areas we rely on
// the node to be in the cluster state (sampled from ClusterService#state) to be there, also
// shard transitions need to better be handled in such cases
master.routingService.reroute("post_node_add");
}
});
}
} // else, no master node, the next node that will start will fill things in...
@ -226,7 +224,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
}
// reroute here, so we eagerly remove dead nodes from the routing
ClusterState updatedState = ClusterState.builder(currentState).nodes(newNodes).build();
RoutingAllocation.Result routingResult = master.routingService.getAllocationService().reroute(
RoutingAllocation.Result routingResult = master.allocationService.reroute(
ClusterState.builder(updatedState).build(), "elected as master");
return ClusterState.builder(updatedState).routingResult(routingResult).build();
}

View File

@ -29,7 +29,7 @@ import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
@ -58,7 +58,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class NodeJoinController extends AbstractComponent {
private final ClusterService clusterService;
private final RoutingService routingService;
private final AllocationService allocationService;
private final ElectMasterService electMaster;
private final DiscoverySettings discoverySettings;
private final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor();
@ -68,10 +68,11 @@ public class NodeJoinController extends AbstractComponent {
private ElectionContext electionContext = null;
public NodeJoinController(ClusterService clusterService, RoutingService routingService, ElectMasterService electMaster, DiscoverySettings discoverySettings, Settings settings) {
public NodeJoinController(ClusterService clusterService, AllocationService allocationService, ElectMasterService electMaster,
DiscoverySettings discoverySettings, Settings settings) {
super(settings);
this.clusterService = clusterService;
this.routingService = routingService;
this.allocationService = allocationService;
this.electMaster = electMaster;
this.discoverySettings = discoverySettings;
}
@ -406,21 +407,7 @@ public class NodeJoinController extends AbstractComponent {
ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks())
.removeGlobalBlock(discoverySettings.getNoMasterBlock()).build();
newState.blocks(clusterBlocks);
newState.nodes(nodesBuilder);
nodesChanged = true;
// reroute now to remove any dead nodes (master may have stepped down when they left and didn't update the routing table)
// Note: also do it now to avoid assigning shards to these nodes. We will have another reroute after the cluster
// state is published.
// TODO: this publishing of a cluster state with no nodes assigned to joining nodes shouldn't be needed anymore. remove.
final ClusterState tmpState = newState.build();
RoutingAllocation.Result result = routingService.getAllocationService().reroute(tmpState, "nodes joined");
newState = ClusterState.builder(tmpState);
if (result.changed()) {
newState.routingResult(result);
}
nodesBuilder = DiscoveryNodes.builder(tmpState.nodes());
}
if (nodesBuilder.isLocalNodeElectedMaster() == false) {
@ -439,7 +426,8 @@ public class NodeJoinController extends AbstractComponent {
for (DiscoveryNode existingNode : currentNodes) {
if (node.getAddress().equals(existingNode.getAddress())) {
nodesBuilder.remove(existingNode.getId());
logger.warn("received join request from node [{}], but found existing node {} with same address, removing existing node", node, existingNode);
logger.warn("received join request from node [{}], but found existing node {} with same address, " +
"removing existing node", node, existingNode);
}
}
}
@ -448,6 +436,12 @@ public class NodeJoinController extends AbstractComponent {
if (nodesChanged) {
newState.nodes(nodesBuilder);
final ClusterState tmpState = newState.build();
RoutingAllocation.Result result = allocationService.reroute(tmpState, "node_join");
newState = ClusterState.builder(tmpState);
if (result.changed()) {
newState.routingResult(result);
}
}
// we must return a new cluster state instance to force publishing. This is important
@ -463,13 +457,6 @@ public class NodeJoinController extends AbstractComponent {
@Override
public void clusterStatePublished(ClusterChangedEvent event) {
if (event.nodesDelta().hasChanges()) {
// we reroute not in the same cluster state update since in certain areas we rely on
// the node to be in the cluster state (sampled from ClusterService#state) to be there, also
// shard transitions need to better be handled in such cases
routingService.reroute("post_node_add");
}
NodeJoinController.this.electMaster.logMinimumMasterNodesWarningIfNecessary(event.previousState(), event.state());
}
}

View File

@ -32,7 +32,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
@ -113,7 +113,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
private final TransportService transportService;
private final ClusterService clusterService;
private RoutingService routingService;
private AllocationService allocationService;
private final ClusterName clusterName;
private final DiscoverySettings discoverySettings;
private final ZenPingService pingService;
@ -146,16 +146,16 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
/** counts the time this node has joined the cluster or have elected it self as master */
private final AtomicLong clusterJoinsCounter = new AtomicLong();
// must initialized in doStart(), when we have the routingService set
// must initialized in doStart(), when we have the allocationService set
private volatile NodeJoinController nodeJoinController;
@Inject
public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool,
public ZenDiscovery(Settings settings, ThreadPool threadPool,
TransportService transportService, final ClusterService clusterService, ClusterSettings clusterSettings,
ZenPingService pingService, ElectMasterService electMasterService) {
super(settings);
this.clusterName = clusterName;
this.clusterService = clusterService;
this.clusterName = clusterService.getClusterName();
this.transportService = transportService;
this.discoverySettings = new DiscoverySettings(settings, clusterSettings);
this.pingService = pingService;
@ -182,10 +182,10 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
}
});
this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, clusterName, clusterService);
this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, clusterService);
this.masterFD.addListener(new MasterNodeFailureListener());
this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, clusterName);
this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, clusterService.getClusterName());
this.nodesFD.addListener(new NodeFaultDetectionListener());
this.publishClusterState =
@ -195,7 +195,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
clusterService::state,
new NewPendingClusterStateListener(),
discoverySettings,
clusterName);
clusterService.getClusterName());
this.pingService.setPingContextProvider(this);
this.membership = new MembershipAction(settings, clusterService, transportService, this, new MembershipListener());
@ -206,8 +206,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
}
@Override
public void setRoutingService(RoutingService routingService) {
this.routingService = routingService;
public void setAllocationService(AllocationService allocationService) {
this.allocationService = allocationService;
}
@Override
@ -215,7 +215,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
nodesFD.setLocalNode(clusterService.localNode());
joinThreadControl.start();
pingService.start();
this.nodeJoinController = new NodeJoinController(clusterService, routingService, electMaster, discoverySettings, settings);
this.nodeJoinController = new NodeJoinController(clusterService, allocationService, electMaster, discoverySettings, settings);
}
@Override
@ -516,8 +516,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
return rejoin(currentState, "not enough master nodes");
}
// eagerly run reroute to remove dead nodes from routing table
RoutingAllocation.Result routingResult = routingService.getAllocationService().reroute(
ClusterState.builder(currentState).build(),
RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(currentState).build(),
"[" + node + "] left");
return ClusterState.builder(currentState).routingResult(routingResult).build();
}
@ -561,7 +560,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
return rejoin(currentState, "not enough master nodes");
}
// eagerly run reroute to remove dead nodes from routing table
RoutingAllocation.Result routingResult = routingService.getAllocationService().reroute(
RoutingAllocation.Result routingResult = allocationService.reroute(
ClusterState.builder(currentState).build(),
"[" + node + "] failed");
return ClusterState.builder(currentState).routingResult(routingResult).build();

View File

@ -53,9 +53,9 @@ public class ElectMasterService extends AbstractComponent {
private volatile int minimumMasterNodes;
@Inject
public ElectMasterService(Settings settings, Version version) {
public ElectMasterService(Settings settings) {
super(settings);
this.minMasterVersion = version.minimumCompatibilityVersion();
this.minMasterVersion = Version.CURRENT.minimumCompatibilityVersion();
this.minimumMasterNodes = DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings);
logger.debug("using minimum_master_nodes [{}]", minimumMasterNodes);
}

View File

@ -76,8 +76,8 @@ public class MasterFaultDetection extends FaultDetection {
private final AtomicBoolean notifiedMasterFailure = new AtomicBoolean();
public MasterFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService,
ClusterName clusterName, ClusterService clusterService) {
super(settings, threadPool, transportService, clusterName);
ClusterService clusterService) {
super(settings, threadPool, transportService, clusterService.getClusterName());
this.clusterService = clusterService;
logger.debug("[master] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout, pingRetryCount);
@ -418,7 +418,7 @@ public class MasterFaultDetection extends FaultDetection {
super.readFrom(in);
nodeId = in.readString();
masterNodeId = in.readString();
clusterName = ClusterName.readClusterName(in);
clusterName = new ClusterName(in);
}
@Override

View File

@ -314,7 +314,7 @@ public class NodesFaultDetection extends FaultDetection {
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
nodeId = in.readString();
clusterName = ClusterName.readClusterName(in);
clusterName = new ClusterName(in);
masterNode = new DiscoveryNode(in);
clusterStateVersion = in.readLong();
}

View File

@ -32,11 +32,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.cluster.ClusterName.readClusterName;
/**
*
*/
public interface ZenPing extends LifecycleComponent<ZenPing> {
void setPingContextProvider(PingContextProvider contextProvider);
@ -118,7 +113,7 @@ public interface ZenPing extends LifecycleComponent<ZenPing> {
@Override
public void readFrom(StreamInput in) throws IOException {
clusterName = readClusterName(in);
clusterName = new ClusterName(in);
node = new DiscoveryNode(in);
if (in.readBoolean()) {
master = new DiscoveryNode(in);

View File

@ -131,13 +131,12 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
private volatile boolean closed = false;
@Inject
public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName,
Version version, ElectMasterService electMasterService,
@Nullable Set<UnicastHostsProvider> unicastHostsProviders) {
public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService,
ElectMasterService electMasterService, @Nullable Set<UnicastHostsProvider> unicastHostsProviders) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.clusterName = clusterName;
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
this.electMasterService = electMasterService;
if (unicastHostsProviders != null) {
@ -166,7 +165,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
TransportAddress[] addresses = transportService.addressesFromString(host, limitPortCounts);
for (TransportAddress address : addresses) {
configuredTargetNodes.add(new DiscoveryNode(UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#",
address, emptyMap(), emptySet(), version.minimumCompatibilityVersion()));
address, emptyMap(), emptySet(), getVersion().minimumCompatibilityVersion()));
}
} catch (Exception e) {
throw new IllegalArgumentException("Failed to resolve address for [" + host + "]", e);
@ -206,10 +205,6 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
hostsProviders.add(provider);
}
public void removeHostsProvider(UnicastHostsProvider provider) {
hostsProviders.remove(provider);
}
@Override
public void setPingContextProvider(PingContextProvider contextProvider) {
this.contextProvider = contextProvider;
@ -222,16 +217,13 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
temporalResponses.clear();
}
public PingResponse[] pingAndWait(TimeValue timeout) {
public PingResponse[] pingAndWait(TimeValue duration) {
final AtomicReference<PingResponse[]> response = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
ping(new PingListener() {
@Override
public void onPing(PingResponse[] pings) {
response.set(pings);
latch.countDown();
}
}, timeout);
ping(pings -> {
response.set(pings);
latch.countDown();
}, duration);
try {
latch.await();
return response.get();
@ -241,25 +233,26 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
}
@Override
public void ping(final PingListener listener, final TimeValue timeout) {
public void ping(final PingListener listener, final TimeValue duration) {
final SendPingsHandler sendPingsHandler = new SendPingsHandler(pingHandlerIdGenerator.incrementAndGet());
try {
receivedResponses.put(sendPingsHandler.id(), sendPingsHandler);
try {
sendPings(timeout, null, sendPingsHandler);
sendPings(duration, null, sendPingsHandler);
} catch (RejectedExecutionException e) {
logger.debug("Ping execution rejected", e);
// The RejectedExecutionException can come from the fact unicastConnectExecutor is at its max down in sendPings
// But don't bail here, we can retry later on after the send ping has been scheduled.
}
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
threadPool.schedule(TimeValue.timeValueMillis(duration.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
protected void doRun() {
sendPings(timeout, null, sendPingsHandler);
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
sendPings(duration, null, sendPingsHandler);
threadPool.schedule(TimeValue.timeValueMillis(duration.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
sendPings(timeout, TimeValue.timeValueMillis(timeout.millis() / 2), sendPingsHandler);
sendPings(duration, TimeValue.timeValueMillis(duration.millis() / 2), sendPingsHandler);
sendPingsHandler.close();
listener.onPing(sendPingsHandler.pingCollection().toArray());
for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) {
@ -592,4 +585,8 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
}
}
}
protected Version getVersion() {
return Version.CURRENT; // for tests
}
}

View File

@ -130,16 +130,17 @@ public class Environment {
pluginsFile = homeFile.resolve("plugins");
List<String> dataPaths = PATH_DATA_SETTING.get(settings);
final ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
if (dataPaths.isEmpty() == false) {
dataFiles = new Path[dataPaths.size()];
dataWithClusterFiles = new Path[dataPaths.size()];
for (int i = 0; i < dataPaths.size(); i++) {
dataFiles[i] = PathUtils.get(dataPaths.get(i));
dataWithClusterFiles[i] = dataFiles[i].resolve(ClusterName.clusterNameFromSettings(settings).value());
dataWithClusterFiles[i] = dataFiles[i].resolve(clusterName.value());
}
} else {
dataFiles = new Path[]{homeFile.resolve("data")};
dataWithClusterFiles = new Path[]{homeFile.resolve("data").resolve(ClusterName.clusterNameFromSettings(settings).value())};
dataWithClusterFiles = new Path[]{homeFile.resolve("data").resolve(clusterName.value())};
}
if (PATH_SHARED_DATA_SETTING.exists(settings)) {
sharedDataFile = PathUtils.get(cleanPath(PATH_SHARED_DATA_SETTING.get(settings)));

View File

@ -1,43 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.env;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.threadpool.ThreadPool;
/**
*
*/
public class EnvironmentModule extends AbstractModule {
private final Environment environment;
private final ThreadPool threadPool;
public EnvironmentModule(Environment environment, ThreadPool threadPool) {
this.threadPool = threadPool;
this.environment = environment;
}
@Override
protected void configure() {
bind(ThreadPool.class).toInstance(threadPool);
bind(Environment.class).toInstance(environment);
}
}

View File

@ -32,12 +32,9 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
@ -66,7 +63,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -167,7 +163,6 @@ public final class NodeEnvironment extends AbstractComponent implements Closeabl
public static final String NODE_LOCK_FILENAME = "node.lock";
public static final String UPGRADE_LOCK_FILENAME = "upgrade.lock";
@Inject
public NodeEnvironment(Settings settings, Environment environment) throws IOException {
super(settings);

View File

@ -155,7 +155,7 @@ public class Gateway extends AbstractComponent implements ClusterStateListener {
final ClusterSettings clusterSettings = clusterService.getClusterSettings();
metaDataBuilder.persistentSettings(clusterSettings.archiveUnknownOrBrokenSettings(metaDataBuilder.persistentSettings()));
metaDataBuilder.transientSettings(clusterSettings.archiveUnknownOrBrokenSettings(metaDataBuilder.transientSettings()));
ClusterState.Builder builder = ClusterState.builder(clusterService.state().getClusterName());
ClusterState.Builder builder = ClusterState.builder(clusterService.getClusterName());
builder.metaData(metaDataBuilder);
listener.onSuccess(builder.build());
}

View File

@ -27,11 +27,6 @@ import org.elasticsearch.common.settings.Settings;
*/
public class GatewayModule extends AbstractModule {
private final Settings settings;
public GatewayModule(Settings settings) {
this.settings = settings;
}
@Override
protected void configure() {

View File

@ -58,10 +58,10 @@ public class TransportNodesListGatewayMetaState extends TransportNodesAction<Tra
private GatewayMetaState metaState;
@Inject
public TransportNodesListGatewayMetaState(Settings settings, ClusterName clusterName, ThreadPool threadPool,
public TransportNodesListGatewayMetaState(Settings settings, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ACTION_NAME, clusterName, threadPool, clusterService, transportService, actionFilters,
super(settings, ACTION_NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, Request::new, NodeRequest::new, ThreadPool.Names.GENERIC, NodeGatewayMetaState.class);
}
@ -91,7 +91,7 @@ public class TransportNodesListGatewayMetaState extends TransportNodesAction<Tra
@Override
protected NodesGatewayMetaState newResponse(Request request, List<NodeGatewayMetaState> responses, List<FailedNodeException> failures) {
return new NodesGatewayMetaState(clusterName, responses, failures);
return new NodesGatewayMetaState(clusterService.getClusterName(), responses, failures);
}
@Override

View File

@ -69,12 +69,12 @@ public class TransportNodesListGatewayStartedShards extends
@Inject
public TransportNodesListGatewayStartedShards(Settings settings, ClusterName clusterName, ThreadPool threadPool,
public TransportNodesListGatewayStartedShards(Settings settings, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
NodeEnvironment env) {
super(settings, ACTION_NAME, clusterName, threadPool, clusterService, transportService, actionFilters,
super(settings, ACTION_NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, Request::new, NodeRequest::new, ThreadPool.Names.FETCH_SHARD_STARTED,
NodeGatewayStartedShards.class);
this.nodeEnv = env;
@ -111,7 +111,7 @@ public class TransportNodesListGatewayStartedShards extends
@Override
protected NodesGatewayStartedShards newResponse(Request request,
List<NodeGatewayStartedShards> responses, List<FailedNodeException> failures) {
return new NodesGatewayStartedShards(clusterName, responses, failures);
return new NodesGatewayStartedShards(clusterService.getClusterName(), responses, failures);
}
@Override

View File

@ -45,6 +45,7 @@ import org.elasticsearch.indices.mapper.MapperRegistry;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -74,6 +75,14 @@ public final class IndexModule {
public static final Setting<String> INDEX_STORE_TYPE_SETTING =
new Setting<>("index.store.type", "", Function.identity(), Property.IndexScope, Property.NodeScope);
/** On which extensions to load data into the file-system cache upon opening of files.
* This only works with the mmap directory, and even in that case is still
* best-effort only. */
public static final Setting<List<String>> INDEX_STORE_PRE_LOAD_SETTING =
Setting.listSetting("index.store.preload", Collections.emptyList(), Function.identity(),
Property.IndexScope, Property.NodeScope);
public static final String SIMILARITY_SETTINGS_PREFIX = "index.similarity";
// whether to use the query cache

View File

@ -445,7 +445,7 @@ public class TypeParsers {
private static SimilarityProvider resolveSimilarity(Mapper.TypeParser.ParserContext parserContext, String name, String value) {
if (parserContext.indexVersionCreated().before(Version.V_5_0_0_alpha1) && "default".equals(value)) {
// "default" similarity has been renamed into "classic" in 3.x.
value = SimilarityService.DEFAULT_SIMILARITY;
value = "classic";
}
SimilarityProvider similarityProvider = parserContext.getSimilarity(value);
if (similarityProvider == null) {

View File

@ -131,8 +131,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard {

View File

@ -36,7 +36,7 @@ import java.util.function.BiFunction;
public final class SimilarityService extends AbstractIndexComponent {
public final static String DEFAULT_SIMILARITY = "classic";
public final static String DEFAULT_SIMILARITY = "BM25";
private final Similarity defaultSimilarity;
private final Similarity baseSimilarity;
private final Map<String, SimilarityProvider> similarities;
@ -121,8 +121,8 @@ public final class SimilarityService extends AbstractIndexComponent {
return similarities.get(name);
}
public SimilarityProvider getDefaultSimilarity() {
return similarities.get("default");
Similarity getDefaultSimilarity() {
return defaultSimilarity;
}
static class PerFieldSimilarity extends PerFieldSimilarityWrapper {

View File

@ -105,8 +105,6 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
private RateLimiter restoreRateLimiter;
private RateLimiterListener rateLimiterListener;
private RateLimitingInputStream.Listener snapshotThrottleListener;
private RateLimitingInputStream.Listener restoreThrottleListener;
@ -163,7 +161,6 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
this.chunkSize = chunkSize;
this.snapshotRateLimiter = snapshotRateLimiter;
this.restoreRateLimiter = restoreRateLimiter;
this.rateLimiterListener = rateLimiterListener;
this.snapshotThrottleListener = nanos -> rateLimiterListener.onSnapshotPause(nanos);
this.restoreThrottleListener = nanos -> rateLimiterListener.onRestorePause(nanos);
this.compress = compress;

View File

@ -31,13 +31,11 @@ import org.apache.lucene.store.SimpleFSDirectory;
import org.apache.lucene.store.SimpleFSLockFactory;
import org.apache.lucene.store.SleepingLockWrapper;
import org.apache.lucene.store.StoreRateLimiting;
import org.apache.lucene.util.Constants;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardPath;
@ -45,7 +43,7 @@ import org.elasticsearch.index.shard.ShardPath;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
/**
@ -87,8 +85,12 @@ public class FsDirectoryService extends DirectoryService implements StoreRateLim
@Override
public Directory newDirectory() throws IOException {
final Path location = path.resolveIndex();
final LockFactory lockFactory = indexSettings.getValue(INDEX_LOCK_FACTOR_SETTING);
Files.createDirectories(location);
Directory wrapped = newFSDirectory(location, indexSettings.getValue(INDEX_LOCK_FACTOR_SETTING));
Directory wrapped = newFSDirectory(location, lockFactory);
Set<String> preLoadExtensions = new HashSet<>(
indexSettings.getValue(IndexModule.INDEX_STORE_PRE_LOAD_SETTING));
wrapped = setPreload(wrapped, location, lockFactory, preLoadExtensions);
if (IndexMetaData.isOnSharedFilesystem(indexSettings.getSettings())) {
wrapped = new SleepingLockWrapper(wrapped, 5000);
}
@ -100,25 +102,11 @@ public class FsDirectoryService extends DirectoryService implements StoreRateLim
rateLimitingTimeInNanos.inc(nanos);
}
/*
* We are mmapping norms, docvalues as well as term dictionaries, all other files are served through NIOFS
* this provides good random access performance while not creating unnecessary mmaps for files like stored
* fields etc.
*/
private static final Set<String> PRIMARY_EXTENSIONS = Collections.unmodifiableSet(Sets.newHashSet("nvd", "dvd", "tim"));
protected Directory newFSDirectory(Path location, LockFactory lockFactory) throws IOException {
final String storeType = indexSettings.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(),
IndexModule.Type.FS.getSettingsKey());
if (IndexModule.Type.FS.match(storeType) || isDefault(storeType)) {
final FSDirectory open = FSDirectory.open(location, lockFactory); // use lucene defaults
if (open instanceof MMapDirectory
&& isDefault(storeType)
&& Constants.WINDOWS == false) {
return newDefaultDir(location, (MMapDirectory) open, lockFactory);
}
return open;
if (IndexModule.Type.FS.match(storeType) || IndexModule.Type.DEFAULT.match(storeType)) {
return FSDirectory.open(location, lockFactory); // use lucene defaults
} else if (IndexModule.Type.SIMPLEFS.match(storeType)) {
return new SimpleFSDirectory(location, lockFactory);
} else if (IndexModule.Type.NIOFS.match(storeType)) {
@ -129,17 +117,25 @@ public class FsDirectoryService extends DirectoryService implements StoreRateLim
throw new IllegalArgumentException("No directory found for type [" + storeType + "]");
}
private static boolean isDefault(String storeType) {
return IndexModule.Type.DEFAULT.match(storeType);
}
private Directory newDefaultDir(Path location, final MMapDirectory mmapDir, LockFactory lockFactory) throws IOException {
return new FileSwitchDirectory(PRIMARY_EXTENSIONS, mmapDir, new NIOFSDirectory(location, lockFactory), true) {
@Override
public String[] listAll() throws IOException {
// Avoid doing listAll twice:
return mmapDir.listAll();
private static Directory setPreload(Directory directory, Path location, LockFactory lockFactory,
Set<String> preLoadExtensions) throws IOException {
if (preLoadExtensions.isEmpty() == false
&& directory instanceof MMapDirectory
&& ((MMapDirectory) directory).getPreload() == false) {
if (preLoadExtensions.contains("*")) {
((MMapDirectory) directory).setPreload(true);
return directory;
}
};
MMapDirectory primary = new MMapDirectory(location, lockFactory);
primary.setPreload(true);
return new FileSwitchDirectory(preLoadExtensions, primary, directory, true) {
@Override
public String[] listAll() throws IOException {
// avoid listing twice
return primary.listAll();
}
};
}
return directory;
}
}

View File

@ -19,10 +19,14 @@
package org.elasticsearch.indices;
import org.elasticsearch.action.admin.indices.rollover.Condition;
import org.elasticsearch.action.admin.indices.rollover.MaxAgeCondition;
import org.elasticsearch.action.admin.indices.rollover.MaxDocsCondition;
import org.elasticsearch.action.update.UpdateHelper;
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
import org.elasticsearch.common.geo.ShapesAvailability;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.index.NodeServicesProvider;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MetadataFieldMapper;
@ -74,10 +78,18 @@ public class IndicesModule extends AbstractModule {
// Use a LinkedHashMap for metadataMappers because iteration order matters
private final Map<String, MetadataFieldMapper.TypeParser> metadataMapperParsers
= new LinkedHashMap<>();
private final NamedWriteableRegistry namedWritableRegistry;
public IndicesModule() {
public IndicesModule(NamedWriteableRegistry namedWriteableRegistry) {
this.namedWritableRegistry = namedWriteableRegistry;
registerBuiltInMappers();
registerBuiltInMetadataMappers();
registerBuildInWritables();
}
private void registerBuildInWritables() {
namedWritableRegistry.register(Condition.class, MaxAgeCondition.NAME, MaxAgeCondition::new);
namedWritableRegistry.register(Condition.class, MaxDocsCondition.NAME, MaxDocsCondition::new);
}
private void registerBuiltInMappers() {

View File

@ -351,7 +351,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
}
private IndexShard getShard(ShardActiveRequest request) {
ClusterName thisClusterName = clusterService.state().getClusterName();
ClusterName thisClusterName = clusterService.getClusterName();
if (!thisClusterName.equals(request.clusterName)) {
logger.trace("shard exists request meant for cluster[{}], but this is cluster[{}], ignoring request", request.clusterName, thisClusterName);
return null;
@ -385,7 +385,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
clusterName = ClusterName.readClusterName(in);
clusterName = new ClusterName(in);
indexUUID = in.readString();
shardId = ShardId.readShardId(in);
timeout = new TimeValue(in.readLong(), TimeUnit.MILLISECONDS);

View File

@ -76,11 +76,11 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction<T
private final NodeEnvironment nodeEnv;
@Inject
public TransportNodesListShardStoreMetaData(Settings settings, ClusterName clusterName, ThreadPool threadPool,
public TransportNodesListShardStoreMetaData(Settings settings, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
IndicesService indicesService, NodeEnvironment nodeEnv, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ACTION_NAME, clusterName, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
super(settings, ACTION_NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
Request::new, NodeRequest::new, ThreadPool.Names.FETCH_SHARD_STORE, NodeStoreFilesMetaData.class);
this.indicesService = indicesService;
this.nodeEnv = nodeEnv;
@ -111,7 +111,7 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction<T
@Override
protected NodesStoreFilesMetaData newResponse(Request request,
List<NodeStoreFilesMetaData> responses, List<FailedNodeException> failures) {
return new NodesStoreFilesMetaData(clusterName, responses, failures);
return new NodesStoreFilesMetaData(clusterService.getClusterName(), responses, failures);
}
@Override

View File

@ -17,8 +17,7 @@
* under the License.
*/
package org.elasticsearch.ingest.core;
package org.elasticsearch.ingest;
/**
* An Abstract Processor that holds a processorTag field to be used

View File

@ -17,8 +17,7 @@
* under the License.
*/
package org.elasticsearch.ingest.core;
package org.elasticsearch.ingest;
import java.util.Map;

View File

@ -18,7 +18,7 @@
*/
//TODO(simonw): can all these classes go into org.elasticsearch.ingest?
package org.elasticsearch.ingest.core;
package org.elasticsearch.ingest;
import org.elasticsearch.ElasticsearchException;
@ -103,7 +103,8 @@ public class CompoundProcessor implements Processor {
continue;
}
ElasticsearchException compoundProcessorException = newCompoundProcessorException(e, processor.getType(), processor.getTag());
ElasticsearchException compoundProcessorException =
newCompoundProcessorException(e, processor.getType(), processor.getTag());
if (onFailureProcessors.isEmpty()) {
throw compoundProcessorException;
} else {

View File

@ -17,10 +17,9 @@
* under the License.
*/
package org.elasticsearch.ingest.core;
package org.elasticsearch.ingest;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ingest.ProcessorsRegistry;
import java.util.ArrayList;
import java.util.Arrays;

Some files were not shown because too many files have changed in this diff Show More