Merge branch 'master' into close-index-api-refactoring
This commit is contained in:
commit
d70ebfd1d6
|
@ -8,7 +8,7 @@ and to configure your jobs to analyze aggregated data.
|
|||
|
||||
One of the benefits of aggregating data this way is that {es} automatically
|
||||
distributes these calculations across your cluster. You can then feed this
|
||||
aggregated data into {xpackml} instead of raw results, which
|
||||
aggregated data into the {ml-features} instead of raw results, which
|
||||
reduces the volume of data that must be considered while detecting anomalies.
|
||||
|
||||
There are some limitations to using aggregations in {dfeeds}, however.
|
||||
|
|
|
@ -269,7 +269,7 @@ probability of this occurrence.
|
|||
|
||||
There can be many anomaly records depending on the characteristics and size of
|
||||
the input data. In practice, there are often too many to be able to manually
|
||||
process them. The {xpackml} features therefore perform a sophisticated
|
||||
process them. The {ml-features} therefore perform a sophisticated
|
||||
aggregation of the anomaly records into buckets.
|
||||
|
||||
The number of record results depends on the number of anomalies found in each
|
||||
|
|
|
@ -2,12 +2,12 @@
|
|||
[[ml-configuring]]
|
||||
== Configuring machine learning
|
||||
|
||||
If you want to use {xpackml} features, there must be at least one {ml} node in
|
||||
If you want to use {ml-features}, there must be at least one {ml} node in
|
||||
your cluster and all master-eligible nodes must have {ml} enabled. By default,
|
||||
all nodes are {ml} nodes. For more information about these settings, see
|
||||
{ref}/modules-node.html#modules-node-xpack[{ml} nodes].
|
||||
|
||||
To use the {xpackml} features to analyze your data, you must create a job and
|
||||
To use the {ml-features} to analyze your data, you must create a job and
|
||||
send your data to that job.
|
||||
|
||||
* If your data is stored in {es}:
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
[[ml-functions]]
|
||||
== Function reference
|
||||
|
||||
The {xpackml} features include analysis functions that provide a wide variety of
|
||||
The {ml-features} include analysis functions that provide a wide variety of
|
||||
flexible ways to analyze data for anomalies.
|
||||
|
||||
When you create jobs, you specify one or more detectors, which define the type of
|
||||
|
|
|
@ -14,7 +14,7 @@ in one field is unusual, as opposed to the total count.
|
|||
Use high-sided functions if you want to monitor unusually high event rates.
|
||||
Use low-sided functions if you want to look at drops in event rate.
|
||||
|
||||
The {xpackml} features include the following count functions:
|
||||
The {ml-features} include the following count functions:
|
||||
|
||||
* xref:ml-count[`count`, `high_count`, `low_count`]
|
||||
* xref:ml-nonzero-count[`non_zero_count`, `high_non_zero_count`, `low_non_zero_count`]
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
The geographic functions detect anomalies in the geographic location of the
|
||||
input data.
|
||||
|
||||
The {xpackml} features include the following geographic function: `lat_long`.
|
||||
The {ml-features} include the following geographic function: `lat_long`.
|
||||
|
||||
NOTE: You cannot create forecasts for jobs that contain geographic functions.
|
||||
You also cannot add rules with conditions to detectors that use geographic
|
||||
|
@ -72,7 +72,7 @@ For example, JSON data might contain the following transaction coordinates:
|
|||
|
||||
In {es}, location data is likely to be stored in `geo_point` fields. For more
|
||||
information, see {ref}/geo-point.html[Geo-point datatype]. This data type is not
|
||||
supported natively in {xpackml} features. You can, however, use Painless scripts
|
||||
supported natively in {ml-features}. You can, however, use Painless scripts
|
||||
in `script_fields` in your {dfeed} to transform the data into an appropriate
|
||||
format. For example, the following Painless script transforms
|
||||
`"coords": {"lat" : 41.44, "lon":90.5}` into `"lat-lon": "41.44,90.5"`:
|
||||
|
|
|
@ -6,7 +6,7 @@ that is contained in strings within a bucket. These functions can be used as
|
|||
a more sophisticated method to identify incidences of data exfiltration or
|
||||
C2C activity, when analyzing the size in bytes of the data might not be sufficient.
|
||||
|
||||
The {xpackml} features include the following information content functions:
|
||||
The {ml-features} include the following information content functions:
|
||||
|
||||
* `info_content`, `high_info_content`, `low_info_content`
|
||||
|
||||
|
|
|
@ -6,7 +6,7 @@ The metric functions include functions such as mean, min and max. These values
|
|||
are calculated for each bucket. Field values that cannot be converted to
|
||||
double precision floating point numbers are ignored.
|
||||
|
||||
The {xpackml} features include the following metric functions:
|
||||
The {ml-features} include the following metric functions:
|
||||
|
||||
* <<ml-metric-min,`min`>>
|
||||
* <<ml-metric-max,`max`>>
|
||||
|
|
|
@ -27,7 +27,7 @@ with shorter bucket spans typically being measured in minutes, not hours.
|
|||
for typical data.
|
||||
====
|
||||
|
||||
The {xpackml} features include the following rare functions:
|
||||
The {ml-features} include the following rare functions:
|
||||
|
||||
* <<ml-rare,`rare`>>
|
||||
* <<ml-freq-rare,`freq_rare`>>
|
||||
|
@ -85,7 +85,7 @@ different rare status codes compared to the population is regarded as highly
|
|||
anomalous. This analysis is based on the number of different status code values,
|
||||
not the count of occurrences.
|
||||
|
||||
NOTE: To define a status code as rare the {xpackml} features look at the number
|
||||
NOTE: To define a status code as rare the {ml-features} look at the number
|
||||
of distinct status codes that occur, not the number of times the status code
|
||||
occurs. If a single client IP experiences a single unique status code, this
|
||||
is rare, even if it occurs for that client IP in every bucket.
|
||||
|
|
|
@ -11,7 +11,7 @@ If want to look at drops in totals, use low-sided functions.
|
|||
If your data is sparse, use `non_null_sum` functions. Buckets without values are
|
||||
ignored; buckets with a zero value are analyzed.
|
||||
|
||||
The {xpackml} features include the following sum functions:
|
||||
The {ml-features} include the following sum functions:
|
||||
|
||||
* xref:ml-sum[`sum`, `high_sum`, `low_sum`]
|
||||
* xref:ml-nonnull-sum[`non_null_sum`, `high_non_null_sum`, `low_non_null_sum`]
|
||||
|
|
|
@ -6,7 +6,7 @@ The time functions detect events that happen at unusual times, either of the day
|
|||
or of the week. These functions can be used to find unusual patterns of behavior,
|
||||
typically associated with suspicious user activity.
|
||||
|
||||
The {xpackml} features include the following time functions:
|
||||
The {ml-features} include the following time functions:
|
||||
|
||||
* <<ml-time-of-day,`time_of_day`>>
|
||||
* <<ml-time-of-week,`time_of_week`>>
|
||||
|
|
|
@ -569,7 +569,7 @@ GET _ml/datafeeds/datafeed-test4/_preview
|
|||
// TEST[skip:needs-licence]
|
||||
|
||||
In {es}, location data can be stored in `geo_point` fields but this data type is
|
||||
not supported natively in {xpackml} analytics. This example of a script field
|
||||
not supported natively in {ml} analytics. This example of a script field
|
||||
transforms the data into an appropriate format. For more information,
|
||||
see <<ml-geo-functions>>.
|
||||
|
||||
|
|
|
@ -9,10 +9,9 @@ If {xpack} is installed, there is an additional node type:
|
|||
<<ml-node,Machine learning node>>::
|
||||
|
||||
A node that has `xpack.ml.enabled` and `node.ml` set to `true`, which is the
|
||||
default behavior when {xpack} is installed. If you want to use {xpackml}
|
||||
features, there must be at least one {ml} node in your cluster. For more
|
||||
information about {xpackml} features,
|
||||
see {xpack-ref}/xpack-ml.html[Machine Learning in the Elastic Stack].
|
||||
default behavior when {xpack} is installed. If you want to use {ml-features}, there must be at least one {ml} node in your cluster. For more
|
||||
information about {ml-features},
|
||||
see {stack-ov}/xpack-ml.html[Machine learning in the {stack}].
|
||||
|
||||
IMPORTANT: Do not set use the `node.ml` setting unless {xpack} is installed.
|
||||
Otherwise, the node fails to start.
|
||||
|
@ -88,11 +87,11 @@ node.ml: false <5>
|
|||
[[ml-node]]
|
||||
=== [xpack]#Machine learning node#
|
||||
|
||||
The {xpackml} features provide {ml} nodes, which run jobs and handle {ml} API
|
||||
The {ml-features} provide {ml} nodes, which run jobs and handle {ml} API
|
||||
requests. If `xpack.ml.enabled` is set to true and `node.ml` is set to `false`,
|
||||
the node can service API requests but it cannot run jobs.
|
||||
|
||||
If you want to use {xpackml} features in your cluster, you must enable {ml}
|
||||
If you want to use {ml-features} in your cluster, you must enable {ml}
|
||||
(set `xpack.ml.enabled` to `true`) on all master-eligible nodes. Do not use
|
||||
these settings if you do not have {xpack} installed.
|
||||
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.elasticsearch.common.ParseField;
|
|||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -255,6 +256,10 @@ public class CreateIndexRequest extends AcknowledgedRequest<CreateIndexRequest>
|
|||
if (mappings.containsKey(type)) {
|
||||
throw new IllegalStateException("mappings for type \"" + type + "\" were already defined");
|
||||
}
|
||||
// wrap it in a type map if its not
|
||||
if (source.size() != 1 || !source.containsKey(type)) {
|
||||
source = MapBuilder.<String, Object>newMapBuilder().put(type, source).map();
|
||||
}
|
||||
try {
|
||||
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
|
||||
builder.map(source);
|
||||
|
@ -269,7 +274,7 @@ public class CreateIndexRequest extends AcknowledgedRequest<CreateIndexRequest>
|
|||
* ("field1", "type=string,store=true").
|
||||
*/
|
||||
public CreateIndexRequest mapping(String type, Object... source) {
|
||||
mapping(type, PutMappingRequest.buildFromSimplifiedDef(source));
|
||||
mapping(type, PutMappingRequest.buildFromSimplifiedDef(type, source));
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
@ -184,18 +184,6 @@ public class PutMappingRequest extends AcknowledgedRequest<PutMappingRequest> im
|
|||
return source(buildFromSimplifiedDef(type, source));
|
||||
}
|
||||
|
||||
/**
|
||||
* @param source
|
||||
* consisting of field/properties pairs (e.g. "field1",
|
||||
* "type=string,store=true")
|
||||
* @throws IllegalArgumentException
|
||||
* if the number of the source arguments is not divisible by two
|
||||
* @return the mappings definition
|
||||
*/
|
||||
public static XContentBuilder buildFromSimplifiedDef(Object... source) {
|
||||
return buildFromSimplifiedDef(null, source);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param type
|
||||
* the mapping type
|
||||
|
|
|
@ -179,8 +179,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
|
|||
if (in.getVersion().onOrAfter(Version.V_6_3_0)) {
|
||||
allowPartialSearchResults = in.readOptionalBoolean();
|
||||
}
|
||||
//TODO update version after backport
|
||||
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
|
||||
if (in.getVersion().onOrAfter(Version.V_6_7_0)) {
|
||||
localClusterAlias = in.readOptionalString();
|
||||
if (localClusterAlias != null) {
|
||||
absoluteStartMillis = in.readVLong();
|
||||
|
@ -211,8 +210,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
|
|||
if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
|
||||
out.writeOptionalBoolean(allowPartialSearchResults);
|
||||
}
|
||||
//TODO update version after backport
|
||||
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
|
||||
if (out.getVersion().onOrAfter(Version.V_6_7_0)) {
|
||||
out.writeOptionalString(localClusterAlias);
|
||||
if (localClusterAlias != null) {
|
||||
out.writeVLong(absoluteStartMillis);
|
||||
|
|
|
@ -33,48 +33,6 @@ public final class Numbers {
|
|||
private static final BigInteger MIN_LONG_VALUE = BigInteger.valueOf(Long.MIN_VALUE);
|
||||
|
||||
private Numbers() {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a byte array to an short.
|
||||
*
|
||||
* @param arr The byte array to convert to an short
|
||||
* @return The int converted
|
||||
*/
|
||||
public static short bytesToShort(byte[] arr) {
|
||||
return (short) (((arr[0] & 0xff) << 8) | (arr[1] & 0xff));
|
||||
}
|
||||
|
||||
public static short bytesToShort(BytesRef bytes) {
|
||||
return (short) (((bytes.bytes[bytes.offset] & 0xff) << 8) | (bytes.bytes[bytes.offset + 1] & 0xff));
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a byte array to an int.
|
||||
*
|
||||
* @param arr The byte array to convert to an int
|
||||
* @return The int converted
|
||||
*/
|
||||
public static int bytesToInt(byte[] arr) {
|
||||
return (arr[0] << 24) | ((arr[1] & 0xff) << 16) | ((arr[2] & 0xff) << 8) | (arr[3] & 0xff);
|
||||
}
|
||||
|
||||
public static int bytesToInt(BytesRef bytes) {
|
||||
return (bytes.bytes[bytes.offset] << 24) | ((bytes.bytes[bytes.offset + 1] & 0xff) << 16) |
|
||||
((bytes.bytes[bytes.offset + 2] & 0xff) << 8) | (bytes.bytes[bytes.offset + 3] & 0xff);
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a byte array to a long.
|
||||
*
|
||||
* @param arr The byte array to convert to a long
|
||||
* @return The long converter
|
||||
*/
|
||||
public static long bytesToLong(byte[] arr) {
|
||||
int high = (arr[0] << 24) | ((arr[1] & 0xff) << 16) | ((arr[2] & 0xff) << 8) | (arr[3] & 0xff);
|
||||
int low = (arr[4] << 24) | ((arr[5] & 0xff) << 16) | ((arr[6] & 0xff) << 8) | (arr[7] & 0xff);
|
||||
return (((long) high) << 32) | (low & 0x0ffffffffL);
|
||||
}
|
||||
|
||||
public static long bytesToLong(BytesRef bytes) {
|
||||
|
@ -85,40 +43,6 @@ public final class Numbers {
|
|||
return (((long) high) << 32) | (low & 0x0ffffffffL);
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a byte array to float.
|
||||
*
|
||||
* @param arr The byte array to convert to a float
|
||||
* @return The float converted
|
||||
*/
|
||||
public static float bytesToFloat(byte[] arr) {
|
||||
return Float.intBitsToFloat(bytesToInt(arr));
|
||||
}
|
||||
|
||||
public static float bytesToFloat(BytesRef bytes) {
|
||||
return Float.intBitsToFloat(bytesToInt(bytes));
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a byte array to double.
|
||||
*
|
||||
* @param arr The byte array to convert to a double
|
||||
* @return The double converted
|
||||
*/
|
||||
public static double bytesToDouble(byte[] arr) {
|
||||
return Double.longBitsToDouble(bytesToLong(arr));
|
||||
}
|
||||
|
||||
public static double bytesToDouble(BytesRef bytes) {
|
||||
return Double.longBitsToDouble(bytesToLong(bytes));
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts an int to a byte array.
|
||||
*
|
||||
* @param val The int to convert to a byte array
|
||||
* @return The byte array converted
|
||||
*/
|
||||
public static byte[] intToBytes(int val) {
|
||||
byte[] arr = new byte[4];
|
||||
arr[0] = (byte) (val >>> 24);
|
||||
|
@ -160,16 +84,6 @@ public final class Numbers {
|
|||
return arr;
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a float to a byte array.
|
||||
*
|
||||
* @param val The float to convert to a byte array
|
||||
* @return The byte array converted
|
||||
*/
|
||||
public static byte[] floatToBytes(float val) {
|
||||
return intToBytes(Float.floatToRawIntBits(val));
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a double to a byte array.
|
||||
*
|
||||
|
|
|
@ -133,6 +133,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
|
|||
IndexSettings.INDEX_GC_DELETES_SETTING,
|
||||
IndexSettings.INDEX_SOFT_DELETES_SETTING,
|
||||
IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING,
|
||||
IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING,
|
||||
IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING,
|
||||
UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING,
|
||||
EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING,
|
||||
|
|
|
@ -1019,8 +1019,8 @@ public final class Settings implements ToXContentFragment {
|
|||
* @param value The time value
|
||||
* @return The builder
|
||||
*/
|
||||
public Builder put(String setting, long value, TimeUnit timeUnit) {
|
||||
put(setting, timeUnit.toMillis(value) + "ms");
|
||||
public Builder put(final String setting, final long value, final TimeUnit timeUnit) {
|
||||
put(setting, new TimeValue(value, timeUnit));
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
@ -1366,9 +1366,9 @@ public class DateFormatters {
|
|||
} else if ("yearMonthDay".equals(input) || "year_month_day".equals(input)) {
|
||||
return YEAR_MONTH_DAY;
|
||||
} else if ("epoch_second".equals(input)) {
|
||||
return EpochSecondsDateFormatter.INSTANCE;
|
||||
return EpochTime.SECONDS_FORMATTER;
|
||||
} else if ("epoch_millis".equals(input)) {
|
||||
return EpochMillisDateFormatter.INSTANCE;
|
||||
return EpochTime.MILLIS_FORMATTER;
|
||||
// strict date formats here, must be at least 4 digits for year and two for months and two for day
|
||||
} else if ("strictBasicWeekDate".equals(input) || "strict_basic_week_date".equals(input)) {
|
||||
return STRICT_BASIC_WEEK_DATE;
|
||||
|
|
|
@ -1,113 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.time;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZoneOffset;
|
||||
import java.time.format.DateTimeParseException;
|
||||
import java.time.temporal.TemporalAccessor;
|
||||
import java.util.Locale;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
public class EpochSecondsDateFormatter implements DateFormatter {
|
||||
|
||||
public static DateFormatter INSTANCE = new EpochSecondsDateFormatter();
|
||||
static final DateMathParser DATE_MATH_INSTANCE = new JavaDateMathParser(INSTANCE, INSTANCE);
|
||||
private static final Pattern SPLIT_BY_DOT_PATTERN = Pattern.compile("\\.");
|
||||
|
||||
private EpochSecondsDateFormatter() {}
|
||||
|
||||
@Override
|
||||
public TemporalAccessor parse(String input) {
|
||||
try {
|
||||
if (input.contains(".")) {
|
||||
String[] inputs = SPLIT_BY_DOT_PATTERN.split(input, 2);
|
||||
Long seconds = Long.valueOf(inputs[0]);
|
||||
if (inputs[1].length() == 0) {
|
||||
// this is BWC compatible to joda time, nothing after the dot is allowed
|
||||
return Instant.ofEpochSecond(seconds, 0).atZone(ZoneOffset.UTC);
|
||||
}
|
||||
// scientific notation it is!
|
||||
if (inputs[1].contains("e")) {
|
||||
return Instant.ofEpochSecond(Double.valueOf(input).longValue()).atZone(ZoneOffset.UTC);
|
||||
}
|
||||
if (inputs[1].length() > 9) {
|
||||
throw new DateTimeParseException("too much granularity after dot [" + input + "]", input, 0);
|
||||
}
|
||||
Long nanos = new BigDecimal(inputs[1]).movePointRight(9 - inputs[1].length()).longValueExact();
|
||||
if (seconds < 0) {
|
||||
nanos = nanos * -1;
|
||||
}
|
||||
return Instant.ofEpochSecond(seconds, nanos).atZone(ZoneOffset.UTC);
|
||||
} else {
|
||||
return Instant.ofEpochSecond(Long.valueOf(input)).atZone(ZoneOffset.UTC);
|
||||
}
|
||||
} catch (NumberFormatException e) {
|
||||
throw new DateTimeParseException("invalid number [" + input + "]", input, 0, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String format(TemporalAccessor accessor) {
|
||||
Instant instant = Instant.from(accessor);
|
||||
if (instant.getNano() != 0) {
|
||||
return String.valueOf(instant.getEpochSecond()) + "." + String.valueOf(instant.getNano()).replaceAll("0*$", "");
|
||||
}
|
||||
return String.valueOf(instant.getEpochSecond());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String pattern() {
|
||||
return "epoch_second";
|
||||
}
|
||||
|
||||
@Override
|
||||
public Locale locale() {
|
||||
return Locale.ROOT;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ZoneId zone() {
|
||||
return ZoneOffset.UTC;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DateMathParser toDateMathParser() {
|
||||
return DATE_MATH_INSTANCE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DateFormatter withZone(ZoneId zoneId) {
|
||||
if (zoneId.equals(ZoneOffset.UTC) == false) {
|
||||
throw new IllegalArgumentException(pattern() + " date formatter can only be in zone offset UTC");
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DateFormatter withLocale(Locale locale) {
|
||||
if (Locale.ROOT.equals(locale) == false) {
|
||||
throw new IllegalArgumentException(pattern() + " date formatter can only be in locale ROOT");
|
||||
}
|
||||
return this;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,219 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.time;
|
||||
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.time.format.DateTimeFormatterBuilder;
|
||||
import java.time.format.ResolverStyle;
|
||||
import java.time.format.SignStyle;
|
||||
import java.time.temporal.ChronoField;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.time.temporal.Temporal;
|
||||
import java.time.temporal.TemporalAccessor;
|
||||
import java.time.temporal.TemporalField;
|
||||
import java.time.temporal.TemporalUnit;
|
||||
import java.time.temporal.ValueRange;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* This class provides {@link DateTimeFormatter}s capable of parsing epoch seconds and milliseconds.
|
||||
* <p>
|
||||
* The seconds formatter is provided by {@link #SECONDS_FORMATTER}.
|
||||
* The milliseconds formatter is provided by {@link #MILLIS_FORMATTER}.
|
||||
* <p>
|
||||
* Both formatters support fractional time, up to nanosecond precision. Values must be positive numbers.
|
||||
*/
|
||||
class EpochTime {
|
||||
|
||||
private static final ValueRange LONG_POSITIVE_RANGE = ValueRange.of(0, Long.MAX_VALUE);
|
||||
|
||||
private static final EpochField SECONDS = new EpochField(ChronoUnit.SECONDS, ChronoUnit.FOREVER, LONG_POSITIVE_RANGE) {
|
||||
@Override
|
||||
public boolean isSupportedBy(TemporalAccessor temporal) {
|
||||
return temporal.isSupported(ChronoField.INSTANT_SECONDS);
|
||||
}
|
||||
@Override
|
||||
public long getFrom(TemporalAccessor temporal) {
|
||||
return temporal.getLong(ChronoField.INSTANT_SECONDS);
|
||||
}
|
||||
@Override
|
||||
public TemporalAccessor resolve(Map<TemporalField,Long> fieldValues,
|
||||
TemporalAccessor partialTemporal, ResolverStyle resolverStyle) {
|
||||
long seconds = fieldValues.remove(this);
|
||||
fieldValues.put(ChronoField.INSTANT_SECONDS, seconds);
|
||||
Long nanos = fieldValues.remove(NANOS_OF_SECOND);
|
||||
if (nanos != null) {
|
||||
fieldValues.put(ChronoField.NANO_OF_SECOND, nanos);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
private static final EpochField NANOS_OF_SECOND = new EpochField(ChronoUnit.NANOS, ChronoUnit.SECONDS, ValueRange.of(0, 999_999_999)) {
|
||||
@Override
|
||||
public boolean isSupportedBy(TemporalAccessor temporal) {
|
||||
return temporal.isSupported(ChronoField.NANO_OF_SECOND) && temporal.getLong(ChronoField.NANO_OF_SECOND) != 0;
|
||||
}
|
||||
@Override
|
||||
public long getFrom(TemporalAccessor temporal) {
|
||||
return temporal.getLong(ChronoField.NANO_OF_SECOND);
|
||||
}
|
||||
};
|
||||
|
||||
private static final EpochField MILLIS = new EpochField(ChronoUnit.MILLIS, ChronoUnit.FOREVER, LONG_POSITIVE_RANGE) {
|
||||
@Override
|
||||
public boolean isSupportedBy(TemporalAccessor temporal) {
|
||||
return temporal.isSupported(ChronoField.INSTANT_SECONDS) && temporal.isSupported(ChronoField.MILLI_OF_SECOND);
|
||||
}
|
||||
@Override
|
||||
public long getFrom(TemporalAccessor temporal) {
|
||||
return temporal.getLong(ChronoField.INSTANT_SECONDS) * 1_000 + temporal.getLong(ChronoField.MILLI_OF_SECOND);
|
||||
}
|
||||
@Override
|
||||
public TemporalAccessor resolve(Map<TemporalField,Long> fieldValues,
|
||||
TemporalAccessor partialTemporal, ResolverStyle resolverStyle) {
|
||||
long secondsAndMillis = fieldValues.remove(this);
|
||||
long seconds = secondsAndMillis / 1_000;
|
||||
long nanos = secondsAndMillis % 1000 * 1_000_000;
|
||||
Long nanosOfMilli = fieldValues.remove(NANOS_OF_MILLI);
|
||||
if (nanosOfMilli != null) {
|
||||
nanos += nanosOfMilli;
|
||||
}
|
||||
fieldValues.put(ChronoField.INSTANT_SECONDS, seconds);
|
||||
fieldValues.put(ChronoField.NANO_OF_SECOND, nanos);
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
private static final EpochField NANOS_OF_MILLI = new EpochField(ChronoUnit.NANOS, ChronoUnit.MILLIS, ValueRange.of(0, 999_999)) {
|
||||
@Override
|
||||
public boolean isSupportedBy(TemporalAccessor temporal) {
|
||||
return temporal.isSupported(ChronoField.NANO_OF_SECOND) && temporal.getLong(ChronoField.NANO_OF_SECOND) % 1_000_000 != 0;
|
||||
}
|
||||
@Override
|
||||
public long getFrom(TemporalAccessor temporal) {
|
||||
return temporal.getLong(ChronoField.NANO_OF_SECOND);
|
||||
}
|
||||
};
|
||||
|
||||
// this supports seconds without any fraction
|
||||
private static final DateTimeFormatter SECONDS_FORMATTER1 = new DateTimeFormatterBuilder()
|
||||
.appendValue(SECONDS, 1, 19, SignStyle.NORMAL)
|
||||
.toFormatter(Locale.ROOT);
|
||||
|
||||
// this supports seconds ending in dot
|
||||
private static final DateTimeFormatter SECONDS_FORMATTER2 = new DateTimeFormatterBuilder()
|
||||
.append(SECONDS_FORMATTER1)
|
||||
.appendLiteral('.')
|
||||
.toFormatter(Locale.ROOT);
|
||||
|
||||
// this supports seconds with a fraction and is also used for printing
|
||||
private static final DateTimeFormatter SECONDS_FORMATTER3 = new DateTimeFormatterBuilder()
|
||||
.append(SECONDS_FORMATTER1)
|
||||
.optionalStart() // optional is used so isSupported will be called when printing
|
||||
.appendFraction(NANOS_OF_SECOND, 1, 9, true)
|
||||
.optionalEnd()
|
||||
.toFormatter(Locale.ROOT);
|
||||
|
||||
// this supports milliseconds without any fraction
|
||||
private static final DateTimeFormatter MILLISECONDS_FORMATTER1 = new DateTimeFormatterBuilder()
|
||||
.appendValue(MILLIS, 1, 19, SignStyle.NORMAL)
|
||||
.toFormatter(Locale.ROOT);
|
||||
|
||||
// this supports milliseconds ending in dot
|
||||
private static final DateTimeFormatter MILLISECONDS_FORMATTER2 = new DateTimeFormatterBuilder()
|
||||
.append(MILLISECONDS_FORMATTER1)
|
||||
.appendLiteral('.')
|
||||
.toFormatter(Locale.ROOT);
|
||||
|
||||
// this supports milliseconds with a fraction and is also used for printing
|
||||
private static final DateTimeFormatter MILLISECONDS_FORMATTER3 = new DateTimeFormatterBuilder()
|
||||
.append(MILLISECONDS_FORMATTER1)
|
||||
.optionalStart() // optional is used so isSupported will be called when printing
|
||||
.appendFraction(NANOS_OF_MILLI, 1, 6, true)
|
||||
.optionalEnd()
|
||||
.toFormatter(Locale.ROOT);
|
||||
|
||||
static final DateFormatter SECONDS_FORMATTER = new JavaDateFormatter("epoch_second", SECONDS_FORMATTER3,
|
||||
SECONDS_FORMATTER1, SECONDS_FORMATTER2, SECONDS_FORMATTER3);
|
||||
|
||||
static final DateFormatter MILLIS_FORMATTER = new JavaDateFormatter("epoch_millis", MILLISECONDS_FORMATTER3,
|
||||
MILLISECONDS_FORMATTER1, MILLISECONDS_FORMATTER2, MILLISECONDS_FORMATTER3);
|
||||
|
||||
private abstract static class EpochField implements TemporalField {
|
||||
|
||||
private final TemporalUnit baseUnit;
|
||||
private final TemporalUnit rangeUnit;
|
||||
private final ValueRange range;
|
||||
|
||||
private EpochField(TemporalUnit baseUnit, TemporalUnit rangeUnit, ValueRange range) {
|
||||
this.baseUnit = baseUnit;
|
||||
this.rangeUnit = rangeUnit;
|
||||
this.range = range;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDisplayName(Locale locale) {
|
||||
return toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Epoch" + baseUnit.toString() + (rangeUnit != ChronoUnit.FOREVER ? "Of" + rangeUnit.toString() : "");
|
||||
}
|
||||
|
||||
@Override
|
||||
public TemporalUnit getBaseUnit() {
|
||||
return baseUnit;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TemporalUnit getRangeUnit() {
|
||||
return rangeUnit;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ValueRange range() {
|
||||
return range;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDateBased() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isTimeBased() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ValueRange rangeRefinedBy(TemporalAccessor temporal) {
|
||||
return range();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public <R extends Temporal> R adjustInto(R temporal, long newValue) {
|
||||
return (R) temporal.with(this, newValue);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -256,6 +256,17 @@ public final class IndexSettings {
|
|||
Setting.longSetting("index.soft_deletes.retention.operations", 0, 0,
|
||||
Property.IndexScope, Property.Dynamic);
|
||||
|
||||
/**
|
||||
* Controls the maximum length of time since a retention lease is created or renewed before it is considered expired.
|
||||
*/
|
||||
public static final Setting<TimeValue> INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING =
|
||||
Setting.timeSetting(
|
||||
"index.soft_deletes.retention.lease",
|
||||
TimeValue.timeValueHours(12),
|
||||
TimeValue.ZERO,
|
||||
Property.Dynamic,
|
||||
Property.IndexScope);
|
||||
|
||||
/**
|
||||
* The maximum number of refresh listeners allows on this shard.
|
||||
*/
|
||||
|
@ -316,6 +327,18 @@ public final class IndexSettings {
|
|||
private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis();
|
||||
private final boolean softDeleteEnabled;
|
||||
private volatile long softDeleteRetentionOperations;
|
||||
|
||||
private volatile long retentionLeaseMillis;
|
||||
|
||||
/**
|
||||
* The maximum age of a retention lease before it is considered expired.
|
||||
*
|
||||
* @return the maximum age
|
||||
*/
|
||||
public long getRetentionLeaseMillis() {
|
||||
return retentionLeaseMillis;
|
||||
}
|
||||
|
||||
private volatile boolean warmerEnabled;
|
||||
private volatile int maxResultWindow;
|
||||
private volatile int maxInnerResultWindow;
|
||||
|
@ -431,6 +454,7 @@ public final class IndexSettings {
|
|||
gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis();
|
||||
softDeleteEnabled = version.onOrAfter(Version.V_6_5_0) && scopedSettings.get(INDEX_SOFT_DELETES_SETTING);
|
||||
softDeleteRetentionOperations = scopedSettings.get(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING);
|
||||
retentionLeaseMillis = scopedSettings.get(INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING).millis();
|
||||
warmerEnabled = scopedSettings.get(INDEX_WARMER_ENABLED_SETTING);
|
||||
maxResultWindow = scopedSettings.get(MAX_RESULT_WINDOW_SETTING);
|
||||
maxInnerResultWindow = scopedSettings.get(MAX_INNER_RESULT_WINDOW_SETTING);
|
||||
|
|
|
@ -35,7 +35,6 @@ import org.elasticsearch.index.shard.ReplicationGroup;
|
|||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
|
@ -137,6 +136,12 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
|
|||
*/
|
||||
private final LongConsumer onGlobalCheckpointUpdated;
|
||||
|
||||
/**
|
||||
* A supplier of the current time. This supplier is used to add a timestamp to retention leases, and to determine retention lease
|
||||
* expiration.
|
||||
*/
|
||||
private final LongSupplier currentTimeMillisSupplier;
|
||||
|
||||
/**
|
||||
* This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the
|
||||
* current global checkpoint.
|
||||
|
@ -151,12 +156,21 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
|
|||
private final Map<String, RetentionLease> retentionLeases = new HashMap<>();
|
||||
|
||||
/**
|
||||
* Get all retention leases tracker on this shard. An unmodifiable copy of the retention leases is returned.
|
||||
* Get all non-expired retention leases tracker on this shard. An unmodifiable copy of the retention leases is returned.
|
||||
*
|
||||
* @return the retention leases
|
||||
*/
|
||||
public synchronized Collection<RetentionLease> getRetentionLeases() {
|
||||
return Collections.unmodifiableCollection(new ArrayList<>(retentionLeases.values()));
|
||||
final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
|
||||
final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis();
|
||||
final Collection<RetentionLease> nonExpiredRetentionLeases = retentionLeases
|
||||
.values()
|
||||
.stream()
|
||||
.filter(retentionLease -> currentTimeMillis - retentionLease.timestamp() <= retentionLeaseMillis)
|
||||
.collect(Collectors.toList());
|
||||
retentionLeases.clear();
|
||||
retentionLeases.putAll(nonExpiredRetentionLeases.stream().collect(Collectors.toMap(RetentionLease::id, lease -> lease)));
|
||||
return Collections.unmodifiableCollection(nonExpiredRetentionLeases);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -168,7 +182,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
|
|||
*/
|
||||
public synchronized void addOrUpdateRetentionLease(final String id, final long retainingSequenceNumber, final String source) {
|
||||
assert primaryMode;
|
||||
retentionLeases.put(id, new RetentionLease(id, retainingSequenceNumber, source));
|
||||
retentionLeases.put(id, new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source));
|
||||
}
|
||||
|
||||
public static class CheckpointState implements Writeable {
|
||||
|
@ -425,7 +439,8 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
|
|||
final String allocationId,
|
||||
final IndexSettings indexSettings,
|
||||
final long globalCheckpoint,
|
||||
final LongConsumer onGlobalCheckpointUpdated) {
|
||||
final LongConsumer onGlobalCheckpointUpdated,
|
||||
final LongSupplier currentTimeMillisSupplier) {
|
||||
super(shardId, indexSettings);
|
||||
assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
|
||||
this.shardAllocationId = allocationId;
|
||||
|
@ -435,6 +450,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
|
|||
this.checkpoints = new HashMap<>(1 + indexSettings.getNumberOfReplicas());
|
||||
checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false, false));
|
||||
this.onGlobalCheckpointUpdated = Objects.requireNonNull(onGlobalCheckpointUpdated);
|
||||
this.currentTimeMillisSupplier = Objects.requireNonNull(currentTimeMillisSupplier);
|
||||
this.pendingInSync = new HashSet<>();
|
||||
this.routingTable = null;
|
||||
this.replicationGroup = null;
|
||||
|
|
|
@ -23,9 +23,9 @@ package org.elasticsearch.index.seqno;
|
|||
* A "shard history retention lease" (or "retention lease" for short) is conceptually a marker containing a retaining sequence number such
|
||||
* that all operations with sequence number at least that retaining sequence number will be retained during merge operations (which could
|
||||
* otherwise merge away operations that have been soft deleted). Each retention lease contains a unique identifier, the retaining sequence
|
||||
* number, and the source of the retention lease (e.g., "ccr").
|
||||
* number, the timestamp of when the lease was created or renewed, and the source of the retention lease (e.g., "ccr").
|
||||
*/
|
||||
public class RetentionLease {
|
||||
public final class RetentionLease {
|
||||
|
||||
private final String id;
|
||||
|
||||
|
@ -50,6 +50,17 @@ public class RetentionLease {
|
|||
return retainingSequenceNumber;
|
||||
}
|
||||
|
||||
private final long timestamp;
|
||||
|
||||
/**
|
||||
* The timestamp of when this retention lease was created or renewed.
|
||||
*
|
||||
* @return the timestamp used as a basis for determining lease expiration
|
||||
*/
|
||||
public long timestamp() {
|
||||
return timestamp;
|
||||
}
|
||||
|
||||
private final String source;
|
||||
|
||||
/**
|
||||
|
@ -66,19 +77,22 @@ public class RetentionLease {
|
|||
*
|
||||
* @param id the identifier of the retention lease
|
||||
* @param retainingSequenceNumber the retaining sequence number
|
||||
* @param timestamp the timestamp of when the retention lease was created or renewed
|
||||
* @param source the source of the retention lease
|
||||
*/
|
||||
public RetentionLease(final String id, final long retainingSequenceNumber, final String source) {
|
||||
public RetentionLease(final String id, final long retainingSequenceNumber, final long timestamp, final String source) {
|
||||
this.id = id;
|
||||
this.retainingSequenceNumber = retainingSequenceNumber;
|
||||
this.timestamp = timestamp;
|
||||
this.source = source;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ShardHistoryRetentionLease{" +
|
||||
return "RetentionLease{" +
|
||||
"id='" + id + '\'' +
|
||||
", retainingSequenceNumber=" + retainingSequenceNumber +
|
||||
", timestamp=" + timestamp +
|
||||
", source='" + source + '\'' +
|
||||
'}';
|
||||
}
|
||||
|
|
|
@ -305,7 +305,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
this.globalCheckpointListeners =
|
||||
new GlobalCheckpointListeners(shardId, threadPool.executor(ThreadPool.Names.LISTENER), threadPool.scheduler(), logger);
|
||||
this.replicationTracker =
|
||||
new ReplicationTracker(shardId, aId, indexSettings, UNASSIGNED_SEQ_NO, globalCheckpointListeners::globalCheckpointUpdated);
|
||||
new ReplicationTracker(
|
||||
shardId,
|
||||
aId,
|
||||
indexSettings,
|
||||
UNASSIGNED_SEQ_NO,
|
||||
globalCheckpointListeners::globalCheckpointUpdated,
|
||||
threadPool::absoluteTimeInMillis);
|
||||
|
||||
// the query cache is a node-level thing, however we want the most popular filters
|
||||
// to be computed on a per-shard basis
|
||||
|
|
|
@ -174,13 +174,13 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
|
|||
public static final Setting.AffixSetting<TimeValue> REMOTE_CLUSTER_PING_SCHEDULE = Setting.affixKeySetting(
|
||||
"cluster.remote.",
|
||||
"transport.ping_schedule",
|
||||
key -> timeSetting(key, TransportSettings.PING_SCHEDULE, Setting.Property.NodeScope),
|
||||
key -> timeSetting(key, TransportSettings.PING_SCHEDULE, Setting.Property.Dynamic, Setting.Property.NodeScope),
|
||||
REMOTE_CLUSTERS_SEEDS);
|
||||
|
||||
public static final Setting.AffixSetting<Boolean> REMOTE_CLUSTER_COMPRESS = Setting.affixKeySetting(
|
||||
"cluster.remote.",
|
||||
"transport.compress",
|
||||
key -> boolSetting(key, TransportSettings.TRANSPORT_COMPRESS, Setting.Property.NodeScope),
|
||||
key -> boolSetting(key, TransportSettings.TRANSPORT_COMPRESS, Setting.Property.Dynamic, Setting.Property.NodeScope),
|
||||
REMOTE_CLUSTERS_SEEDS);
|
||||
|
||||
private static final Predicate<DiscoveryNode> DEFAULT_NODE_PREDICATE = (node) -> Version.CURRENT.isCompatible(node.getVersion())
|
||||
|
|
|
@ -143,7 +143,7 @@ public class CreateIndexIT extends ESIntegTestCase {
|
|||
assertFalse(metadata.sourceAsMap().isEmpty());
|
||||
}
|
||||
|
||||
public void testNonNestedEmptyMappings() throws Exception {
|
||||
public void testEmptyNestedMappings() throws Exception {
|
||||
assertAcked(prepareCreate("test")
|
||||
.addMapping("_doc", XContentFactory.jsonBuilder().startObject().endObject()));
|
||||
|
||||
|
@ -173,20 +173,6 @@ public class CreateIndexIT extends ESIntegTestCase {
|
|||
assertTrue(metadata.sourceAsMap().isEmpty());
|
||||
}
|
||||
|
||||
public void testFlatMappingFormat() throws Exception {
|
||||
assertAcked(prepareCreate("test")
|
||||
.addMapping("_doc", "field", "type=keyword"));
|
||||
|
||||
GetMappingsResponse response = client().admin().indices().prepareGetMappings("test").get();
|
||||
|
||||
ImmutableOpenMap<String, MappingMetaData> mappings = response.mappings().get("test");
|
||||
assertNotNull(mappings);
|
||||
|
||||
MappingMetaData metadata = mappings.get("_doc");
|
||||
assertNotNull(metadata);
|
||||
assertFalse(metadata.sourceAsMap().isEmpty());
|
||||
}
|
||||
|
||||
public void testInvalidShardCountSettings() throws Exception {
|
||||
int value = randomIntBetween(-10, 0);
|
||||
try {
|
||||
|
|
|
@ -22,86 +22,31 @@ package org.elasticsearch.action.admin.indices.create;
|
|||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.action.admin.indices.alias.Alias;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.index.RandomCreateIndexGenerator;
|
||||
import org.elasticsearch.test.AbstractXContentTestCase;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
public class CreateIndexRequestTests extends AbstractXContentTestCase<CreateIndexRequest> {
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
|
||||
import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
|
||||
|
||||
@Override
|
||||
protected CreateIndexRequest createTestInstance() {
|
||||
try {
|
||||
return RandomCreateIndexGenerator.randomCreateIndexRequest();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected CreateIndexRequest doParseInstance(XContentParser parser) throws IOException {
|
||||
CreateIndexRequest request = new CreateIndexRequest();
|
||||
request.source(parser.map(), LoggingDeprecationHandler.INSTANCE);
|
||||
return request;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void assertEqualInstances(CreateIndexRequest expectedInstance, CreateIndexRequest newInstance) {
|
||||
assertEquals(expectedInstance.settings(), newInstance.settings());
|
||||
assertAliasesEqual(expectedInstance.aliases(), newInstance.aliases());
|
||||
assertMappingsEqual(expectedInstance.mappings(), newInstance.mappings());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean supportsUnknownFields() {
|
||||
return false;
|
||||
}
|
||||
|
||||
public static void assertMappingsEqual(Map<String, String> expected, Map<String, String> actual) {
|
||||
assertEquals(expected.keySet(), actual.keySet());
|
||||
|
||||
for (Map.Entry<String, String> expectedEntry : expected.entrySet()) {
|
||||
String expectedValue = expectedEntry.getValue();
|
||||
String actualValue = actual.get(expectedEntry.getKey());
|
||||
try (XContentParser expectedJson = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY,
|
||||
LoggingDeprecationHandler.INSTANCE, expectedValue);
|
||||
XContentParser actualJson = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY,
|
||||
LoggingDeprecationHandler.INSTANCE, actualValue)) {
|
||||
assertEquals(expectedJson.map(), actualJson.map());
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static void assertAliasesEqual(Set<Alias> expected, Set<Alias> actual) {
|
||||
assertEquals(expected, actual);
|
||||
|
||||
for (Alias expectedAlias : expected) {
|
||||
for (Alias actualAlias : actual) {
|
||||
if (expectedAlias.equals(actualAlias)) {
|
||||
// As Alias#equals only looks at name, we check the equality of the other Alias parameters here.
|
||||
assertEquals(expectedAlias.filter(), actualAlias.filter());
|
||||
assertEquals(expectedAlias.indexRouting(), actualAlias.indexRouting());
|
||||
assertEquals(expectedAlias.searchRouting(), actualAlias.searchRouting());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
public class CreateIndexRequestTests extends ESTestCase {
|
||||
|
||||
public void testSerialization() throws IOException {
|
||||
CreateIndexRequest request = new CreateIndexRequest("foo");
|
||||
String mapping = Strings.toString(JsonXContent.contentBuilder().startObject()
|
||||
.startObject("type").endObject().endObject());
|
||||
String mapping = Strings.toString(JsonXContent.contentBuilder().startObject().startObject("type").endObject().endObject());
|
||||
request.mapping("my_type", mapping, XContentType.JSON);
|
||||
|
||||
try (BytesStreamOutput output = new BytesStreamOutput()) {
|
||||
|
@ -118,7 +63,7 @@ public class CreateIndexRequestTests extends AbstractXContentTestCase<CreateInde
|
|||
|
||||
public void testTopLevelKeys() {
|
||||
String createIndex =
|
||||
"{\n"
|
||||
"{\n"
|
||||
+ " \"FOO_SHOULD_BE_ILLEGAL_HERE\": {\n"
|
||||
+ " \"BAR_IS_THE_SAME\": 42\n"
|
||||
+ " },\n"
|
||||
|
@ -135,7 +80,81 @@ public class CreateIndexRequestTests extends AbstractXContentTestCase<CreateInde
|
|||
|
||||
CreateIndexRequest request = new CreateIndexRequest();
|
||||
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class,
|
||||
() -> {request.source(createIndex, XContentType.JSON);});
|
||||
() -> {request.source(createIndex, XContentType.JSON);});
|
||||
assertEquals("unknown key [FOO_SHOULD_BE_ILLEGAL_HERE] for create index", e.getMessage());
|
||||
}
|
||||
|
||||
public void testToXContent() throws IOException {
|
||||
CreateIndexRequest request = new CreateIndexRequest("foo");
|
||||
|
||||
String mapping = Strings.toString(JsonXContent.contentBuilder().startObject().startObject("type").endObject().endObject());
|
||||
request.mapping("my_type", mapping, XContentType.JSON);
|
||||
|
||||
Alias alias = new Alias("test_alias");
|
||||
alias.routing("1");
|
||||
alias.filter("{\"term\":{\"year\":2016}}");
|
||||
alias.writeIndex(true);
|
||||
request.alias(alias);
|
||||
|
||||
Settings.Builder settings = Settings.builder();
|
||||
settings.put(SETTING_NUMBER_OF_SHARDS, 10);
|
||||
request.settings(settings);
|
||||
|
||||
String actualRequestBody = Strings.toString(request);
|
||||
|
||||
String expectedRequestBody = "{\"settings\":{\"index\":{\"number_of_shards\":\"10\"}}," +
|
||||
"\"mappings\":{\"my_type\":{\"type\":{}}}," +
|
||||
"\"aliases\":{\"test_alias\":{\"filter\":{\"term\":{\"year\":2016}},\"routing\":\"1\",\"is_write_index\":true}}}";
|
||||
|
||||
assertEquals(expectedRequestBody, actualRequestBody);
|
||||
}
|
||||
|
||||
public void testToAndFromXContent() throws IOException {
|
||||
|
||||
final CreateIndexRequest createIndexRequest = RandomCreateIndexGenerator.randomCreateIndexRequest();
|
||||
|
||||
boolean humanReadable = randomBoolean();
|
||||
final XContentType xContentType = randomFrom(XContentType.values());
|
||||
BytesReference originalBytes = toShuffledXContent(createIndexRequest, xContentType, EMPTY_PARAMS, humanReadable);
|
||||
|
||||
CreateIndexRequest parsedCreateIndexRequest = new CreateIndexRequest();
|
||||
parsedCreateIndexRequest.source(originalBytes, xContentType);
|
||||
|
||||
assertMappingsEqual(createIndexRequest.mappings(), parsedCreateIndexRequest.mappings());
|
||||
assertAliasesEqual(createIndexRequest.aliases(), parsedCreateIndexRequest.aliases());
|
||||
assertEquals(createIndexRequest.settings(), parsedCreateIndexRequest.settings());
|
||||
|
||||
BytesReference finalBytes = toShuffledXContent(parsedCreateIndexRequest, xContentType, EMPTY_PARAMS, humanReadable);
|
||||
ElasticsearchAssertions.assertToXContentEquivalent(originalBytes, finalBytes, xContentType);
|
||||
}
|
||||
|
||||
public static void assertMappingsEqual(Map<String, String> expected, Map<String, String> actual) throws IOException {
|
||||
assertEquals(expected.keySet(), actual.keySet());
|
||||
|
||||
for (Map.Entry<String, String> expectedEntry : expected.entrySet()) {
|
||||
String expectedValue = expectedEntry.getValue();
|
||||
String actualValue = actual.get(expectedEntry.getKey());
|
||||
try (XContentParser expectedJson = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY,
|
||||
LoggingDeprecationHandler.INSTANCE, expectedValue);
|
||||
XContentParser actualJson = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY,
|
||||
LoggingDeprecationHandler.INSTANCE, actualValue)){
|
||||
assertEquals(expectedJson.map(), actualJson.map());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static void assertAliasesEqual(Set<Alias> expected, Set<Alias> actual) throws IOException {
|
||||
assertEquals(expected, actual);
|
||||
|
||||
for (Alias expectedAlias : expected) {
|
||||
for (Alias actualAlias : actual) {
|
||||
if (expectedAlias.equals(actualAlias)) {
|
||||
// As Alias#equals only looks at name, we check the equality of the other Alias parameters here.
|
||||
assertEquals(expectedAlias.filter(), actualAlias.filter());
|
||||
assertEquals(expectedAlias.indexRouting(), actualAlias.indexRouting());
|
||||
assertEquals(expectedAlias.searchRouting(), actualAlias.searchRouting());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -76,8 +76,7 @@ public class SearchRequestTests extends AbstractSearchTestCase {
|
|||
SearchRequest searchRequest = createSearchRequest();
|
||||
Version version = VersionUtils.randomVersion(random());
|
||||
SearchRequest deserializedRequest = copyWriteable(searchRequest, namedWriteableRegistry, SearchRequest::new, version);
|
||||
//TODO update version after backport
|
||||
if (version.before(Version.V_7_0_0)) {
|
||||
if (version.before(Version.V_6_7_0)) {
|
||||
assertNull(deserializedRequest.getLocalClusterAlias());
|
||||
assertAbsoluteStartMillisIsCurrentTime(deserializedRequest);
|
||||
} else {
|
||||
|
@ -86,11 +85,10 @@ public class SearchRequestTests extends AbstractSearchTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
//TODO rename and update version after backport
|
||||
public void testReadFromPre7_0_0() throws IOException {
|
||||
public void testReadFromPre6_7_0() throws IOException {
|
||||
String msg = "AAEBBWluZGV4AAAAAQACAAAA/////w8AAAAAAAAA/////w8AAAAAAAACAAAAAAABAAMCBAUBAAKABACAAQIAAA==";
|
||||
try (StreamInput in = StreamInput.wrap(Base64.getDecoder().decode(msg))) {
|
||||
in.setVersion(VersionUtils.randomVersionBetween(random(), Version.V_6_4_0, VersionUtils.getPreviousVersion(Version.V_7_0_0)));
|
||||
in.setVersion(VersionUtils.randomVersionBetween(random(), Version.V_6_4_0, VersionUtils.getPreviousVersion(Version.V_6_7_0)));
|
||||
SearchRequest searchRequest = new SearchRequest(in);
|
||||
assertArrayEquals(new String[]{"index"}, searchRequest.indices());
|
||||
assertNull(searchRequest.getLocalClusterAlias());
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.common.joda;
|
||||
|
||||
import org.elasticsearch.bootstrap.JavaVersion;
|
||||
import org.elasticsearch.common.time.DateFormatter;
|
||||
import org.elasticsearch.common.time.DateFormatters;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
@ -384,6 +385,7 @@ public class JavaJodaTimeDuellingTests extends ESTestCase {
|
|||
|
||||
ZonedDateTime javaDate = ZonedDateTime.of(year, month, day, hour, minute, second, 0, ZoneOffset.UTC);
|
||||
DateTime jodaDate = new DateTime(year, month, day, hour, minute, second, DateTimeZone.UTC);
|
||||
assertSamePrinterOutput("epoch_second", javaDate, jodaDate);
|
||||
|
||||
assertSamePrinterOutput("basicDate", javaDate, jodaDate);
|
||||
assertSamePrinterOutput("basicDateTime", javaDate, jodaDate);
|
||||
|
@ -428,7 +430,7 @@ public class JavaJodaTimeDuellingTests extends ESTestCase {
|
|||
assertSamePrinterOutput("year", javaDate, jodaDate);
|
||||
assertSamePrinterOutput("yearMonth", javaDate, jodaDate);
|
||||
assertSamePrinterOutput("yearMonthDay", javaDate, jodaDate);
|
||||
assertSamePrinterOutput("epoch_second", javaDate, jodaDate);
|
||||
|
||||
assertSamePrinterOutput("epoch_millis", javaDate, jodaDate);
|
||||
assertSamePrinterOutput("strictBasicWeekDate", javaDate, jodaDate);
|
||||
assertSamePrinterOutput("strictBasicWeekDateTime", javaDate, jodaDate);
|
||||
|
@ -476,6 +478,12 @@ public class JavaJodaTimeDuellingTests extends ESTestCase {
|
|||
assertThat(jodaDate.getMillis(), is(javaDate.toInstant().toEpochMilli()));
|
||||
String javaTimeOut = DateFormatters.forPattern(format).format(javaDate);
|
||||
String jodaTimeOut = DateFormatter.forPattern(format).formatJoda(jodaDate);
|
||||
if (JavaVersion.current().getVersion().get(0) == 8 && javaTimeOut.endsWith(".0")
|
||||
&& (format.equals("epoch_second") || format.equals("epoch_millis"))) {
|
||||
// java 8 has a bug in DateTimeFormatter usage when printing dates that rely on isSupportedBy for fields, which is
|
||||
// what we use for epoch time. This change accounts for that bug. It should be removed when java 8 support is removed
|
||||
jodaTimeOut += ".0";
|
||||
}
|
||||
String message = String.format(Locale.ROOT, "expected string representation to be equal for format [%s]: joda [%s], java [%s]",
|
||||
format, jodaTimeOut, javaTimeOut);
|
||||
assertThat(message, javaTimeOut, is(jodaTimeOut));
|
||||
|
@ -484,7 +492,6 @@ public class JavaJodaTimeDuellingTests extends ESTestCase {
|
|||
private void assertSameDate(String input, String format) {
|
||||
DateFormatter jodaFormatter = Joda.forPattern(format);
|
||||
DateFormatter javaFormatter = DateFormatters.forPattern(format);
|
||||
|
||||
assertSameDate(input, format, jodaFormatter, javaFormatter);
|
||||
}
|
||||
|
||||
|
|
|
@ -29,7 +29,6 @@ import java.time.ZoneOffset;
|
|||
|
||||
public class JodaTests extends ESTestCase {
|
||||
|
||||
|
||||
public void testBasicTTimePattern() {
|
||||
DateFormatter formatter1 = DateFormatter.forPattern("basic_t_time");
|
||||
assertEquals(formatter1.pattern(), "basic_t_time");
|
||||
|
|
|
@ -47,6 +47,7 @@ import java.util.Iterator;
|
|||
import java.util.Map;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.hamcrest.Matchers.contains;
|
||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||
|
@ -744,4 +745,18 @@ public class SettingsTests extends ESTestCase {
|
|||
assertThat(actual, equalTo(expected));
|
||||
}
|
||||
|
||||
public void testSetByTimeUnit() {
|
||||
final Setting<TimeValue> setting =
|
||||
Setting.timeSetting("key", TimeValue.parseTimeValue(randomTimeValue(0, 24, "h"), "key"), TimeValue.ZERO);
|
||||
final TimeValue expected = new TimeValue(1500, TimeUnit.MICROSECONDS);
|
||||
final Settings settings = Settings.builder().put("key", expected.getMicros(), TimeUnit.MICROSECONDS).build();
|
||||
/*
|
||||
* Previously we would internally convert the duration to a string by converting to milliseconds which could lose precision (e.g.,
|
||||
* 1500 microseconds would be converted to 1ms). Effectively this test is then asserting that we no longer make this mistake when
|
||||
* doing the internal string conversion. Instead, we convert to a duration using a method that does not lose the original unit.
|
||||
*/
|
||||
final TimeValue actual = setting.get(settings);
|
||||
assertThat(actual, equalTo(expected));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -23,7 +23,6 @@ import org.elasticsearch.test.ESTestCase;
|
|||
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZoneOffset;
|
||||
import java.time.format.DateTimeParseException;
|
||||
import java.time.temporal.TemporalAccessor;
|
||||
import java.util.Locale;
|
||||
|
@ -58,21 +57,6 @@ public class DateFormattersTests extends ESTestCase {
|
|||
assertThat(instant.getEpochSecond(), is(12L));
|
||||
assertThat(instant.getNano(), is(345_000_000));
|
||||
}
|
||||
{
|
||||
Instant instant = Instant.from(formatter.parse("-12345.6789"));
|
||||
assertThat(instant.getEpochSecond(), is(-13L));
|
||||
assertThat(instant.getNano(), is(1_000_000_000 - 345_678_900));
|
||||
}
|
||||
{
|
||||
Instant instant = Instant.from(formatter.parse("-436134.241272"));
|
||||
assertThat(instant.getEpochSecond(), is(-437L));
|
||||
assertThat(instant.getNano(), is(1_000_000_000 - 134_241_272));
|
||||
}
|
||||
{
|
||||
Instant instant = Instant.from(formatter.parse("-12345"));
|
||||
assertThat(instant.getEpochSecond(), is(-13L));
|
||||
assertThat(instant.getNano(), is(1_000_000_000 - 345_000_000));
|
||||
}
|
||||
{
|
||||
Instant instant = Instant.from(formatter.parse("0"));
|
||||
assertThat(instant.getEpochSecond(), is(0L));
|
||||
|
@ -83,10 +67,10 @@ public class DateFormattersTests extends ESTestCase {
|
|||
public void testEpochMilliParser() {
|
||||
DateFormatter formatter = DateFormatters.forPattern("epoch_millis");
|
||||
DateTimeParseException e = expectThrows(DateTimeParseException.class, () -> formatter.parse("invalid"));
|
||||
assertThat(e.getMessage(), containsString("invalid number"));
|
||||
assertThat(e.getMessage(), containsString("could not be parsed"));
|
||||
|
||||
e = expectThrows(DateTimeParseException.class, () -> formatter.parse("123.1234567"));
|
||||
assertThat(e.getMessage(), containsString("too much granularity after dot [123.1234567]"));
|
||||
assertThat(e.getMessage(), containsString("unparsed text found at index 3"));
|
||||
}
|
||||
|
||||
// this is not in the duelling tests, because the epoch second parser in joda time drops the milliseconds after the comma
|
||||
|
@ -108,17 +92,14 @@ public class DateFormattersTests extends ESTestCase {
|
|||
assertThat(Instant.from(formatter.parse("1234.12345678")).getNano(), is(123_456_780));
|
||||
assertThat(Instant.from(formatter.parse("1234.123456789")).getNano(), is(123_456_789));
|
||||
|
||||
assertThat(Instant.from(formatter.parse("-1234.567")).toEpochMilli(), is(-1234567L));
|
||||
assertThat(Instant.from(formatter.parse("-1234")).getNano(), is(0));
|
||||
|
||||
DateTimeParseException e = expectThrows(DateTimeParseException.class, () -> formatter.parse("1234.1234567890"));
|
||||
assertThat(e.getMessage(), is("too much granularity after dot [1234.1234567890]"));
|
||||
assertThat(e.getMessage(), is("Text '1234.1234567890' could not be parsed, unparsed text found at index 4"));
|
||||
e = expectThrows(DateTimeParseException.class, () -> formatter.parse("1234.123456789013221"));
|
||||
assertThat(e.getMessage(), is("too much granularity after dot [1234.123456789013221]"));
|
||||
assertThat(e.getMessage(), is("Text '1234.123456789013221' could not be parsed, unparsed text found at index 4"));
|
||||
e = expectThrows(DateTimeParseException.class, () -> formatter.parse("abc"));
|
||||
assertThat(e.getMessage(), is("invalid number [abc]"));
|
||||
assertThat(e.getMessage(), is("Text 'abc' could not be parsed at index 0"));
|
||||
e = expectThrows(DateTimeParseException.class, () -> formatter.parse("1234.abc"));
|
||||
assertThat(e.getMessage(), is("invalid number [1234.abc]"));
|
||||
assertThat(e.getMessage(), is("Text '1234.abc' could not be parsed, unparsed text found at index 4"));
|
||||
}
|
||||
|
||||
public void testEpochMilliParsersWithDifferentFormatters() {
|
||||
|
@ -132,18 +113,6 @@ public class DateFormattersTests extends ESTestCase {
|
|||
assertThat(DateFormatters.forPattern("strict_date_optional_time").locale(), is(Locale.ROOT));
|
||||
Locale locale = randomLocale(random());
|
||||
assertThat(DateFormatters.forPattern("strict_date_optional_time").withLocale(locale).locale(), is(locale));
|
||||
if (locale.equals(Locale.ROOT)) {
|
||||
DateFormatter millisFormatter = DateFormatters.forPattern("epoch_millis");
|
||||
assertThat(millisFormatter.withLocale(locale), is(millisFormatter));
|
||||
DateFormatter secondFormatter = DateFormatters.forPattern("epoch_second");
|
||||
assertThat(secondFormatter.withLocale(locale), is(secondFormatter));
|
||||
} else {
|
||||
IllegalArgumentException e =
|
||||
expectThrows(IllegalArgumentException.class, () -> DateFormatters.forPattern("epoch_millis").withLocale(locale));
|
||||
assertThat(e.getMessage(), is("epoch_millis date formatter can only be in locale ROOT"));
|
||||
e = expectThrows(IllegalArgumentException.class, () -> DateFormatters.forPattern("epoch_second").withLocale(locale));
|
||||
assertThat(e.getMessage(), is("epoch_second date formatter can only be in locale ROOT"));
|
||||
}
|
||||
}
|
||||
|
||||
public void testTimeZones() {
|
||||
|
@ -151,18 +120,6 @@ public class DateFormattersTests extends ESTestCase {
|
|||
assertThat(DateFormatters.forPattern("strict_date_optional_time").zone(), is(nullValue()));
|
||||
ZoneId zoneId = randomZone();
|
||||
assertThat(DateFormatters.forPattern("strict_date_optional_time").withZone(zoneId).zone(), is(zoneId));
|
||||
if (zoneId.equals(ZoneOffset.UTC)) {
|
||||
DateFormatter millisFormatter = DateFormatters.forPattern("epoch_millis");
|
||||
assertThat(millisFormatter.withZone(zoneId), is(millisFormatter));
|
||||
DateFormatter secondFormatter = DateFormatters.forPattern("epoch_second");
|
||||
assertThat(secondFormatter.withZone(zoneId), is(secondFormatter));
|
||||
} else {
|
||||
IllegalArgumentException e =
|
||||
expectThrows(IllegalArgumentException.class, () -> DateFormatters.forPattern("epoch_millis").withZone(zoneId));
|
||||
assertThat(e.getMessage(), is("epoch_millis date formatter can only be in zone offset UTC"));
|
||||
e = expectThrows(IllegalArgumentException.class, () -> DateFormatters.forPattern("epoch_second").withZone(zoneId));
|
||||
assertThat(e.getMessage(), is("epoch_second date formatter can only be in zone offset UTC"));
|
||||
}
|
||||
}
|
||||
|
||||
public void testEqualsAndHashcode() {
|
||||
|
|
|
@ -55,7 +55,7 @@ public class SoftDeletesPolicyTests extends ESTestCase {
|
|||
() -> {
|
||||
final Set<RetentionLease> leases = new HashSet<>(retainingSequenceNumbers.length);
|
||||
for (int i = 0; i < retainingSequenceNumbers.length; i++) {
|
||||
leases.add(new RetentionLease(Integer.toString(i), retainingSequenceNumbers[i].get(), "test"));
|
||||
leases.add(new RetentionLease(Integer.toString(i), retainingSequenceNumbers[i].get(), 0L, "test"));
|
||||
}
|
||||
return leases;
|
||||
};
|
||||
|
|
|
@ -21,6 +21,8 @@ package org.elasticsearch.index.seqno;
|
|||
|
||||
import org.elasticsearch.cluster.routing.AllocationId;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.test.IndexSettingsModule;
|
||||
|
||||
|
@ -28,11 +30,14 @@ import java.util.Collection;
|
|||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.LongSupplier;
|
||||
|
||||
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
|
||||
public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTestCase {
|
||||
|
||||
|
@ -43,7 +48,8 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
|
|||
id.getId(),
|
||||
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
|
||||
UNASSIGNED_SEQ_NO,
|
||||
value -> {});
|
||||
value -> {},
|
||||
() -> 0L);
|
||||
replicationTracker.updateFromMaster(
|
||||
randomNonNegativeLong(),
|
||||
Collections.singleton(id.getId()),
|
||||
|
@ -55,19 +61,73 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
|
|||
for (int i = 0; i < length; i++) {
|
||||
minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
|
||||
replicationTracker.addOrUpdateRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i);
|
||||
assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers);
|
||||
assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers, () -> 0L);
|
||||
}
|
||||
|
||||
for (int i = 0; i < length; i++) {
|
||||
minimumRetainingSequenceNumbers[i] = randomLongBetween(minimumRetainingSequenceNumbers[i], Long.MAX_VALUE);
|
||||
replicationTracker.addOrUpdateRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i);
|
||||
assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers);
|
||||
assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers, () -> 0L);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public void testExpiration() {
|
||||
final AllocationId id = AllocationId.newInitializing();
|
||||
final AtomicLong currentTimeMillis = new AtomicLong(randomLongBetween(0, 1024));
|
||||
final long retentionLeaseMillis = randomLongBetween(1, TimeValue.timeValueHours(12).millis());
|
||||
final Settings settings = Settings
|
||||
.builder()
|
||||
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), TimeValue.timeValueMillis(retentionLeaseMillis))
|
||||
.build();
|
||||
final ReplicationTracker replicationTracker = new ReplicationTracker(
|
||||
new ShardId("test", "_na", 0),
|
||||
id.getId(),
|
||||
IndexSettingsModule.newIndexSettings("test", settings),
|
||||
UNASSIGNED_SEQ_NO,
|
||||
value -> {},
|
||||
currentTimeMillis::get);
|
||||
replicationTracker.updateFromMaster(
|
||||
randomNonNegativeLong(),
|
||||
Collections.singleton(id.getId()),
|
||||
routingTable(Collections.emptySet(), id),
|
||||
Collections.emptySet());
|
||||
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
|
||||
final long[] retainingSequenceNumbers = new long[1];
|
||||
retainingSequenceNumbers[0] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
|
||||
replicationTracker.addOrUpdateRetentionLease("0", retainingSequenceNumbers[0], "test-0");
|
||||
|
||||
{
|
||||
final Collection<RetentionLease> retentionLeases = replicationTracker.getRetentionLeases();
|
||||
assertThat(retentionLeases, hasSize(1));
|
||||
final RetentionLease retentionLease = retentionLeases.iterator().next();
|
||||
assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get()));
|
||||
assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get);
|
||||
}
|
||||
|
||||
// renew the lease
|
||||
currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(0, 1024));
|
||||
retainingSequenceNumbers[0] = randomLongBetween(retainingSequenceNumbers[0], Long.MAX_VALUE);
|
||||
replicationTracker.addOrUpdateRetentionLease("0", retainingSequenceNumbers[0], "test-0");
|
||||
|
||||
{
|
||||
final Collection<RetentionLease> retentionLeases = replicationTracker.getRetentionLeases();
|
||||
assertThat(retentionLeases, hasSize(1));
|
||||
final RetentionLease retentionLease = retentionLeases.iterator().next();
|
||||
assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get()));
|
||||
assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get);
|
||||
}
|
||||
|
||||
// now force the lease to expire
|
||||
currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(retentionLeaseMillis, Long.MAX_VALUE - currentTimeMillis.get()));
|
||||
assertRetentionLeases(replicationTracker, 0, retainingSequenceNumbers, currentTimeMillis::get);
|
||||
}
|
||||
|
||||
private void assertRetentionLeases(
|
||||
final ReplicationTracker replicationTracker, final int size, final long[] minimumRetainingSequenceNumbers) {
|
||||
final ReplicationTracker replicationTracker,
|
||||
final int size,
|
||||
final long[] minimumRetainingSequenceNumbers,
|
||||
final LongSupplier currentTimeMillisSupplier) {
|
||||
final Collection<RetentionLease> retentionLeases = replicationTracker.getRetentionLeases();
|
||||
final Map<String, RetentionLease> idToRetentionLease = new HashMap<>();
|
||||
for (final RetentionLease retentionLease : retentionLeases) {
|
||||
|
@ -79,6 +139,9 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
|
|||
assertThat(idToRetentionLease.keySet(), hasItem(Integer.toString(i)));
|
||||
final RetentionLease retentionLease = idToRetentionLease.get(Integer.toString(i));
|
||||
assertThat(retentionLease.retainingSequenceNumber(), equalTo(minimumRetainingSequenceNumbers[i]));
|
||||
assertThat(
|
||||
currentTimeMillisSupplier.getAsLong() - retentionLease.timestamp(),
|
||||
lessThanOrEqualTo(replicationTracker.indexSettings().getRetentionLeaseMillis()));
|
||||
assertThat(retentionLease.source(), equalTo("test-" + i));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,18 +31,23 @@ import org.elasticsearch.test.IndexSettingsModule;
|
|||
|
||||
import java.util.Set;
|
||||
import java.util.function.LongConsumer;
|
||||
import java.util.function.LongSupplier;
|
||||
|
||||
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
|
||||
|
||||
public abstract class ReplicationTrackerTestCase extends ESTestCase {
|
||||
|
||||
ReplicationTracker newTracker(final AllocationId allocationId, final LongConsumer updatedGlobalCheckpoint) {
|
||||
ReplicationTracker newTracker(
|
||||
final AllocationId allocationId,
|
||||
final LongConsumer updatedGlobalCheckpoint,
|
||||
final LongSupplier currentTimeMillisSupplier) {
|
||||
return new ReplicationTracker(
|
||||
new ShardId("test", "_na_", 0),
|
||||
allocationId.getId(),
|
||||
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
|
||||
UNASSIGNED_SEQ_NO,
|
||||
updatedGlobalCheckpoint);
|
||||
updatedGlobalCheckpoint,
|
||||
currentTimeMillisSupplier);
|
||||
}
|
||||
|
||||
static IndexShardRoutingTable routingTable(final Set<AllocationId> initializingIds, final AllocationId primaryId) {
|
||||
|
|
|
@ -406,7 +406,7 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
|
|||
private AtomicLong updatedGlobalCheckpoint = new AtomicLong(UNASSIGNED_SEQ_NO);
|
||||
|
||||
private ReplicationTracker newTracker(final AllocationId allocationId) {
|
||||
return newTracker(allocationId, updatedGlobalCheckpoint::set);
|
||||
return newTracker(allocationId, updatedGlobalCheckpoint::set, () -> 0L);
|
||||
}
|
||||
|
||||
public void testWaitForAllocationIdToBeInSyncCanBeInterrupted() throws BrokenBarrierException, InterruptedException {
|
||||
|
@ -683,10 +683,10 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
|
|||
final AllocationId primaryAllocationId = clusterState.routingTable.primaryShard().allocationId();
|
||||
final LongConsumer onUpdate = updatedGlobalCheckpoint -> {};
|
||||
final long globalCheckpoint = UNASSIGNED_SEQ_NO;
|
||||
ReplicationTracker oldPrimary =
|
||||
new ReplicationTracker(shardId, primaryAllocationId.getId(), indexSettings, globalCheckpoint, onUpdate);
|
||||
ReplicationTracker newPrimary =
|
||||
new ReplicationTracker(shardId, primaryAllocationId.getRelocationId(), indexSettings, globalCheckpoint, onUpdate);
|
||||
ReplicationTracker oldPrimary = new ReplicationTracker(
|
||||
shardId, primaryAllocationId.getId(), indexSettings, globalCheckpoint, onUpdate, () -> 0L);
|
||||
ReplicationTracker newPrimary = new ReplicationTracker(
|
||||
shardId, primaryAllocationId.getRelocationId(), indexSettings, globalCheckpoint, onUpdate, () -> 0L);
|
||||
|
||||
Set<String> allocationIds = new HashSet<>(Arrays.asList(oldPrimary.shardAllocationId, newPrimary.shardAllocationId));
|
||||
|
||||
|
|
|
@ -19,20 +19,50 @@
|
|||
|
||||
package org.elasticsearch.index.shard;
|
||||
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.engine.InternalEngineFactory;
|
||||
import org.elasticsearch.index.seqno.RetentionLease;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.LongSupplier;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
|
||||
|
||||
private final AtomicLong currentTimeMillis = new AtomicLong();
|
||||
|
||||
@Override
|
||||
protected ThreadPool setUpThreadPool() {
|
||||
final ThreadPool threadPool = mock(ThreadPool.class);
|
||||
doAnswer(invocationOnMock -> currentTimeMillis.get()).when(threadPool).absoluteTimeInMillis();
|
||||
when(threadPool.executor(anyString())).thenReturn(mock(ExecutorService.class));
|
||||
when(threadPool.scheduler()).thenReturn(mock(ScheduledExecutorService.class));
|
||||
return threadPool;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void tearDownThreadPool() {
|
||||
|
||||
}
|
||||
|
||||
public void testAddOrUpdateRetentionLease() throws IOException {
|
||||
final IndexShard indexShard = newStartedShard(true);
|
||||
try {
|
||||
|
@ -41,22 +71,67 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
|
|||
for (int i = 0; i < length; i++) {
|
||||
minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
|
||||
indexShard.addOrUpdateRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i);
|
||||
assertRetentionLeases(indexShard, i + 1, minimumRetainingSequenceNumbers);
|
||||
assertRetentionLeases(indexShard, i + 1, minimumRetainingSequenceNumbers, () -> 0L);
|
||||
}
|
||||
|
||||
for (int i = 0; i < length; i++) {
|
||||
minimumRetainingSequenceNumbers[i] = randomLongBetween(minimumRetainingSequenceNumbers[i], Long.MAX_VALUE);
|
||||
indexShard.addOrUpdateRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i);
|
||||
assertRetentionLeases(indexShard, length, minimumRetainingSequenceNumbers);
|
||||
assertRetentionLeases(indexShard, length, minimumRetainingSequenceNumbers, () -> 0L);
|
||||
}
|
||||
} finally {
|
||||
closeShards(indexShard);
|
||||
}
|
||||
}
|
||||
|
||||
public void testExpiration() throws IOException {
|
||||
final long retentionLeaseMillis = randomLongBetween(1, TimeValue.timeValueHours(12).millis());
|
||||
final Settings settings = Settings
|
||||
.builder()
|
||||
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), TimeValue.timeValueMillis(retentionLeaseMillis))
|
||||
.build();
|
||||
// current time is mocked through the thread pool
|
||||
final IndexShard indexShard = newStartedShard(true, settings, new InternalEngineFactory());
|
||||
try {
|
||||
final long[] retainingSequenceNumbers = new long[1];
|
||||
retainingSequenceNumbers[0] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
|
||||
indexShard.addOrUpdateRetentionLease("0", retainingSequenceNumbers[0], "test-0");
|
||||
|
||||
{
|
||||
final Collection<RetentionLease> retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get();
|
||||
assertThat(retentionLeases, hasSize(1));
|
||||
final RetentionLease retentionLease = retentionLeases.iterator().next();
|
||||
assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get()));
|
||||
assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get);
|
||||
}
|
||||
|
||||
// renew the lease
|
||||
currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(0, 1024));
|
||||
retainingSequenceNumbers[0] = randomLongBetween(retainingSequenceNumbers[0], Long.MAX_VALUE);
|
||||
indexShard.addOrUpdateRetentionLease("0", retainingSequenceNumbers[0], "test-0");
|
||||
|
||||
{
|
||||
final Collection<RetentionLease> retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get();
|
||||
assertThat(retentionLeases, hasSize(1));
|
||||
final RetentionLease retentionLease = retentionLeases.iterator().next();
|
||||
assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get()));
|
||||
assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get);
|
||||
}
|
||||
|
||||
// now force the lease to expire
|
||||
currentTimeMillis.set(
|
||||
currentTimeMillis.get() + randomLongBetween(retentionLeaseMillis, Long.MAX_VALUE - currentTimeMillis.get()));
|
||||
assertRetentionLeases(indexShard, 0, retainingSequenceNumbers, currentTimeMillis::get);
|
||||
} finally {
|
||||
closeShards(indexShard);
|
||||
}
|
||||
}
|
||||
|
||||
private void assertRetentionLeases(
|
||||
final IndexShard indexShard, final int size, final long[] minimumRetainingSequenceNumbers) {
|
||||
final IndexShard indexShard,
|
||||
final int size,
|
||||
final long[] minimumRetainingSequenceNumbers,
|
||||
final LongSupplier currentTimeMillisSupplier) {
|
||||
final Collection<RetentionLease> retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get();
|
||||
final Map<String, RetentionLease> idToRetentionLease = new HashMap<>();
|
||||
for (final RetentionLease retentionLease : retentionLeases) {
|
||||
|
@ -68,6 +143,9 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
|
|||
assertThat(idToRetentionLease.keySet(), hasItem(Integer.toString(i)));
|
||||
final RetentionLease retentionLease = idToRetentionLease.get(Integer.toString(i));
|
||||
assertThat(retentionLease.retainingSequenceNumber(), equalTo(minimumRetainingSequenceNumbers[i]));
|
||||
assertThat(
|
||||
currentTimeMillisSupplier.getAsLong() - retentionLease.timestamp(),
|
||||
lessThanOrEqualTo(indexShard.indexSettings().getRetentionLeaseMillis()));
|
||||
assertThat(retentionLease.source(), equalTo("test-" + i));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.elasticsearch.common.bytes.BytesArray;
|
|||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.document.DocumentField;
|
||||
import org.elasticsearch.common.joda.Joda;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.time.DateFormatters;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
@ -897,8 +898,9 @@ public class SearchFieldsIT extends ESIntegTestCase {
|
|||
assertThat(searchResponse.getHits().getAt(0).getFields().get("long_field").getValue(), equalTo("4.0"));
|
||||
assertThat(searchResponse.getHits().getAt(0).getFields().get("float_field").getValue(), equalTo("5.0"));
|
||||
assertThat(searchResponse.getHits().getAt(0).getFields().get("double_field").getValue(), equalTo("6.0"));
|
||||
// TODO: switch to java date formatter, but will require special casing java 8 as there is a bug with epoch formatting there
|
||||
assertThat(searchResponse.getHits().getAt(0).getFields().get("date_field").getValue(),
|
||||
equalTo(DateFormatters.forPattern("epoch_millis").format(date)));
|
||||
equalTo(Joda.forPattern("epoch_millis").format(date)));
|
||||
}
|
||||
|
||||
public void testScriptFields() throws Exception {
|
||||
|
|
|
@ -445,7 +445,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||
|
||||
logger.info("--> assert that old settings are restored");
|
||||
GetSettingsResponse getSettingsResponse = client.admin().indices().prepareGetSettings("test-idx").execute().actionGet();
|
||||
assertThat(getSettingsResponse.getSetting("test-idx", "index.refresh_interval"), equalTo("10000ms"));
|
||||
assertThat(getSettingsResponse.getSetting("test-idx", "index.refresh_interval"), equalTo("10s"));
|
||||
}
|
||||
|
||||
public void testEmptySnapshot() throws Exception {
|
||||
|
|
|
@ -29,7 +29,6 @@ import java.io.IOException;
|
|||
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
|
||||
import static org.elasticsearch.test.ESTestCase.frequently;
|
||||
import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength;
|
||||
import static org.elasticsearch.test.ESTestCase.randomBoolean;
|
||||
import static org.elasticsearch.test.ESTestCase.randomIntBetween;
|
||||
|
@ -46,14 +45,9 @@ public final class RandomCreateIndexGenerator {
|
|||
String index = randomAlphaOfLength(5);
|
||||
CreateIndexRequest request = new CreateIndexRequest(index);
|
||||
randomAliases(request);
|
||||
if (frequently()) {
|
||||
if (randomBoolean()) {
|
||||
String type = randomAlphaOfLength(5);
|
||||
if (randomBoolean()) {
|
||||
request.mapping(type, randomMapping());
|
||||
} else {
|
||||
request.mapping(type, randomMapping(type));
|
||||
|
||||
}
|
||||
request.mapping(type, randomMapping(type));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
request.settings(randomIndexSettings());
|
||||
|
@ -82,16 +76,6 @@ public final class RandomCreateIndexGenerator {
|
|||
return builder.build();
|
||||
}
|
||||
|
||||
public static XContentBuilder randomMapping() throws IOException {
|
||||
XContentBuilder builder = XContentFactory.jsonBuilder();
|
||||
builder.startObject();
|
||||
|
||||
randomMappingFields(builder, true);
|
||||
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
public static XContentBuilder randomMapping(String type) throws IOException {
|
||||
XContentBuilder builder = XContentFactory.jsonBuilder();
|
||||
builder.startObject().startObject(type);
|
||||
|
|
|
@ -602,8 +602,8 @@ public abstract class EngineTestCase extends ESTestCase {
|
|||
final LongSupplier globalCheckpointSupplier;
|
||||
final Supplier<Collection<RetentionLease>> retentionLeasesSupplier;
|
||||
if (maybeGlobalCheckpointSupplier == null) {
|
||||
final ReplicationTracker replicationTracker =
|
||||
new ReplicationTracker(shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED, update -> {});
|
||||
final ReplicationTracker replicationTracker = new ReplicationTracker(
|
||||
shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED, update -> {}, () -> 0L);
|
||||
globalCheckpointSupplier = replicationTracker;
|
||||
retentionLeasesSupplier = replicationTracker::getRetentionLeases;
|
||||
} else {
|
||||
|
|
|
@ -133,20 +133,28 @@ public abstract class IndexShardTestCase extends ESTestCase {
|
|||
@Override
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
threadPool = new TestThreadPool(getClass().getName(), threadPoolSettings());
|
||||
threadPool = setUpThreadPool();
|
||||
primaryTerm = randomIntBetween(1, 100); // use random but fixed term for creating shards
|
||||
failOnShardFailures();
|
||||
}
|
||||
|
||||
protected ThreadPool setUpThreadPool() {
|
||||
return new TestThreadPool(getClass().getName(), threadPoolSettings());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void tearDown() throws Exception {
|
||||
try {
|
||||
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
|
||||
tearDownThreadPool();
|
||||
} finally {
|
||||
super.tearDown();
|
||||
}
|
||||
}
|
||||
|
||||
protected void tearDownThreadPool() {
|
||||
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* by default, tests will fail if any shard created by this class fails. Tests that cause failures by design
|
||||
* can call this method to ignore those failures
|
||||
|
|
|
@ -385,6 +385,7 @@ public class TransportResumeFollowAction extends TransportMasterNodeAction<Resum
|
|||
nonReplicatedSettings.add(IndexSettings.ALLOW_UNMAPPED);
|
||||
nonReplicatedSettings.add(IndexSettings.INDEX_SEARCH_IDLE_AFTER);
|
||||
nonReplicatedSettings.add(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING);
|
||||
nonReplicatedSettings.add(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING);
|
||||
nonReplicatedSettings.add(IndexSettings.MAX_SCRIPT_FIELDS_SETTING);
|
||||
nonReplicatedSettings.add(IndexSettings.MAX_REGEX_LENGTH_SETTING);
|
||||
nonReplicatedSettings.add(IndexSettings.MAX_TERMS_COUNT_SETTING);
|
||||
|
|
|
@ -32,7 +32,7 @@ import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
|
|||
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
|
||||
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
|
||||
import org.elasticsearch.xpack.core.security.user.SystemUser;
|
||||
import org.elasticsearch.xpack.core.security.user.XPackUser;
|
||||
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector;
|
||||
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory.BucketWithMissingData;
|
||||
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
|
||||
|
@ -225,12 +225,12 @@ class DatafeedJob {
|
|||
Date currentTime = new Date(currentTimeSupplier.get());
|
||||
return new Annotation(msg,
|
||||
currentTime,
|
||||
SystemUser.NAME,
|
||||
XPackUser.NAME,
|
||||
startTime,
|
||||
endTime,
|
||||
jobId,
|
||||
currentTime,
|
||||
SystemUser.NAME,
|
||||
XPackUser.NAME,
|
||||
"annotation");
|
||||
}
|
||||
|
||||
|
@ -238,9 +238,11 @@ class DatafeedJob {
|
|||
try (XContentBuilder xContentBuilder = annotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) {
|
||||
IndexRequest request = new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME);
|
||||
request.source(xContentBuilder);
|
||||
IndexResponse response = client.index(request).actionGet();
|
||||
lastDataCheckAnnotation = annotation;
|
||||
return response.getId();
|
||||
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN)) {
|
||||
IndexResponse response = client.index(request).actionGet();
|
||||
lastDataCheckAnnotation = annotation;
|
||||
return response.getId();
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
String errorMessage = "[" + jobId + "] failed to create annotation for delayed data checker.";
|
||||
LOGGER.error(errorMessage, ex);
|
||||
|
@ -251,7 +253,7 @@ class DatafeedJob {
|
|||
|
||||
private void updateAnnotation(Annotation annotation) {
|
||||
Annotation updatedAnnotation = new Annotation(lastDataCheckAnnotation);
|
||||
updatedAnnotation.setModifiedUsername(SystemUser.NAME);
|
||||
updatedAnnotation.setModifiedUsername(XPackUser.NAME);
|
||||
updatedAnnotation.setModifiedTime(new Date(currentTimeSupplier.get()));
|
||||
updatedAnnotation.setAnnotation(annotation.getAnnotation());
|
||||
updatedAnnotation.setTimestamp(annotation.getTimestamp());
|
||||
|
@ -260,8 +262,10 @@ class DatafeedJob {
|
|||
IndexRequest indexRequest = new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME);
|
||||
indexRequest.id(lastDataCheckAnnotationId);
|
||||
indexRequest.source(xContentBuilder);
|
||||
client.index(indexRequest).actionGet();
|
||||
lastDataCheckAnnotation = updatedAnnotation;
|
||||
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN)) {
|
||||
client.index(indexRequest).actionGet();
|
||||
lastDataCheckAnnotation = updatedAnnotation;
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
String errorMessage = "[" + jobId + "] failed to update annotation for delayed data checker.";
|
||||
LOGGER.error(errorMessage, ex);
|
||||
|
|
|
@ -30,7 +30,7 @@ import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
|
|||
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
|
||||
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
|
||||
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
|
||||
import org.elasticsearch.xpack.core.security.user.SystemUser;
|
||||
import org.elasticsearch.xpack.core.security.user.XPackUser;
|
||||
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector;
|
||||
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory.BucketWithMissingData;
|
||||
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
|
||||
|
@ -271,12 +271,12 @@ public class DatafeedJobTests extends ESTestCase {
|
|||
|
||||
Annotation expectedAnnotation = new Annotation(msg,
|
||||
new Date(currentTime),
|
||||
SystemUser.NAME,
|
||||
XPackUser.NAME,
|
||||
bucket.getTimestamp(),
|
||||
new Date((bucket.getEpoch() + bucket.getBucketSpan()) * 1000),
|
||||
jobId,
|
||||
new Date(currentTime),
|
||||
SystemUser.NAME,
|
||||
XPackUser.NAME,
|
||||
"annotation");
|
||||
|
||||
IndexRequest request = new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME);
|
||||
|
@ -312,7 +312,7 @@ public class DatafeedJobTests extends ESTestCase {
|
|||
Annotation updatedAnnotation = new Annotation(expectedAnnotation);
|
||||
updatedAnnotation.setAnnotation(msg);
|
||||
updatedAnnotation.setModifiedTime(new Date(currentTime));
|
||||
updatedAnnotation.setModifiedUsername(SystemUser.NAME);
|
||||
updatedAnnotation.setModifiedUsername(XPackUser.NAME);
|
||||
updatedAnnotation.setEndTimestamp(new Date((bucket2.getEpoch() + bucket2.getBucketSpan()) * 1000));
|
||||
try (XContentBuilder xContentBuilder = updatedAnnotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) {
|
||||
indexRequest.source(xContentBuilder);
|
||||
|
|
|
@ -110,6 +110,8 @@ aggCountWithAlias
|
|||
SELECT gender g, COUNT(*) c FROM "test_emp" GROUP BY g ORDER BY gender;
|
||||
countDistinct
|
||||
SELECT COUNT(DISTINCT hire_date) AS count FROM test_emp;
|
||||
countDistinctAndCountSimpleWithAlias
|
||||
SELECT COUNT(*) cnt, COUNT(DISTINCT first_name) as names, gender FROM test_emp GROUP BY gender ORDER BY gender;
|
||||
|
||||
aggCountAliasAndWhereClauseMultiGroupBy
|
||||
SELECT gender g, languages l, COUNT(*) c FROM "test_emp" WHERE emp_no < 10020 GROUP BY gender, languages ORDER BY gender, languages;
|
||||
|
@ -121,6 +123,8 @@ aggCountWithAliasMultiGroupBy
|
|||
SELECT gender g, languages l, COUNT(*) c FROM "test_emp" GROUP BY g, l ORDER BY gender, languages;
|
||||
aggCountWithAliasMultiGroupByDifferentOrder
|
||||
SELECT gender g, languages l, COUNT(*) c FROM "test_emp" GROUP BY g, l ORDER BY languages ASC, gender DESC;
|
||||
aggCountDistinctWithAliasAndGroupBy
|
||||
SELECT COUNT(*) cnt, COUNT(DISTINCT first_name) as names, gender FROM test_emp GROUP BY gender ORDER BY gender;
|
||||
|
||||
|
||||
|
||||
|
@ -161,12 +165,20 @@ aggCountStarAndHavingBetween
|
|||
SELECT gender g, COUNT(*) c FROM "test_emp" GROUP BY g HAVING c BETWEEN 10 AND 70 ORDER BY gender ASC;
|
||||
aggCountStarAndHavingBetweenWithLimit
|
||||
SELECT gender g, COUNT(*) c FROM "test_emp" GROUP BY g HAVING c BETWEEN 10 AND 70 ORDER BY gender LIMIT 1;
|
||||
aggCountDistinctAndHavingBetweenWithLimit
|
||||
SELECT gender g, COUNT(DISTINCT first_name) c FROM "test_emp" GROUP BY g HAVING c BETWEEN 40 AND 50 ORDER BY gender LIMIT 1;
|
||||
aggCountOnColumnAndHavingOnAliasAndFunction
|
||||
SELECT gender g, COUNT(gender) c FROM "test_emp" GROUP BY g HAVING c > 10 AND COUNT(gender) < 70 ORDER BY gender;
|
||||
aggCountOnColumnAndHavingOnAliasAndFunctionWildcard -> COUNT(*/1) vs COUNT(gender)
|
||||
SELECT gender g, COUNT(gender) c FROM "test_emp" GROUP BY g HAVING c > 10 AND COUNT(*) < 70 ORDER BY gender;
|
||||
aggCountOnColumnAndHavingOnAliasAndFunctionConstant
|
||||
SELECT gender g, COUNT(gender) c FROM "test_emp" GROUP BY g HAVING c > 10 AND COUNT(1) < 70 ORDER BY gender;
|
||||
aggDistinctCountWithAliasAndHaving
|
||||
SELECT COUNT(*) c, COUNT(DISTINCT first_name) AS names, gender FROM test_emp GROUP BY gender HAVING names > 40 ORDER BY gender;
|
||||
aggDistinctCountWithFunctionWildcardAndHaving
|
||||
SELECT COUNT(*) c, COUNT(DISTINCT first_name) AS names, gender FROM test_emp GROUP BY gender HAVING names < 50 AND c < 50 ORDER BY gender;
|
||||
aggDistinctCountWithFunctionWildcardAndFunctionConstantAndHaving
|
||||
SELECT COUNT(*) c, COUNT(DISTINCT first_name) AS names, COUNT(123) AS c123, gender FROM test_emp GROUP BY gender HAVING names < 50 AND c < 50 AND c123 < 50 ORDER BY gender;
|
||||
|
||||
aggCountAndHavingMultiGroupBy
|
||||
SELECT gender g, languages l, COUNT(*) c FROM "test_emp" GROUP BY g, l HAVING COUNT(*) > 10 ORDER BY gender, l;
|
||||
|
@ -195,6 +207,8 @@ aggCountOnColumnAndHavingOnAliasAndFunctionWildcardMultiGroupBy -> COUNT(*/1) vs
|
|||
SELECT gender g, languages l, COUNT(gender) c FROM "test_emp" GROUP BY g, l HAVING c > 10 AND COUNT(*) < 70 ORDER BY gender, languages;
|
||||
aggCountOnColumnAndHavingOnAliasAndFunctionConstantMultiGroupBy
|
||||
SELECT gender g, languages l, COUNT(gender) c FROM "test_emp" GROUP BY g, l HAVING c > 10 AND COUNT(1) < 70 ORDER BY gender, languages;
|
||||
aggCountOnDistinctColumnAndHavingOnAliasAndFunctionConstantMultiGroupBy
|
||||
SELECT gender g, languages l, COUNT(DISTINCT last_name) c FROM "test_emp" GROUP BY g, l HAVING c > 5 AND COUNT(1) < 70 ORDER BY gender, languages;
|
||||
|
||||
|
||||
// MIN
|
||||
|
|
|
@ -61,6 +61,9 @@ public class Count extends AggregateFunction {
|
|||
|
||||
@Override
|
||||
public AggregateFunctionAttribute toAttribute() {
|
||||
return new AggregateFunctionAttribute(source(), name(), dataType(), id(), functionId(), "_count");
|
||||
if (!distinct()) {
|
||||
return new AggregateFunctionAttribute(source(), name(), dataType(), id(), functionId(), "_count");
|
||||
}
|
||||
return super.toAttribute();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -429,7 +429,13 @@ final class QueryTranslator {
|
|||
static String field(AggregateFunction af) {
|
||||
Expression arg = af.field();
|
||||
if (arg instanceof FieldAttribute) {
|
||||
return ((FieldAttribute) arg).name();
|
||||
FieldAttribute field = (FieldAttribute) arg;
|
||||
// COUNT(DISTINCT) uses cardinality aggregation which works on exact values (not changed by analyzers or normalizers)
|
||||
if (af instanceof Count && ((Count) af).distinct()) {
|
||||
// use the `keyword` version of the field, if there is one
|
||||
return field.isInexact() ? field.exactAttribute().name() : field.name();
|
||||
}
|
||||
return field.name();
|
||||
}
|
||||
if (arg instanceof Literal) {
|
||||
return String.valueOf(((Literal) arg).value());
|
||||
|
|
|
@ -5,6 +5,8 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.sql.planner;
|
||||
|
||||
import org.elasticsearch.search.aggregations.AggregationBuilder;
|
||||
import org.elasticsearch.search.aggregations.metrics.CardinalityAggregationBuilder;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
|
||||
import org.elasticsearch.xpack.sql.TestUtils;
|
||||
|
@ -19,11 +21,14 @@ import org.elasticsearch.xpack.sql.expression.function.FunctionRegistry;
|
|||
import org.elasticsearch.xpack.sql.expression.function.grouping.Histogram;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.math.MathProcessor.MathOperation;
|
||||
import org.elasticsearch.xpack.sql.expression.gen.script.ScriptTemplate;
|
||||
import org.elasticsearch.xpack.sql.optimizer.Optimizer;
|
||||
import org.elasticsearch.xpack.sql.parser.SqlParser;
|
||||
import org.elasticsearch.xpack.sql.plan.logical.Aggregate;
|
||||
import org.elasticsearch.xpack.sql.plan.logical.Filter;
|
||||
import org.elasticsearch.xpack.sql.plan.logical.LogicalPlan;
|
||||
import org.elasticsearch.xpack.sql.plan.logical.Project;
|
||||
import org.elasticsearch.xpack.sql.plan.physical.EsQueryExec;
|
||||
import org.elasticsearch.xpack.sql.plan.physical.PhysicalPlan;
|
||||
import org.elasticsearch.xpack.sql.planner.QueryTranslator.QueryTranslation;
|
||||
import org.elasticsearch.xpack.sql.querydsl.agg.AggFilter;
|
||||
import org.elasticsearch.xpack.sql.querydsl.query.ExistsQuery;
|
||||
|
@ -41,6 +46,7 @@ import org.elasticsearch.xpack.sql.util.DateUtils;
|
|||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
|
@ -55,6 +61,8 @@ public class QueryTranslatorTests extends ESTestCase {
|
|||
|
||||
private static SqlParser parser;
|
||||
private static Analyzer analyzer;
|
||||
private static Optimizer optimizer;
|
||||
private static Planner planner;
|
||||
|
||||
@BeforeClass
|
||||
public static void init() {
|
||||
|
@ -64,6 +72,8 @@ public class QueryTranslatorTests extends ESTestCase {
|
|||
EsIndex test = new EsIndex("test", mapping);
|
||||
IndexResolution getIndexResult = IndexResolution.valid(test);
|
||||
analyzer = new Analyzer(TestUtils.TEST_CFG, new FunctionRegistry(), getIndexResult, new Verifier(new Metrics()));
|
||||
optimizer = new Optimizer();
|
||||
planner = new Planner();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
@ -75,6 +85,10 @@ public class QueryTranslatorTests extends ESTestCase {
|
|||
private LogicalPlan plan(String sql) {
|
||||
return analyzer.analyze(parser.createStatement(sql), true);
|
||||
}
|
||||
|
||||
private PhysicalPlan optimizeAndPlan(String sql) {
|
||||
return planner.plan(optimizer.optimize(plan(sql)), true);
|
||||
}
|
||||
|
||||
public void testTermEqualityAnalyzer() {
|
||||
LogicalPlan p = plan("SELECT some.string FROM test WHERE some.string = 'value'");
|
||||
|
@ -433,6 +447,7 @@ public class QueryTranslatorTests extends ESTestCase {
|
|||
scriptTemplate.toString());
|
||||
assertEquals("[{v=int}, {v=10}]", scriptTemplate.params().toString());
|
||||
}
|
||||
|
||||
public void testGroupByDateHistogram() {
|
||||
LogicalPlan p = plan("SELECT MAX(int) FROM test GROUP BY HISTOGRAM(int, 1000)");
|
||||
assertTrue(p instanceof Aggregate);
|
||||
|
@ -448,7 +463,6 @@ public class QueryTranslatorTests extends ESTestCase {
|
|||
assertEquals(DataType.INTEGER, field.dataType());
|
||||
}
|
||||
|
||||
|
||||
public void testGroupByHistogram() {
|
||||
LogicalPlan p = plan("SELECT MAX(int) FROM test GROUP BY HISTOGRAM(date, INTERVAL 2 YEARS)");
|
||||
assertTrue(p instanceof Aggregate);
|
||||
|
@ -463,4 +477,23 @@ public class QueryTranslatorTests extends ESTestCase {
|
|||
assertEquals(FieldAttribute.class, field.getClass());
|
||||
assertEquals(DataType.DATE, field.dataType());
|
||||
}
|
||||
|
||||
public void testCountDistinctCardinalityFolder() {
|
||||
PhysicalPlan p = optimizeAndPlan("SELECT COUNT(DISTINCT keyword) cnt FROM test GROUP BY bool HAVING cnt = 0");
|
||||
assertEquals(EsQueryExec.class, p.getClass());
|
||||
EsQueryExec ee = (EsQueryExec) p;
|
||||
assertEquals(1, ee.output().size());
|
||||
assertThat(ee.output().get(0).toString(), startsWith("cnt{a->"));
|
||||
|
||||
Collection<AggregationBuilder> subAggs = ee.queryContainer().aggs().asAggBuilder().getSubAggregations();
|
||||
assertEquals(1, subAggs.size());
|
||||
assertTrue(subAggs.toArray()[0] instanceof CardinalityAggregationBuilder);
|
||||
|
||||
CardinalityAggregationBuilder cardinalityAgg = (CardinalityAggregationBuilder) subAggs.toArray()[0];
|
||||
assertEquals("keyword", cardinalityAgg.field());
|
||||
assertThat(ee.queryContainer().aggs().asAggBuilder().toString().replaceAll("\\s+", ""),
|
||||
endsWith("{\"buckets_path\":{\"a0\":\"" + cardinalityAgg.getName() +"\"},\"script\":{"
|
||||
+ "\"source\":\"InternalSqlScriptUtils.nullSafeFilter(InternalSqlScriptUtils.eq(params.a0,params.v0))\","
|
||||
+ "\"lang\":\"painless\",\"params\":{\"v0\":0}},\"gap_policy\":\"skip\"}}}}}"));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue