From e993c6a862aec1ba28371dd21788754bc5b4ea49 Mon Sep 17 00:00:00 2001 From: David Pilato Date: Mon, 9 Nov 2015 15:35:06 +0100 Subject: [PATCH 1/3] Migrate mapper attachements plugin to asciidoc Followup for #14605 --- .../plugins/mapper-attachments.asciidoc | 183 +++++++++++------- docs/plugins/mapper.asciidoc | 8 +- docs/plugins/plugin-script.asciidoc | 5 +- docs/reference/mapping/types.asciidoc | 4 +- 4 files changed, 123 insertions(+), 77 deletions(-) rename plugins/mapper-attachments/README.md => docs/plugins/mapper-attachments.asciidoc (70%) diff --git a/plugins/mapper-attachments/README.md b/docs/plugins/mapper-attachments.asciidoc similarity index 70% rename from plugins/mapper-attachments/README.md rename to docs/plugins/mapper-attachments.asciidoc index 7114ad7b2cf..c13d8ee0b4e 100644 --- a/plugins/mapper-attachments/README.md +++ b/docs/plugins/mapper-attachments.asciidoc @@ -1,25 +1,46 @@ -Mapper Attachments Type for Elasticsearch -========================================= +[[mapper-attachments]] +=== Mapper Attachments Plugin -The mapper attachments plugin lets Elasticsearch index file attachments in common formats (such as PPT, XLS, PDF) using the Apache text extraction library [Tika](http://lucene.apache.org/tika/). +The mapper attachments plugin lets Elasticsearch index file attachments in common formats (such as PPT, XLS, PDF) +using the Apache text extraction library http://lucene.apache.org/tika/[Tika]. -In practice, the plugin adds the `attachment` type when mapping properties so that documents can be populated with file attachment contents (encoded as `base64`). +In practice, the plugin adds the `attachment` type when mapping properties so that documents can be populated with +file attachment contents (encoded as `base64`). -Installation ------------- +[[mapper-attachments-install]] +[float] +==== Installation -In order to install the plugin, run: +This plugin can be installed using the plugin manager: -```sh -bin/plugin install mapper-attachments -``` +[source,sh] +---------------------------------------------------------------- +sudo bin/plugin install mapper-attachments +---------------------------------------------------------------- -Hello, world ------------- +The plugin must be installed on every node in the cluster, and each node must +be restarted after installation. + +[[mapper-attachments-remove]] +[float] +==== Removal + +The plugin can be removed with the following command: + +[source,sh] +---------------------------------------------------------------- +sudo bin/plugin remove mapper-attachments +---------------------------------------------------------------- + +The node must be stopped before removing the plugin. + +[[mapper-attachments-helloworld]] +==== Hello, world Create a property mapping using the new type `attachment`: -```javascript +[source,js] +-------------------------- POST /trying-out-mapper-attachments { "mappings": { @@ -27,36 +48,42 @@ POST /trying-out-mapper-attachments "properties": { "cv": { "type": "attachment" } }}}} -``` +-------------------------- +// AUTOSENSE Index a new document populated with a `base64`-encoded attachment: -```javascript +[source,js] +-------------------------- POST /trying-out-mapper-attachments/person/1 { "cv": "e1xydGYxXGFuc2kNCkxvcmVtIGlwc3VtIGRvbG9yIHNpdCBhbWV0DQpccGFyIH0=" } -``` +-------------------------- +// AUTOSENSE Search for the document using words in the attachment: -```javascript +[source,js] +-------------------------- POST /trying-out-mapper-attachments/person/_search { "query": { "query_string": { "query": "ipsum" }}} -``` +-------------------------- +// AUTOSENSE If you get a hit for your indexed document, the plugin should be installed and working. -Usage ------------------------- +[[mapper-attachments-usage]] +==== Usage Using the attachment type is simple, in your mapping JSON, simply set a certain JSON element as attachment, for example: -```javascript +[source,js] +-------------------------- PUT /test PUT /test/person/_mapping { @@ -66,20 +93,24 @@ PUT /test/person/_mapping } } } -``` +-------------------------- +// AUTOSENSE In this case, the JSON to index can be: -```javascript +[source,js] +-------------------------- PUT /test/person/1 { "my_attachment" : "... base64 encoded attachment ..." } -``` +-------------------------- +// AUTOSENSE Or it is possible to use more elaborated JSON if content type, resource name or language need to be set explicitly: -``` +[source,js] +-------------------------- PUT /test/person/1 { "my_attachment" : { @@ -89,9 +120,10 @@ PUT /test/person/1 "_content" : "... base64 encoded attachment ..." } } -``` +-------------------------- +// AUTOSENSE -The `attachment` type not only indexes the content of the doc in `content` sub field, but also automatically adds meta +The `attachment` type not only indexes the content of the doc in `content` sub field, but also automatically adds meta data on the attachment as well (when available). The metadata supported are: @@ -107,10 +139,11 @@ The metadata supported are: They can be queried using the "dot notation", for example: `my_attachment.author`. -Both the meta data and the actual content are simple core type mappers (string, date, ...), thus, they can be controlled +Both the meta data and the actual content are simple core type mappers (string, date, …), thus, they can be controlled in the mappings. For example: -```javascript +[source,js] +-------------------------- PUT /test/person/_mapping { "person" : { @@ -131,19 +164,21 @@ PUT /test/person/_mapping } } } -``` +-------------------------- +// AUTOSENSE In the above example, the actual content indexed is mapped under `fields` name `content`, and we decide not to index it, so it will only be available in the `_all` field. The other fields map to their respective metadata names, but there is no need to specify the `type` (like `string` or `date`) since it is already known. -Copy To feature ---------------- +[[mapper-attachments-copy-to]] +==== Copy To feature -If you want to use [copy_to](http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/mapping-core-types.html#copy-to) +If you want to use http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/mapping-core-types.html#copy-to[copy_to] feature, you need to define it on each sub-field you want to copy to another field: -```javascript +[source,js] +-------------------------- PUT /test/person/_mapping { "person": { @@ -163,16 +198,18 @@ PUT /test/person/_mapping } } } -``` +-------------------------- +// AUTOSENSE In this example, the extracted content will be copy as well to `copy` field. -Querying or accessing metadata ------------------------------- +[[mapper-attachments-querying-metadata]] +==== Querying or accessing metadata If you need to query on metadata fields, use the attachment field name dot the metadata field. For example: -``` +[source,js] +-------------------------- DELETE /test PUT /test PUT /test/person/_mapping @@ -204,11 +241,13 @@ GET /test/person/_search } } } -``` +-------------------------- +// AUTOSENSE Will give you: -``` +[source,js] +-------------------------- { "took": 2, "timed_out": false, @@ -235,17 +274,18 @@ Will give you: ] } } -``` +-------------------------- -Indexed Characters ------------------- +[[mapper-attachments-indexed-characters]] +==== Indexed Characters By default, `100000` characters are extracted when indexing the content. This default value can be changed by setting the `index.mapping.attachment.indexed_chars` setting. It can also be provided on a per document indexed using the `_indexed_chars` parameter. `-1` can be set to extract all text, but note that all the text needs to be allowed to be represented in memory: -``` +[source,js] +-------------------------- PUT /test/person/1 { "my_attachment" : { @@ -253,18 +293,19 @@ PUT /test/person/1 "_content" : "... base64 encoded attachment ..." } } -``` +-------------------------- +// AUTOSENSE -Metadata parsing error handling -------------------------------- +[[mapper-attachments-error-handling]] +==== Metadata parsing error handling While extracting metadata content, errors could happen for example when parsing dates. Parsing errors are ignored so your document is indexed. You can disable this feature by setting the `index.mapping.attachment.ignore_errors` setting to `false`. -Language Detection ------------------- +[[mapper-attachments-language-detection]] +==== Language Detection By default, language detection is disabled (`false`) as it could come with a cost. This default value can be changed by setting the `index.mapping.attachment.detect_language` setting. @@ -272,22 +313,24 @@ It can also be provided on a per document indexed using the `_detect_language` p Note that you can force language using `_language` field when sending your actual document: -```javascript +[source,js] +-------------------------- { "my_attachment" : { "_language" : "en", "_content" : "... base64 encoded attachment ..." } } -``` +-------------------------- -Highlighting attachments ------------------------- +[[mapper-attachments-highlighting]] +==== Highlighting attachments -If you want to highlight your attachment content, you will need to set `"store": true` and `"term_vector":"with_positions_offsets"` -for your attachment field. Here is a full script which does it: +If you want to highlight your attachment content, you will need to set `"store": true` and +`"term_vector":"with_positions_offsets"` for your attachment field. Here is a full script which does it: -``` +[source,js] +-------------------------- DELETE /test PUT /test PUT /test/person/_mapping @@ -326,11 +369,13 @@ GET /test/person/_search } } } -``` +-------------------------- +// AUTOSENSE It gives back: -```js +[source,js] +-------------------------- { "took": 9, "timed_out": false, @@ -357,29 +402,31 @@ It gives back: ] } } -``` +-------------------------- -Stand alone runner ------------------- +[[mapper-attachments-standalone]] +==== Stand alone runner If you want to run some tests within your IDE, you can use `StandaloneRunner` class. It accepts arguments: -* `-u file://URL/TO/YOUR/DOC` -* `--size` set extracted size (default to mapper attachment size) -* `BASE64` encoded binary +* `-u file://URL/TO/YOUR/DOC` +* `--size` set extracted size (default to mapper attachment size) +* `BASE64` encoded binary Example: -```sh +[source,sh] +-------------------------- StandaloneRunner BASE64Text StandaloneRunner -u /tmp/mydoc.pdf StandaloneRunner -u /tmp/mydoc.pdf --size 1000000 -``` +-------------------------- It produces something like: -``` +[source,text] +-------------------------- ## Extracted text --------------------- BEGIN ----------------------- This is the extracted text @@ -393,4 +440,4 @@ This is the extracted text - language: null - name: null - title: null -``` +-------------------------- diff --git a/docs/plugins/mapper.asciidoc b/docs/plugins/mapper.asciidoc index c6a3a7b35aa..fcfc877f8f9 100644 --- a/docs/plugins/mapper.asciidoc +++ b/docs/plugins/mapper.asciidoc @@ -8,11 +8,10 @@ Mapper plugins allow new field datatypes to be added to Elasticsearch. The core mapper plugins are: -https://github.com/elasticsearch/elasticsearch-mapper-attachments[Mapper Attachments Type plugin]:: +<>:: -Integrates http://lucene.apache.org/tika/[Apache Tika] to provide a new field -type `attachment` to allow indexing of documents such as PDFs and Microsoft -Word. +The mapper-attachments integrates http://lucene.apache.org/tika/[Apache Tika] to provide a new field +type `attachment` to allow indexing of documents such as PDFs and Microsoft Word. <>:: @@ -25,5 +24,6 @@ indexes the size in bytes of the original The mapper-murmur3 plugin allows hashes to be computed at index-time and stored in the index for later use with the `cardinality` aggregation. +include::mapper-attachments.asciidoc[] include::mapper-size.asciidoc[] include::mapper-murmur3.asciidoc[] diff --git a/docs/plugins/plugin-script.asciidoc b/docs/plugins/plugin-script.asciidoc index 3f7a30556af..58ba1e53d71 100644 --- a/docs/plugins/plugin-script.asciidoc +++ b/docs/plugins/plugin-script.asciidoc @@ -75,13 +75,12 @@ sudo bin/plugin install lmenezes/elasticsearch-kopf/2.x <2> When installing from Maven Central/Sonatype, `[org]` should be replaced by the artifact `groupId`, and `[user|component]` by the `artifactId`. For -instance, to install the -https://github.com/elastic/elasticsearch-mapper-attachments[mapper attachment] +instance, to install the {plugins}/mapper-attachments.html[`mapper-attachments`] plugin from Sonatype, run: [source,shell] ----------------------------------- -sudo bin/plugin install org.elasticsearch/elasticsearch-mapper-attachments/2.6.0 <1> +sudo bin/plugin install org.elasticsearch.plugin/mapper-attachments/3.0.0 <1> ----------------------------------- <1> When installing from `download.elastic.co` or from Maven Central/Sonatype, the version is required. diff --git a/docs/reference/mapping/types.asciidoc b/docs/reference/mapping/types.asciidoc index 52cd41e37e5..60d96577a43 100644 --- a/docs/reference/mapping/types.asciidoc +++ b/docs/reference/mapping/types.asciidoc @@ -37,8 +37,8 @@ document: Attachment datatype:: - See the https://github.com/elastic/elasticsearch-mapper-attachments[mapper attachment plugin] - which supports indexing ``attachments'' like Microsoft Office formats, Open + See the {plugins}/mapper-attachments.html[`mapper-attachments`] plugin + which supports indexing `attachments` like Microsoft Office formats, Open Document formats, ePub, HTML, etc. into an `attachment` datatype. [float] From 28109a18a2dc1f30207d09bdfb2148e8d7906337 Mon Sep 17 00:00:00 2001 From: David Pilato Date: Mon, 23 Nov 2015 13:14:02 +0100 Subject: [PATCH 2/3] Fix example for s3 repository bucket name Closes #13588. --- docs/plugins/repository-azure.asciidoc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/plugins/repository-azure.asciidoc b/docs/plugins/repository-azure.asciidoc index 1e5dabb75fb..9846b5fbf58 100644 --- a/docs/plugins/repository-azure.asciidoc +++ b/docs/plugins/repository-azure.asciidoc @@ -117,7 +117,7 @@ PUT _snapshot/my_backup2 { "type": "azure", "settings": { - "container": "backup_container", + "container": "backup-container", "base_path": "backups", "chunk_size": "32m", "compress": true @@ -150,7 +150,7 @@ Example using Java: ---- client.admin().cluster().preparePutRepository("my_backup_java1") .setType("azure").setSettings(Settings.settingsBuilder() - .put(Storage.CONTAINER, "backup_container") + .put(Storage.CONTAINER, "backup-container") .put(Storage.CHUNK_SIZE, new ByteSizeValue(32, ByteSizeUnit.MB)) ).get(); ---- From 4a6f3c78408c316d9f6f212d2664adda13c54a44 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 20 Nov 2015 11:37:47 +0100 Subject: [PATCH 3/3] Make sure the remaining delay of unassigned shard is updated with every reroute For example: if a node left the cluster and an async store fetch was triggered. In that time no shard is marked as delayed (and strictly speaking it's not yet delayed). This caused test for shard delays post node left to fail. see : http://build-us-00.elastic.co/job/es_core_master_windows-2012-r2/2074/testReport/ To fix this, the delay update is now done by the Allocation Service, based of a fixed time stamp that is determined at the beginning of the reroute. Also, this commit fixes a bug where unassigned info instances were reused across shard routings, causing calculated delays to be leaked. Closes #14890 --- .../cluster/routing/RoutingService.java | 22 +++--- .../cluster/routing/UnassignedInfo.java | 75 ++++++++++++------- .../routing/allocation/AllocationService.java | 42 +++++++---- .../allocation/FailedRerouteAllocation.java | 2 +- .../routing/allocation/RoutingAllocation.java | 16 +++- .../allocation/StartedRerouteAllocation.java | 2 +- .../allocator/ShardsAllocators.java | 6 +- .../command/AllocateAllocationCommand.java | 3 +- .../command/CancelAllocationCommand.java | 6 +- .../gateway/ReplicaShardAllocator.java | 14 ++-- .../ClusterStateCreationUtils.java | 12 +-- .../cluster/ClusterStateDiffIT.java | 17 +---- .../cluster/routing/AllocationIdTests.java | 5 +- .../cluster/routing/DelayedAllocationIT.java | 2 - .../routing/RandomShardRoutingMutator.java | 4 +- .../cluster/routing/RoutingServiceTests.java | 59 +++------------ .../cluster/routing/UnassignedInfoTests.java | 69 +++++++++++------ .../decider/DiskThresholdDeciderTests.java | 26 ++----- .../DiskThresholdDeciderUnitTests.java | 16 +--- .../gateway/PrimaryShardAllocatorTests.java | 28 +++---- .../gateway/ReplicaShardAllocatorTests.java | 18 ++--- .../IndexLifecycleActionIT.java | 14 ++-- .../indices/state/RareClusterStateIT.java | 13 +--- .../test/ESAllocationTestCase.java | 53 +++++++------ 24 files changed, 242 insertions(+), 282 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java index 0f04c750d38..6f43e880e3f 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java @@ -55,7 +55,7 @@ public class RoutingService extends AbstractLifecycleComponent i private final AllocationService allocationService; private AtomicBoolean rerouting = new AtomicBoolean(); - private volatile long minDelaySettingAtLastScheduling = Long.MAX_VALUE; + private volatile long minDelaySettingAtLastSchedulingNanos = Long.MAX_VALUE; private volatile ScheduledFuture registeredNextDelayFuture; @Inject @@ -100,14 +100,14 @@ public class RoutingService extends AbstractLifecycleComponent i // Figure out if an existing scheduled reroute is good enough or whether we need to cancel and reschedule. // If the minimum of the currently relevant delay settings is larger than something we scheduled in the past, // we are guaranteed that the planned schedule will happen before any of the current shard delays are expired. - long minDelaySetting = UnassignedInfo.findSmallestDelayedAllocationSetting(settings, event.state()); + long minDelaySetting = UnassignedInfo.findSmallestDelayedAllocationSettingNanos(settings, event.state()); if (minDelaySetting <= 0) { - logger.trace("no need to schedule reroute - no delayed unassigned shards, minDelaySetting [{}], scheduled [{}]", minDelaySetting, minDelaySettingAtLastScheduling); - minDelaySettingAtLastScheduling = Long.MAX_VALUE; + logger.trace("no need to schedule reroute - no delayed unassigned shards, minDelaySetting [{}], scheduled [{}]", minDelaySetting, minDelaySettingAtLastSchedulingNanos); + minDelaySettingAtLastSchedulingNanos = Long.MAX_VALUE; FutureUtils.cancel(registeredNextDelayFuture); - } else if (minDelaySetting < minDelaySettingAtLastScheduling) { + } else if (minDelaySetting < minDelaySettingAtLastSchedulingNanos) { FutureUtils.cancel(registeredNextDelayFuture); - minDelaySettingAtLastScheduling = minDelaySetting; + minDelaySettingAtLastSchedulingNanos = minDelaySetting; TimeValue nextDelay = TimeValue.timeValueNanos(UnassignedInfo.findNextDelayedAllocationIn(event.state())); assert nextDelay.nanos() > 0 : "next delay must be non 0 as minDelaySetting is [" + minDelaySetting + "]"; logger.info("delaying allocation for [{}] unassigned shards, next check in [{}]", @@ -115,25 +115,25 @@ public class RoutingService extends AbstractLifecycleComponent i registeredNextDelayFuture = threadPool.schedule(nextDelay, ThreadPool.Names.SAME, new AbstractRunnable() { @Override protected void doRun() throws Exception { - minDelaySettingAtLastScheduling = Long.MAX_VALUE; + minDelaySettingAtLastSchedulingNanos = Long.MAX_VALUE; reroute("assign delayed unassigned shards"); } @Override public void onFailure(Throwable t) { logger.warn("failed to schedule/execute reroute post unassigned shard", t); - minDelaySettingAtLastScheduling = Long.MAX_VALUE; + minDelaySettingAtLastSchedulingNanos = Long.MAX_VALUE; } }); } else { - logger.trace("no need to schedule reroute - current schedule reroute is enough. minDelaySetting [{}], scheduled [{}]", minDelaySetting, minDelaySettingAtLastScheduling); + logger.trace("no need to schedule reroute - current schedule reroute is enough. minDelaySetting [{}], scheduled [{}]", minDelaySetting, minDelaySettingAtLastSchedulingNanos); } } } // visible for testing - long getMinDelaySettingAtLastScheduling() { - return this.minDelaySettingAtLastScheduling; + long getMinDelaySettingAtLastSchedulingNanos() { + return this.minDelaySettingAtLastSchedulingNanos; } // visible for testing diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java b/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java index bfe3c316cda..23733b96f0c 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java @@ -34,7 +34,6 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; -import java.util.concurrent.TimeUnit; /** * Holds additional information as to why the shard is in unassigned state. @@ -110,18 +109,27 @@ public class UnassignedInfo implements ToXContent, Writeable { private final String message; private final Throwable failure; + /** + * creates an UnassingedInfo object based **current** time + * + * @param reason the cause for making this shard unassigned. See {@link Reason} for more information. + * @param message more information about cause. + **/ public UnassignedInfo(Reason reason, String message) { - this(reason, System.currentTimeMillis(), System.nanoTime(), message, null); + this(reason, message, null, System.nanoTime(), System.currentTimeMillis()); } - public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Throwable failure) { - this(reason, System.currentTimeMillis(), System.nanoTime(), message, failure); - } - - private UnassignedInfo(Reason reason, long unassignedTimeMillis, long timestampNanos, String message, Throwable failure) { + /** + * @param reason the cause for making this shard unassigned. See {@link Reason} for more information. + * @param message more information about cause. + * @param failure the shard level failure that caused this shard to be unassigned, if exists. + * @param unassignedTimeNanos the time to use as the base for any delayed re-assignment calculation + * @param unassignedTimeMillis the time of unassignment used to display to in our reporting. + */ + public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Throwable failure, long unassignedTimeNanos, long unassignedTimeMillis) { this.reason = reason; this.unassignedTimeMillis = unassignedTimeMillis; - this.unassignedTimeNanos = timestampNanos; + this.unassignedTimeNanos = unassignedTimeNanos; this.message = message; this.failure = failure; assert !(message == null && failure != null) : "provide a message if a failure exception is provided"; @@ -201,14 +209,14 @@ public class UnassignedInfo implements ToXContent, Writeable { } /** - * The allocation delay value in milliseconds associated with the index (defaulting to node settings if not set). + * The allocation delay value in nano seconds associated with the index (defaulting to node settings if not set). */ - public long getAllocationDelayTimeoutSetting(Settings settings, Settings indexSettings) { + public long getAllocationDelayTimeoutSettingNanos(Settings settings, Settings indexSettings) { if (reason != Reason.NODE_LEFT) { return 0; } TimeValue delayTimeout = indexSettings.getAsTime(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, settings.getAsTime(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, DEFAULT_DELAYED_NODE_LEFT_TIMEOUT)); - return Math.max(0l, delayTimeout.millis()); + return Math.max(0l, delayTimeout.nanos()); } /** @@ -221,18 +229,17 @@ public class UnassignedInfo implements ToXContent, Writeable { /** * Updates delay left based on current time (in nanoseconds) and index/node settings. - * Should only be called from ReplicaShardAllocator. + * * @return updated delay in nanoseconds */ public long updateDelay(long nanoTimeNow, Settings settings, Settings indexSettings) { - long delayTimeoutMillis = getAllocationDelayTimeoutSetting(settings, indexSettings); + long delayTimeoutNanos = getAllocationDelayTimeoutSettingNanos(settings, indexSettings); final long newComputedLeftDelayNanos; - if (delayTimeoutMillis == 0l) { + if (delayTimeoutNanos == 0l) { newComputedLeftDelayNanos = 0l; } else { assert nanoTimeNow >= unassignedTimeNanos; - long delayTimeoutNanos = TimeUnit.NANOSECONDS.convert(delayTimeoutMillis, TimeUnit.MILLISECONDS); - newComputedLeftDelayNanos = Math.max(0l, delayTimeoutNanos - (nanoTimeNow - unassignedTimeNanos)); + newComputedLeftDelayNanos = Math.max(0L, delayTimeoutNanos - (nanoTimeNow - unassignedTimeNanos)); } lastComputedLeftDelayNanos = newComputedLeftDelayNanos; return newComputedLeftDelayNanos; @@ -255,21 +262,21 @@ public class UnassignedInfo implements ToXContent, Writeable { } /** - * Finds the smallest delay expiration setting in milliseconds of all unassigned shards that are still delayed. Returns 0 if there are none. + * Finds the smallest delay expiration setting in nanos of all unassigned shards that are still delayed. Returns 0 if there are none. */ - public static long findSmallestDelayedAllocationSetting(Settings settings, ClusterState state) { - long nextDelaySetting = Long.MAX_VALUE; + public static long findSmallestDelayedAllocationSettingNanos(Settings settings, ClusterState state) { + long minDelaySetting = Long.MAX_VALUE; for (ShardRouting shard : state.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED)) { if (shard.primary() == false) { IndexMetaData indexMetaData = state.metaData().index(shard.getIndex()); - long leftDelayNanos = shard.unassignedInfo().getLastComputedLeftDelayNanos(); - long delayTimeoutSetting = shard.unassignedInfo().getAllocationDelayTimeoutSetting(settings, indexMetaData.getSettings()); - if (leftDelayNanos > 0 && delayTimeoutSetting > 0 && delayTimeoutSetting < nextDelaySetting) { - nextDelaySetting = delayTimeoutSetting; + boolean delayed = shard.unassignedInfo().getLastComputedLeftDelayNanos() > 0; + long delayTimeoutSetting = shard.unassignedInfo().getAllocationDelayTimeoutSettingNanos(settings, indexMetaData.getSettings()); + if (delayed && delayTimeoutSetting > 0 && delayTimeoutSetting < minDelaySetting) { + minDelaySetting = delayTimeoutSetting; } } } - return nextDelaySetting == Long.MAX_VALUE ? 0l : nextDelaySetting; + return minDelaySetting == Long.MAX_VALUE ? 0l : minDelaySetting; } @@ -320,14 +327,24 @@ public class UnassignedInfo implements ToXContent, Writeable { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } UnassignedInfo that = (UnassignedInfo) o; - if (unassignedTimeMillis != that.unassignedTimeMillis) return false; - if (reason != that.reason) return false; - if (message != null ? !message.equals(that.message) : that.message != null) return false; + if (unassignedTimeMillis != that.unassignedTimeMillis) { + return false; + } + if (reason != that.reason) { + return false; + } + if (message != null ? !message.equals(that.message) : that.message != null) { + return false; + } return !(failure != null ? !failure.equals(that.failure) : that.failure != null); } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index b8f02f24578..f819d6fde0a 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -22,16 +22,12 @@ package org.elasticsearch.cluster.routing.allocation; import com.carrotsearch.hppc.cursors.ObjectCursor; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.health.ClusterStateHealth; import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.health.ClusterStateHealth; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.cluster.routing.RoutingNode; -import org.elasticsearch.cluster.routing.RoutingNodes; -import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.*; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators; import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; @@ -43,9 +39,7 @@ import org.elasticsearch.common.settings.Settings; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Locale; import java.util.function.Function; -import java.util.function.Supplier; import java.util.stream.Collectors; @@ -119,7 +113,8 @@ public class AllocationService extends AbstractComponent { FailedRerouteAllocation allocation = new FailedRerouteAllocation(allocationDeciders, routingNodes, clusterState.nodes(), failedShards, clusterInfoService.getClusterInfo()); boolean changed = false; for (FailedRerouteAllocation.FailedShard failedShard : failedShards) { - changed |= applyFailedShard(allocation, failedShard.shard, true, new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, failedShard.message, failedShard.failure)); + changed |= applyFailedShard(allocation, failedShard.shard, true, new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, failedShard.message, failedShard.failure, + System.nanoTime(), System.currentTimeMillis())); } if (!changed) { return new RoutingAllocation.Result(false, clusterState.routingTable()); @@ -163,7 +158,7 @@ public class AllocationService extends AbstractComponent { // we don't shuffle the unassigned shards here, to try and get as close as possible to // a consistent result of the effect the commands have on the routing // this allows systems to dry run the commands, see the resulting cluster state, and act on it - RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo()); + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo(), currentNanoTime()); // don't short circuit deciders, we want a full explanation allocation.debugDecision(true); // we ignore disable allocation, because commands are explicit @@ -202,7 +197,7 @@ public class AllocationService extends AbstractComponent { RoutingNodes routingNodes = getMutableRoutingNodes(clusterState); // shuffle the unassigned nodes, just so we won't have things like poison failed shards routingNodes.unassigned().shuffle(); - RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo()); + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo(), currentNanoTime()); allocation.debugDecision(debug); if (!reroute(allocation)) { return new RoutingAllocation.Result(false, clusterState.routingTable()); @@ -239,6 +234,8 @@ public class AllocationService extends AbstractComponent { // now allocate all the unassigned to available nodes if (allocation.routingNodes().unassigned().size() > 0) { + updateLeftDelayOfUnassignedShards(allocation, settings); + changed |= shardsAllocators.allocateUnassigned(allocation); } @@ -251,6 +248,15 @@ public class AllocationService extends AbstractComponent { return changed; } + // public for testing + public static void updateLeftDelayOfUnassignedShards(RoutingAllocation allocation, Settings settings) { + for (ShardRouting shardRouting : allocation.routingNodes().unassigned()) { + final MetaData metaData = allocation.metaData(); + final IndexMetaData indexMetaData = metaData.index(shardRouting.index()); + shardRouting.unassignedInfo().updateDelay(allocation.getCurrentNanoTime(), settings, indexMetaData.getSettings()); + } + } + private boolean moveShards(RoutingAllocation allocation) { boolean changed = false; @@ -312,7 +318,9 @@ public class AllocationService extends AbstractComponent { } } for (ShardRouting shardToFail : shardsToFail) { - changed |= applyFailedShard(allocation, shardToFail, false, new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing")); + changed |= applyFailedShard(allocation, shardToFail, false, + new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing", + null, allocation.getCurrentNanoTime(), System.currentTimeMillis())); } // now, go over and elect a new primary if possible, not, from this code block on, if one is elected, @@ -372,8 +380,9 @@ public class AllocationService extends AbstractComponent { } changed = true; // now, go over all the shards routing on the node, and fail them - UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left[" + node.nodeId() + "]"); for (ShardRouting shardRouting : node.copyShards()) { + UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left[" + node.nodeId() + "]", null, + allocation.getCurrentNanoTime(), System.currentTimeMillis()); applyFailedShard(allocation, shardRouting, false, unassignedInfo); } // its a dead node, remove it, note, its important to remove it *after* we apply failed shard @@ -531,4 +540,9 @@ public class AllocationService extends AbstractComponent { RoutingNodes routingNodes = new RoutingNodes(clusterState, false); // this is a costly operation - only call this once! return routingNodes; } + + /** ovrride this to control time based decisions during allocation */ + protected long currentNanoTime() { + return System.nanoTime(); + } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedRerouteAllocation.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedRerouteAllocation.java index 24e38279f4d..835556a265b 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedRerouteAllocation.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedRerouteAllocation.java @@ -58,7 +58,7 @@ public class FailedRerouteAllocation extends RoutingAllocation { private final List failedShards; public FailedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes, List failedShards, ClusterInfo clusterInfo) { - super(deciders, routingNodes, nodes, clusterInfo); + super(deciders, routingNodes, nodes, clusterInfo, System.nanoTime()); this.failedShards = failedShards; } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java index 140c4ad6692..a8a546d946e 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java @@ -120,19 +120,27 @@ public class RoutingAllocation { private boolean hasPendingAsyncFetch = false; + private final long currentNanoTime; + /** * Creates a new {@link RoutingAllocation} - * - * @param deciders {@link AllocationDeciders} to used to make decisions for routing allocations - * @param routingNodes Routing nodes in the current cluster + * @param deciders {@link AllocationDeciders} to used to make decisions for routing allocations + * @param routingNodes Routing nodes in the current cluster * @param nodes TODO: Documentation + * @param currentNanoTime the nano time to use for all delay allocation calculation (typically {@link System#nanoTime()}) */ - public RoutingAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes, ClusterInfo clusterInfo) { + public RoutingAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes, ClusterInfo clusterInfo, long currentNanoTime) { this.deciders = deciders; this.routingNodes = routingNodes; this.nodes = nodes; this.clusterInfo = clusterInfo; + this.currentNanoTime = currentNanoTime; + } + + /** returns the nano time captured at the beginning of the allocation. used to make sure all time based decisions are aligned */ + public long getCurrentNanoTime() { + return currentNanoTime; } /** diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/StartedRerouteAllocation.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/StartedRerouteAllocation.java index da69419d948..00f3944ae03 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/StartedRerouteAllocation.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/StartedRerouteAllocation.java @@ -36,7 +36,7 @@ public class StartedRerouteAllocation extends RoutingAllocation { private final List startedShards; public StartedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes, List startedShards, ClusterInfo clusterInfo) { - super(deciders, routingNodes, nodes, clusterInfo); + super(deciders, routingNodes, nodes, clusterInfo, System.nanoTime()); this.startedShards = startedShards; } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java index 003988f7bd5..a9ce43c5f76 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java @@ -19,8 +19,8 @@ package org.elasticsearch.cluster.routing.allocation.allocator; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; @@ -74,6 +74,10 @@ public class ShardsAllocators extends AbstractComponent implements ShardsAllocat return changed; } + protected long nanoTime() { + return System.nanoTime(); + } + @Override public boolean rebalance(RoutingAllocation allocation) { if (allocation.hasPendingAsyncFetch() == false) { diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateAllocationCommand.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateAllocationCommand.java index b210557b687..5646d308dda 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateAllocationCommand.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateAllocationCommand.java @@ -229,7 +229,8 @@ public class AllocateAllocationCommand implements AllocationCommand { // it was index creation if (unassigned.primary() && unassigned.unassignedInfo().getReason() != UnassignedInfo.Reason.INDEX_CREATED) { unassigned.updateUnassignedInfo(new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, - "force allocation from previous reason " + unassigned.unassignedInfo().getReason() + ", " + unassigned.unassignedInfo().getMessage(), unassigned.unassignedInfo().getFailure())); + "force allocation from previous reason " + unassigned.unassignedInfo().getReason() + ", " + unassigned.unassignedInfo().getMessage(), + unassigned.unassignedInfo().getFailure(), System.nanoTime(), System.currentTimeMillis())); } it.initialize(routingNode.nodeId(), unassigned.version(), allocation.clusterInfo().getShardSize(unassigned, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); break; diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java index d9ad8c4f871..7554fa4c46f 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java @@ -21,7 +21,10 @@ package org.elasticsearch.cluster.routing.allocation.command; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.RerouteExplanation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.Decision; @@ -34,7 +37,6 @@ import org.elasticsearch.index.shard.ShardId; import java.io.IOException; -import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; /** diff --git a/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java b/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java index a830aa6198e..c87f4d94755 100644 --- a/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java +++ b/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java @@ -24,7 +24,6 @@ import com.carrotsearch.hppc.ObjectLongMap; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectLongCursor; import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; @@ -101,7 +100,8 @@ public abstract class ReplicaShardAllocator extends AbstractComponent { // we found a better match that has a full sync id match, the existing allocation is not fully synced // so we found a better one, cancel this one it.moveToUnassigned(new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA, - "existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node [" + nodeWithHighestMatch + "]")); + "existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node [" + nodeWithHighestMatch + "]", + null, allocation.getCurrentNanoTime(), System.currentTimeMillis())); changed = true; } } @@ -111,7 +111,6 @@ public abstract class ReplicaShardAllocator extends AbstractComponent { } public boolean allocateUnassigned(RoutingAllocation allocation) { - long nanoTimeNow = System.nanoTime(); boolean changed = false; final RoutingNodes routingNodes = allocation.routingNodes(); final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator(); @@ -171,7 +170,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent { } } else if (matchingNodes.hasAnyData() == false) { // if we didn't manage to find *any* data (regardless of matching sizes), check if the allocation of the replica shard needs to be delayed - changed |= ignoreUnassignedIfDelayed(nanoTimeNow, allocation, unassignedIterator, shard); + changed |= ignoreUnassignedIfDelayed(unassignedIterator, shard); } } return changed; @@ -185,16 +184,13 @@ public abstract class ReplicaShardAllocator extends AbstractComponent { * * PUBLIC FOR TESTS! * - * @param timeNowNanos Timestamp in nanoseconds representing "now" - * @param allocation the routing allocation * @param unassignedIterator iterator over unassigned shards * @param shard the shard which might be delayed * @return true iff allocation is delayed for this shard */ - public boolean ignoreUnassignedIfDelayed(long timeNowNanos, RoutingAllocation allocation, RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator, ShardRouting shard) { - IndexMetaData indexMetaData = allocation.metaData().index(shard.getIndex()); + public boolean ignoreUnassignedIfDelayed(RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator, ShardRouting shard) { // calculate delay and store it in UnassignedInfo to be used by RoutingService - long delay = shard.unassignedInfo().updateDelay(timeNowNanos, settings, indexMetaData.getSettings()); + long delay = shard.unassignedInfo().getLastComputedLeftDelayNanos(); if (delay > 0) { logger.debug("[{}][{}]: delaying allocation of [{}] for [{}]", shard.index(), shard.id(), shard, TimeValue.timeValueNanos(delay)); /** diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java b/core/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java index 913d52d5b17..406e476b4e0 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java @@ -27,12 +27,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.cluster.routing.IndexShardRoutingTable; -import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.cluster.routing.TestShardRouting; -import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.*; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.DummyTransportAddress; import org.elasticsearch.index.shard.ShardId; @@ -40,10 +35,7 @@ import org.elasticsearch.index.shard.ShardId; import java.util.HashSet; import java.util.Set; -import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE; -import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; -import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; -import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED; +import static org.elasticsearch.cluster.metadata.IndexMetaData.*; import static org.elasticsearch.test.ESTestCase.randomFrom; import static org.elasticsearch.test.ESTestCase.randomIntBetween; diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java b/core/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java index c2e646dde19..8d4540aad3b 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java @@ -20,25 +20,13 @@ package org.elasticsearch.cluster; import com.carrotsearch.hppc.cursors.ObjectCursor; - import org.elasticsearch.Version; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; -import org.elasticsearch.cluster.metadata.AliasMetaData; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.metadata.RepositoriesMetaData; -import org.elasticsearch.cluster.metadata.SnapshotId; +import org.elasticsearch.cluster.metadata.*; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.cluster.routing.IndexShardRoutingTable; -import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.cluster.routing.TestShardRouting; -import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.*; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.collect.ImmutableOpenMap; @@ -50,7 +38,6 @@ import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.warmer.IndexWarmersMetaData; import org.elasticsearch.test.ESIntegTestCase; diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java index 8d6953ed929..88ad66c4a91 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java @@ -21,10 +21,7 @@ package org.elasticsearch.cluster.routing; import org.elasticsearch.test.ESTestCase; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.*; /** */ diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationIT.java b/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationIT.java index f34f46a7ad4..c236ea54878 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationIT.java @@ -19,7 +19,6 @@ package org.elasticsearch.cluster.routing; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -37,7 +36,6 @@ import static org.hamcrest.Matchers.equalTo; /** */ @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) -@LuceneTestCase.AwaitsFix(bugUrl = "http://build-us-00.elastic.co/job/es_core_master_windows-2012-r2/2074/testReport/ (boaz on it)") public class DelayedAllocationIT extends ESIntegTestCase { /** diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/RandomShardRoutingMutator.java b/core/src/test/java/org/elasticsearch/cluster/routing/RandomShardRoutingMutator.java index b451183826b..47ae3e68580 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/RandomShardRoutingMutator.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/RandomShardRoutingMutator.java @@ -19,9 +19,7 @@ package org.elasticsearch.cluster.routing; -import static org.elasticsearch.test.ESTestCase.randomAsciiOfLength; -import static org.elasticsearch.test.ESTestCase.randomFrom; -import static org.elasticsearch.test.ESTestCase.randomInt; +import static org.elasticsearch.test.ESTestCase.*; /** * Utility class the makes random modifications to ShardRouting diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java index b0e597614d8..1711b0c33a8 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java @@ -88,15 +88,14 @@ public class RoutingServiceTests extends ESAllocationTestCase { clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build(); ClusterState newState = clusterState; - assertThat(routingService.getMinDelaySettingAtLastScheduling(), equalTo(Long.MAX_VALUE)); + assertThat(routingService.getMinDelaySettingAtLastSchedulingNanos(), equalTo(Long.MAX_VALUE)); routingService.clusterChanged(new ClusterChangedEvent("test", newState, prevState)); - assertThat(routingService.getMinDelaySettingAtLastScheduling(), equalTo(Long.MAX_VALUE)); + assertThat(routingService.getMinDelaySettingAtLastSchedulingNanos(), equalTo(Long.MAX_VALUE)); assertThat(routingService.hasReroutedAndClear(), equalTo(false)); } public void testDelayedUnassignedScheduleReroute() throws Exception { - DelayedShardsMockGatewayAllocator mockGatewayAllocator = new DelayedShardsMockGatewayAllocator(); - AllocationService allocation = createAllocationService(Settings.EMPTY, mockGatewayAllocator); + MockAllocationService allocation = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator()); MetaData metaData = MetaData.builder() .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "100ms")) .numberOfShards(1).numberOfReplicas(1)) @@ -126,7 +125,6 @@ public class RoutingServiceTests extends ESAllocationTestCase { ClusterState prevState = clusterState; clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove(nodeId)).build(); // make sure the replica is marked as delayed (i.e. not reallocated) - mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + TimeValue.timeValueMillis(randomIntBetween(0, 99)).nanos()); clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build(); assertEquals(1, clusterState.getRoutingNodes().unassigned().size()); @@ -134,7 +132,7 @@ public class RoutingServiceTests extends ESAllocationTestCase { routingService.clusterChanged(new ClusterChangedEvent("test", newState, prevState)); assertBusy(() -> assertTrue("routing service should have run a reroute", routingService.hasReroutedAndClear())); // verify the registration has been reset - assertThat(routingService.getMinDelaySettingAtLastScheduling(), equalTo(Long.MAX_VALUE)); + assertThat(routingService.getMinDelaySettingAtLastSchedulingNanos(), equalTo(Long.MAX_VALUE)); } /** @@ -144,8 +142,7 @@ public class RoutingServiceTests extends ESAllocationTestCase { final ThreadPool testThreadPool = new ThreadPool(getTestName()); try { - DelayedShardsMockGatewayAllocator mockGatewayAllocator = new DelayedShardsMockGatewayAllocator(); - AllocationService allocation = createAllocationService(Settings.EMPTY, mockGatewayAllocator); + MockAllocationService allocation = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator()); MetaData metaData = MetaData.builder() .put(IndexMetaData.builder("short_delay").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "100ms")) .numberOfShards(1).numberOfReplicas(1)) @@ -185,11 +182,13 @@ public class RoutingServiceTests extends ESAllocationTestCase { } assertNotNull(longDelayReplica); + final long baseTime = System.nanoTime(); + // remove node of shortDelayReplica and node of longDelayReplica and reroute ClusterState prevState = clusterState; clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove(shortDelayReplica.currentNodeId()).remove(longDelayReplica.currentNodeId())).build(); // make sure both replicas are marked as delayed (i.e. not reallocated) - mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + 1); + allocation.setNanoTimeOverride(baseTime); clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build(); // check that shortDelayReplica and longDelayReplica have been marked unassigned @@ -216,7 +215,7 @@ public class RoutingServiceTests extends ESAllocationTestCase { RoutingService routingService = new RoutingService(Settings.EMPTY, testThreadPool, clusterService, allocation); routingService.start(); // just so performReroute does not prematurely return // next (delayed) reroute should only delay longDelayReplica/longDelayUnassignedReplica, simulate that we are now 1 second after shards became unassigned - mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + TimeValue.timeValueSeconds(1).nanos()); + allocation.setNanoTimeOverride(baseTime + TimeValue.timeValueSeconds(1).nanos()); // register listener on cluster state so we know when cluster state has been changed CountDownLatch latch = new CountDownLatch(1); clusterService.addLast(event -> latch.countDown()); @@ -225,50 +224,12 @@ public class RoutingServiceTests extends ESAllocationTestCase { // cluster service should have updated state and called routingService with clusterChanged latch.await(); // verify the registration has been set to the delay of longDelayReplica/longDelayUnassignedReplica - assertThat(routingService.getMinDelaySettingAtLastScheduling(), equalTo(TimeValue.timeValueSeconds(10).millis())); + assertThat(routingService.getMinDelaySettingAtLastSchedulingNanos(), equalTo(TimeValue.timeValueSeconds(10).nanos())); } finally { terminate(testThreadPool); } } - public void testDelayedUnassignedDoesNotRerouteForNegativeDelays() throws Exception { - DelayedShardsMockGatewayAllocator mockGatewayAllocator = new DelayedShardsMockGatewayAllocator(); - AllocationService allocation = createAllocationService(Settings.EMPTY, mockGatewayAllocator); - MetaData metaData = MetaData.builder() - .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "100ms")) - .numberOfShards(1).numberOfReplicas(1)) - .build(); - ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) - .metaData(metaData) - .routingTable(RoutingTable.builder().addAsNew(metaData.index("test")).build()).build(); - clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).localNodeId("node1").masterNodeId("node1")).build(); - clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build(); - // starting primaries - clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build(); - // starting replicas - clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build(); - assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(false)); - // remove node2 and reroute - ClusterState prevState = clusterState; - clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build(); - clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build(); - // Set it in the future so the delay will be negative - mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + TimeValue.timeValueMinutes(1).nanos()); - - ClusterState newState = clusterState; - - routingService.clusterChanged(new ClusterChangedEvent("test", newState, prevState)); - assertBusy(new Runnable() { - @Override - public void run() { - assertThat(routingService.hasReroutedAndClear(), equalTo(false)); - - // verify the registration has been updated - assertThat(routingService.getMinDelaySettingAtLastScheduling(), equalTo(100L)); - } - }); - } - private class TestRoutingService extends RoutingService { private AtomicBoolean rerouted = new AtomicBoolean(); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java index 4847a86e0d6..3288b92cb8e 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java @@ -21,7 +21,6 @@ package org.elasticsearch.cluster.routing; import com.carrotsearch.hppc.IntHashSet; import com.carrotsearch.randomizedtesting.generators.RandomPicks; - import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -40,15 +39,8 @@ import org.elasticsearch.test.ESAllocationTestCase; import java.util.Collections; import java.util.EnumSet; -import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; -import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; -import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.lessThan; -import static org.hamcrest.Matchers.lessThanOrEqualTo; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; +import static org.elasticsearch.cluster.routing.ShardRoutingState.*; +import static org.hamcrest.Matchers.*; /** */ @@ -282,9 +274,32 @@ public class UnassignedInfoTests extends ESAllocationTestCase { assertThat(delay, equalTo(0l)); } + /** + * Verifies that delayed allocation calculation are correct. + */ + public void testLeftDelayCalculation() throws Exception { + final long baseTime = System.nanoTime(); + final UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "test", null, baseTime, System.currentTimeMillis()); + final long totalDelayNanos = TimeValue.timeValueMillis(10).nanos(); + final Settings settings = Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, TimeValue.timeValueNanos(totalDelayNanos)).build(); + long delay = unassignedInfo.updateDelay(baseTime, settings, Settings.EMPTY); + assertThat(delay, equalTo(totalDelayNanos)); + assertThat(delay, equalTo(unassignedInfo.getLastComputedLeftDelayNanos())); + long delta1 = randomIntBetween(1, (int) (totalDelayNanos - 1)); + delay = unassignedInfo.updateDelay(baseTime + delta1, settings, Settings.EMPTY); + assertThat(delay, equalTo(totalDelayNanos - delta1)); + assertThat(delay, equalTo(unassignedInfo.getLastComputedLeftDelayNanos())); + delay = unassignedInfo.updateDelay(baseTime + totalDelayNanos, settings, Settings.EMPTY); + assertThat(delay, equalTo(0L)); + assertThat(delay, equalTo(unassignedInfo.getLastComputedLeftDelayNanos())); + delay = unassignedInfo.updateDelay(baseTime + totalDelayNanos + randomIntBetween(1, 20), settings, Settings.EMPTY); + assertThat(delay, equalTo(0L)); + assertThat(delay, equalTo(unassignedInfo.getLastComputedLeftDelayNanos())); + } + + public void testNumberOfDelayedUnassigned() throws Exception { - DelayedShardsMockGatewayAllocator mockGatewayAllocator = new DelayedShardsMockGatewayAllocator(); - AllocationService allocation = createAllocationService(Settings.EMPTY, mockGatewayAllocator); + MockAllocationService allocation = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator()); MetaData metaData = MetaData.builder() .put(IndexMetaData.builder("test1").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) .put(IndexMetaData.builder("test2").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) @@ -303,17 +318,21 @@ public class UnassignedInfoTests extends ESAllocationTestCase { // remove node2 and reroute clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build(); // make sure both replicas are marked as delayed (i.e. not reallocated) - mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + 1); clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build(); assertThat(clusterState.prettyPrint(), UnassignedInfo.getNumberOfDelayedUnassigned(clusterState), equalTo(2)); } public void testFindNextDelayedAllocation() { - DelayedShardsMockGatewayAllocator mockGatewayAllocator = new DelayedShardsMockGatewayAllocator(); - AllocationService allocation = createAllocationService(Settings.EMPTY, mockGatewayAllocator); + MockAllocationService allocation = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator()); + final long baseTime = System.nanoTime(); + allocation.setNanoTimeOverride(baseTime); + final TimeValue delayTest1 = TimeValue.timeValueMillis(randomIntBetween(1, 200)); + final TimeValue delayTest2 = TimeValue.timeValueMillis(randomIntBetween(1, 200)); + final long expectMinDelaySettingsNanos = Math.min(delayTest1.nanos(), delayTest2.nanos()); + MetaData metaData = MetaData.builder() - .put(IndexMetaData.builder("test1").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h")).numberOfShards(1).numberOfReplicas(1)) - .put(IndexMetaData.builder("test2").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h")).numberOfShards(1).numberOfReplicas(1)) + .put(IndexMetaData.builder("test1").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, delayTest1)).numberOfShards(1).numberOfReplicas(1)) + .put(IndexMetaData.builder("test2").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, delayTest2)).numberOfShards(1).numberOfReplicas(1)) .build(); ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) .metaData(metaData) @@ -328,15 +347,19 @@ public class UnassignedInfoTests extends ESAllocationTestCase { assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(false)); // remove node2 and reroute clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build(); - // make sure both replicas are marked as delayed (i.e. not reallocated) - mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + 1); clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build(); - long nextDelaySetting = UnassignedInfo.findSmallestDelayedAllocationSetting(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), clusterState); - assertThat(nextDelaySetting, equalTo(TimeValue.timeValueHours(10).millis())); + final long delta = randomBoolean() ? 0 : randomInt((int) expectMinDelaySettingsNanos); + + if (delta > 0) { + allocation.setNanoTimeOverride(baseTime + delta); + clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "time moved")).build(); + } + + long minDelaySetting = UnassignedInfo.findSmallestDelayedAllocationSettingNanos(Settings.EMPTY, clusterState); + assertThat(minDelaySetting, equalTo(expectMinDelaySettingsNanos)); long nextDelay = UnassignedInfo.findNextDelayedAllocationIn(clusterState); - assertThat(nextDelay, greaterThan(TimeValue.timeValueHours(9).nanos())); - assertThat(nextDelay, lessThanOrEqualTo(TimeValue.timeValueHours(10).nanos())); + assertThat(nextDelay, equalTo(expectMinDelaySettingsNanos - delta)); } } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index 6d141a758e8..a739f30856a 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -29,14 +29,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.cluster.routing.IndexShardRoutingTable; -import org.elasticsearch.cluster.routing.RoutingNode; -import org.elasticsearch.cluster.routing.RoutingNodes; -import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.cluster.routing.*; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators; @@ -55,14 +48,9 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; -import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; -import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; -import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED; +import static org.elasticsearch.cluster.routing.ShardRoutingState.*; import static org.elasticsearch.common.settings.Settings.settingsBuilder; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.*; public class DiskThresholdDeciderTests extends ESAllocationTestCase { @@ -859,7 +847,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { ) ); ClusterState clusterState = ClusterState.builder(baseClusterState).routingTable(builder.build()).build(); - RoutingAllocation routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo); + RoutingAllocation routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo, System.nanoTime()); Decision decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation); assertThat(decision.type(), equalTo(Decision.Type.NO)); @@ -879,7 +867,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { ) ); clusterState = ClusterState.builder(baseClusterState).routingTable(builder.build()).build(); - routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo); + routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo, System.nanoTime()); decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation); assertThat(decision.type(), equalTo(Decision.Type.YES)); @@ -978,7 +966,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { ) ); ClusterState clusterState = ClusterState.builder(baseClusterState).routingTable(builder.build()).build(); - RoutingAllocation routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo); + RoutingAllocation routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo, System.nanoTime()); Decision decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation); // Two shards should start happily @@ -1035,7 +1023,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { ); clusterState = ClusterState.builder(updateClusterState).routingTable(builder.build()).build(); - routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo); + routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo, System.nanoTime()); decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation); assertThat(decision.type(), equalTo(Decision.Type.YES)); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java index fa31d8306e5..a386883ad1b 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java @@ -20,21 +20,13 @@ package org.elasticsearch.cluster.routing.allocation.decider; import org.elasticsearch.Version; -import org.elasticsearch.cluster.ClusterInfo; -import org.elasticsearch.cluster.ClusterInfoService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.DiskUsage; -import org.elasticsearch.cluster.EmptyClusterInfoService; +import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.MockInternalClusterInfoService.DevNullClusterInfo; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.RoutingNode; -import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.ShardRoutingHelper; -import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.*; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; @@ -129,7 +121,7 @@ public class DiskThresholdDeciderUnitTests extends ESTestCase { ImmutableOpenMap.Builder shardSizes = ImmutableOpenMap.builder(); shardSizes.put("[test][0][p]", 10L); // 10 bytes final ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages.build(), mostAvailableUsage.build(), shardSizes.build(), ImmutableOpenMap.of()); - RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, new AllocationDecider[]{decider}), clusterState.getRoutingNodes(), clusterState.nodes(), clusterInfo); + RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, new AllocationDecider[]{decider}), clusterState.getRoutingNodes(), clusterState.nodes(), clusterInfo, System.nanoTime()); assertEquals(mostAvailableUsage.toString(), Decision.YES, decider.canAllocate(test_0, new RoutingNode("node_0", node_0), allocation)); assertEquals(mostAvailableUsage.toString(), Decision.NO, decider.canAllocate(test_0, new RoutingNode("node_1", node_1), allocation)); } @@ -194,7 +186,7 @@ public class DiskThresholdDeciderUnitTests extends ESTestCase { shardSizes.put("[test][2][p]", 10L); final ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages.build(), mostAvailableUsage.build(), shardSizes.build(), shardRoutingMap.build()); - RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, new AllocationDecider[]{decider}), clusterState.getRoutingNodes(), clusterState.nodes(), clusterInfo); + RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, new AllocationDecider[]{decider}), clusterState.getRoutingNodes(), clusterState.nodes(), clusterInfo, System.nanoTime()); assertEquals(Decision.YES, decider.canRemain(test_0, new RoutingNode("node_0", node_0), allocation)); assertEquals(Decision.NO, decider.canRemain(test_1, new RoutingNode("node_1", node_1), allocation)); try { diff --git a/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java b/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java index 25982150340..73cbb51faed 100644 --- a/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java +++ b/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java @@ -27,13 +27,7 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.SnapshotId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.RestoreSource; -import org.elasticsearch.cluster.routing.RoutingNodes; -import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.cluster.routing.TestShardRouting; -import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.*; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.common.Nullable; @@ -48,9 +42,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import static org.hamcrest.Matchers.anyOf; -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.*; /** */ @@ -184,7 +176,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { .metaData(metaData) .routingTable(routingTable) .nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build(); - RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), state.getRoutingNodes(), state.nodes(), null); + RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), state.getRoutingNodes(), state.nodes(), null, System.nanoTime()); testAllocator.addData(node1, -1).addData(node2, -1); boolean changed = testAllocator.allocateUnassigned(allocation); @@ -208,7 +200,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { .routingTable(routingTable) .nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build(); - RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null); + RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime()); boolean changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(false)); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); @@ -216,7 +208,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas testAllocator.addData(node1, 1); - allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null); + allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime()); changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(false)); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); @@ -224,7 +216,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas testAllocator.addData(node2, 1); - allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null); + allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime()); changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(0)); @@ -249,7 +241,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { .routingTable(routingTable) .nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build(); - RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null); + RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime()); boolean changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(false)); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); @@ -257,7 +249,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas testAllocator.addData(node1, 1); - allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null); + allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime()); changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(false)); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); @@ -265,7 +257,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas testAllocator.addData(node2, 2); - allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null); + allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime()); changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(0)); @@ -336,7 +328,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { .metaData(metaData) .routingTable(routingTable) .nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build(); - return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), null); + return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), null, System.nanoTime()); } class TestAllocator extends PrimaryShardAllocator { diff --git a/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java b/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java index 6a0aabe8d53..9a053b36527 100644 --- a/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java +++ b/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java @@ -20,7 +20,6 @@ package org.elasticsearch.gateway; import com.carrotsearch.randomizedtesting.generators.RandomPicks; - import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterState; @@ -28,15 +27,8 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.cluster.routing.IndexShardRoutingTable; -import org.elasticsearch.cluster.routing.RoutingNode; -import org.elasticsearch.cluster.routing.RoutingNodes; -import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.cluster.routing.TestShardRouting; -import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; @@ -232,6 +224,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase { // we sometime return empty list of files, make sure we test this as well testAllocator.addData(node2, false, null); } + AllocationService.updateLeftDelayOfUnassignedShards(allocation, Settings.EMPTY); boolean changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); @@ -240,6 +233,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase { allocation = onePrimaryOnNode1And1Replica(yesAllocationDeciders(), Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, TimeValue.timeValueHours(1)).build(), UnassignedInfo.Reason.NODE_LEFT); testAllocator.addData(node2, false, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM")); + AllocationService.updateLeftDelayOfUnassignedShards(allocation, Settings.EMPTY); changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(true)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); @@ -296,7 +290,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase { .metaData(metaData) .routingTable(routingTable) .nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build(); - return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), ClusterInfo.EMPTY); + return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), ClusterInfo.EMPTY, System.nanoTime()); } private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDeciders deciders) { @@ -315,7 +309,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase { .metaData(metaData) .routingTable(routingTable) .nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build(); - return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), ClusterInfo.EMPTY); + return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), ClusterInfo.EMPTY, System.nanoTime()); } class TestAllocator extends ReplicaShardAllocator { diff --git a/core/src/test/java/org/elasticsearch/indexlifecycle/IndexLifecycleActionIT.java b/core/src/test/java/org/elasticsearch/indexlifecycle/IndexLifecycleActionIT.java index 933d8506531..818937c511e 100644 --- a/core/src/test/java/org/elasticsearch/indexlifecycle/IndexLifecycleActionIT.java +++ b/core/src/test/java/org/elasticsearch/indexlifecycle/IndexLifecycleActionIT.java @@ -20,12 +20,13 @@ package org.elasticsearch.indexlifecycle; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; -import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.Discovery; @@ -42,15 +43,9 @@ import static org.elasticsearch.client.Requests.clusterHealthRequest; import static org.elasticsearch.client.Requests.createIndexRequest; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; -import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; -import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; -import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; +import static org.elasticsearch.cluster.routing.ShardRoutingState.*; import static org.elasticsearch.common.settings.Settings.settingsBuilder; -import static org.hamcrest.Matchers.anyOf; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.*; /** @@ -62,6 +57,7 @@ public class IndexLifecycleActionIT extends ESIntegTestCase { Settings settings = settingsBuilder() .put(SETTING_NUMBER_OF_SHARDS, 11) .put(SETTING_NUMBER_OF_REPLICAS, 1) + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "0s") .build(); // start one server diff --git a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java index c55d80b7c51..ab3f825cecc 100644 --- a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java +++ b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java @@ -56,19 +56,12 @@ import org.elasticsearch.test.disruption.BlockClusterStateProcessing; import org.elasticsearch.test.junit.annotations.TestLogging; import java.io.IOException; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasItem; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.*; /** */ @@ -99,7 +92,7 @@ public class RareClusterStateIT extends ESIntegTestCase { .nodes(DiscoveryNodes.EMPTY_NODES) .build(), false ); - RoutingAllocation routingAllocation = new RoutingAllocation(allocationDeciders, routingNodes, current.nodes(), ClusterInfo.EMPTY); + RoutingAllocation routingAllocation = new RoutingAllocation(allocationDeciders, routingNodes, current.nodes(), ClusterInfo.EMPTY, System.nanoTime()); allocator.allocateUnassigned(routingAllocation); } diff --git a/test-framework/src/main/java/org/elasticsearch/test/ESAllocationTestCase.java b/test-framework/src/main/java/org/elasticsearch/test/ESAllocationTestCase.java index 65540a3c536..c4f4b196739 100644 --- a/test-framework/src/main/java/org/elasticsearch/test/ESAllocationTestCase.java +++ b/test-framework/src/main/java/org/elasticsearch/test/ESAllocationTestCase.java @@ -47,12 +47,7 @@ import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.test.gateway.NoopGatewayAllocator; import java.lang.reflect.Constructor; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.function.Function; +import java.util.*; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.elasticsearch.common.util.CollectionUtils.arrayAsArrayList; @@ -63,32 +58,32 @@ import static org.hamcrest.CoreMatchers.is; */ public abstract class ESAllocationTestCase extends ESTestCase { - public static AllocationService createAllocationService() { + public static MockAllocationService createAllocationService() { return createAllocationService(Settings.Builder.EMPTY_SETTINGS); } - public static AllocationService createAllocationService(Settings settings) { + public static MockAllocationService createAllocationService(Settings settings) { return createAllocationService(settings, getRandom()); } - public static AllocationService createAllocationService(Settings settings, Random random) { + public static MockAllocationService createAllocationService(Settings settings, Random random) { return createAllocationService(settings, new NodeSettingsService(Settings.Builder.EMPTY_SETTINGS), random); } - public static AllocationService createAllocationService(Settings settings, NodeSettingsService nodeSettingsService, Random random) { - return new AllocationService(settings, + public static MockAllocationService createAllocationService(Settings settings, NodeSettingsService nodeSettingsService, Random random) { + return new MockAllocationService(settings, randomAllocationDeciders(settings, nodeSettingsService, random), new ShardsAllocators(settings, NoopGatewayAllocator.INSTANCE), EmptyClusterInfoService.INSTANCE); } - public static AllocationService createAllocationService(Settings settings, ClusterInfoService clusterInfoService) { - return new AllocationService(settings, + public static MockAllocationService createAllocationService(Settings settings, ClusterInfoService clusterInfoService) { + return new MockAllocationService(settings, randomAllocationDeciders(settings, new NodeSettingsService(Settings.Builder.EMPTY_SETTINGS), getRandom()), new ShardsAllocators(settings, NoopGatewayAllocator.INSTANCE), clusterInfoService); } - public static AllocationService createAllocationService(Settings settings, GatewayAllocator allocator) { - return new AllocationService(settings, + public static MockAllocationService createAllocationService(Settings settings, GatewayAllocator allocator) { + return new MockAllocationService(settings, randomAllocationDeciders(settings, new NodeSettingsService(Settings.Builder.EMPTY_SETTINGS), getRandom()), new ShardsAllocators(settings, allocator), EmptyClusterInfoService.INSTANCE); } @@ -187,9 +182,27 @@ public abstract class ESAllocationTestCase extends ESTestCase { } } + /** A lock {@link AllocationService} allowing tests to override time */ + protected static class MockAllocationService extends AllocationService { + + private Long nanoTimeOverride = null; + + public MockAllocationService(Settings settings, AllocationDeciders allocationDeciders, ShardsAllocators shardsAllocators, ClusterInfoService clusterInfoService) { + super(settings, allocationDeciders, shardsAllocators, clusterInfoService); + } + + public void setNanoTimeOverride(long nanoTime) { + this.nanoTimeOverride = nanoTime; + } + + @Override + protected long currentNanoTime() { + return nanoTimeOverride == null ? super.currentNanoTime() : nanoTimeOverride; + } + } + /** * Mocks behavior in ReplicaShardAllocator to remove delayed shards from list of unassigned shards so they don't get reassigned yet. - * Also computes delay in UnassignedInfo based on customizable time source. */ protected static class DelayedShardsMockGatewayAllocator extends GatewayAllocator { private final ReplicaShardAllocator replicaShardAllocator = new ReplicaShardAllocator(Settings.EMPTY) { @@ -199,16 +212,11 @@ public abstract class ESAllocationTestCase extends ESTestCase { } }; - private volatile Function timeSource; public DelayedShardsMockGatewayAllocator() { super(Settings.EMPTY, null, null); } - public void setTimeSource(Function timeSource) { - this.timeSource = timeSource; - } - @Override public void applyStartedShards(StartedRerouteAllocation allocation) {} @@ -224,8 +232,7 @@ public abstract class ESAllocationTestCase extends ESTestCase { if (shard.primary() || shard.allocatedPostIndexCreate() == false) { continue; } - changed |= replicaShardAllocator.ignoreUnassignedIfDelayed(timeSource == null ? System.nanoTime() : timeSource.apply(shard), - allocation, unassignedIterator, shard); + changed |= replicaShardAllocator.ignoreUnassignedIfDelayed(unassignedIterator, shard); } return changed; }