Merge branch '7.x' into enrich-7.x

This commit is contained in:
Michael Basnight 2019-05-28 18:32:18 -05:00
commit be60125a4e
150 changed files with 1518 additions and 1213 deletions

View File

@ -20,7 +20,7 @@ slf4j = 1.6.2
# when updating the JNA version, also update the version in buildSrc/build.gradle
jna = 4.5.1
netty = 4.1.35.Final
netty = 4.1.36.Final
joda = 2.10.2
# when updating this version, you need to ensure compatibility with:

View File

@ -82,6 +82,7 @@ public class BulkProcessorRetryIT extends ESRestHighLevelClientTestCase {
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
internalPolicy.logResponse(failure);
responses.add(failure);
latch.countDown();
}
@ -105,16 +106,8 @@ public class BulkProcessorRetryIT extends ESRestHighLevelClientTestCase {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
if (failure.getStatus() == RestStatus.TOO_MANY_REQUESTS) {
if (rejectedExecutionExpected == false) {
Iterator<TimeValue> backoffState = internalPolicy.backoffStateFor(bulkResponse);
assertNotNull("backoffState is null (indicates a bulk request got rejected without retry)", backoffState);
if (backoffState.hasNext()) {
// we're not expecting that we overwhelmed it even once when we maxed out the number of retries
throw new AssertionError("Got rejected although backoff policy would allow more retries",
failure.getCause());
} else {
rejectedAfterAllRetries = true;
logger.debug("We maxed out the number of bulk retries and got rejected (this is ok).");
}
assertRetriedCorrectly(internalPolicy, bulkResponse, failure.getCause());
rejectedAfterAllRetries = true;
}
} else {
throw new AssertionError("Unexpected failure with status: " + failure.getStatus());
@ -123,8 +116,12 @@ public class BulkProcessorRetryIT extends ESRestHighLevelClientTestCase {
}
} else {
if (response instanceof RemoteTransportException
&& ((RemoteTransportException) response).status() == RestStatus.TOO_MANY_REQUESTS && rejectedExecutionExpected) {
// ignored, we exceeded the write queue size with dispatching the initial bulk request
&& ((RemoteTransportException) response).status() == RestStatus.TOO_MANY_REQUESTS) {
if (rejectedExecutionExpected == false) {
assertRetriedCorrectly(internalPolicy, response, ((Throwable) response).getCause());
rejectedAfterAllRetries = true;
}
// ignored, we exceeded the write queue size when dispatching the initial bulk request
} else {
Throwable t = (Throwable) response;
// we're not expecting any other errors
@ -146,6 +143,17 @@ public class BulkProcessorRetryIT extends ESRestHighLevelClientTestCase {
}
private void assertRetriedCorrectly(CorrelatingBackoffPolicy internalPolicy, Object bulkResponse, Throwable failure) {
Iterator<TimeValue> backoffState = internalPolicy.backoffStateFor(bulkResponse);
assertNotNull("backoffState is null (indicates a bulk request got rejected without retry)", backoffState);
if (backoffState.hasNext()) {
// we're not expecting that we overwhelmed it even once when we maxed out the number of retries
throw new AssertionError("Got rejected although backoff policy would allow more retries", failure);
} else {
logger.debug("We maxed out the number of bulk retries and got rejected (this is ok).");
}
}
private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) {
MultiGetRequest multiGetRequest = new MultiGetRequest();
for (int i = 1; i <= numDocs; i++) {
@ -164,7 +172,7 @@ public class BulkProcessorRetryIT extends ESRestHighLevelClientTestCase {
* as the last call to the backoff policy's iterator. The advantage is that this is non-invasive to the rest of the production code.
*/
private static class CorrelatingBackoffPolicy extends BackoffPolicy {
private final Map<BulkResponse, Iterator<TimeValue>> correlations = new ConcurrentHashMap<>();
private final Map<Object, Iterator<TimeValue>> correlations = new ConcurrentHashMap<>();
// this is intentionally *not* static final. We will only ever have one instance of this class per test case and want the
// thread local to be eligible for garbage collection right after the test to avoid leaks.
private final ThreadLocal<Iterator<TimeValue>> iterators = new ThreadLocal<>();
@ -175,13 +183,13 @@ public class BulkProcessorRetryIT extends ESRestHighLevelClientTestCase {
this.delegate = delegate;
}
public Iterator<TimeValue> backoffStateFor(BulkResponse response) {
public Iterator<TimeValue> backoffStateFor(Object response) {
return correlations.get(response);
}
// Assumption: This method is called from the same thread as the last call to the internal iterator's #hasNext() / #next()
// see also Retry.AbstractRetryHandler#onResponse().
public void logResponse(BulkResponse response) {
public void logResponse(Object response) {
Iterator<TimeValue> iterator = iterators.get();
// did we ever retry?
if (iterator != null) {

View File

@ -1,9 +1,14 @@
[[search-aggregations-pipeline-movavg-aggregation]]
=== Moving Average Aggregation
ifdef::asciidoctor[]
deprecated:[6.4.0, "The Moving Average aggregation has been deprecated in favor of the more general <<search-aggregations-pipeline-movfn-aggregation,Moving Function Aggregation>>. The new Moving Function aggregation provides all the same functionality as the Moving Average aggregation, but also provides more flexibility."]
endif::[]
ifndef::asciidoctor[]
deprecated[6.4.0, The Moving Average aggregation has been deprecated in favor of the more general
<<search-aggregations-pipeline-movfn-aggregation,Moving Function Aggregation>>. The new Moving Function aggregation provides
all the same functionality as the Moving Average aggregation, but also provides more flexibility.]
endif::[]
Given an ordered series of data, the Moving Average aggregation will slide a window across the data and emit the average
value of that window. For example, given the data `[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]`, we can calculate a simple moving
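As an aside (not part of this commit), the windowed average that the deprecated aggregation computes can be sketched in plain Java; the class and method names below are illustrative only:

[source,java]
----------------------------------
import java.util.ArrayList;
import java.util.List;

public class SimpleMovingAverage {
    // Slide a fixed-size window across the data and emit each window's mean.
    static List<Double> movingAverage(double[] data, int window) {
        List<Double> averages = new ArrayList<>();
        double sum = 0;
        for (int i = 0; i < data.length; i++) {
            sum += data[i];
            if (i >= window) {
                sum -= data[i - window]; // drop the value that left the window
            }
            if (i >= window - 1) {
                averages.add(sum / window);
            }
        }
        return averages;
    }

    public static void main(String[] args) {
        // For [1..10] with a window of 5 this prints [3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
        System.out.println(movingAverage(new double[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, 5));
    }
}
----------------------------------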

View File

@ -61,12 +61,12 @@ A {dfeed} resource has the following properties:
`delayed_data_check_config`::
(object) Specifies whether the data feed checks for missing data and
and the size of the window. For example:
the size of the window. For example:
`{"enabled": true, "check_window": "1h"}` See
<<ml-datafeed-delayed-data-check-config>>.
[[ml-datafeed-chunking-config]]
==== Chunking Configuration Objects
==== Chunking configuration objects
{dfeeds-cap} might be required to search over long time periods, for several months
or years. This search is split into time chunks in order to ensure the load
@ -88,31 +88,33 @@ A chunking configuration object has the following properties:
For example: `3h`.
[[ml-datafeed-delayed-data-check-config]]
==== Delayed Data Check Configuration Objects
==== Delayed data check configuration objects
The {dfeed} can optionally search over indices that have already been read in
an effort to find if any data has since been added to the index. If missing data
is found, it is a good indication that the `query_delay` option is set too low and
the data is being indexed after the {dfeed} has passed that moment in time. See
an effort to determine whether any data has subsequently been added to the index.
If missing data is found, it is a good indication that the `query_delay` option
is set too low and the data is being indexed after the {dfeed} has passed that
moment in time. See
{stack-ov}/ml-delayed-data-detection.html[Working with delayed data].
This check only runs on real-time {dfeeds}
This check runs only on real-time {dfeeds}.
The configuration object has the following properties:
`enabled`::
(boolean) Should the {dfeed} periodically check for data being indexed after reading.
Defaults to `true`
(boolean) Specifies whether the {dfeed} periodically checks for delayed data.
Defaults to `true`.
`check_window`::
(time units) The window of time before the latest finalized bucket that should be searched
for late data. Defaults to `null` which causes an appropriate `check_window` to be calculated
when the real-time {dfeed} runs.
The default `check_window` span calculation is the max between `2h` or `8 * bucket_span`.
(time units) The window of time that is searched for late data. This window of
time ends with the latest finalized bucket. It defaults to `null`, which
causes an appropriate `check_window` to be calculated when the real-time
{dfeed} runs. In particular, the default `check_window` span calculation is
based on the maximum of `2h` or `8 * bucket_span`.
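For illustration only (this helper is hypothetical, not part of the commit), the default rule described above, the larger of `2h` and `8 * bucket_span`, amounts to:

[source,java]
----------------------------------
import java.time.Duration;

public class CheckWindowDefault {
    // The larger of 2h and 8 * bucket_span, per the rule described above.
    static Duration defaultCheckWindow(Duration bucketSpan) {
        Duration eightSpans = bucketSpan.multipliedBy(8);
        Duration twoHours = Duration.ofHours(2);
        return eightSpans.compareTo(twoHours) > 0 ? eightSpans : twoHours;
    }

    public static void main(String[] args) {
        System.out.println(defaultCheckWindow(Duration.ofMinutes(15))); // PT2H: 8 * 15m = 2h
        System.out.println(defaultCheckWindow(Duration.ofHours(1)));    // PT8H: 8h > 2h
    }
}
----------------------------------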
[float]
[[ml-datafeed-counts]]
==== {dfeed-cap} Counts
==== {dfeed-cap} counts
The get {dfeed} statistics API provides information about the operational
progress of a {dfeed}. All of these properties are informational; you cannot

View File

@ -45,6 +45,11 @@ IMPORTANT: You must use {kib} or this API to create a {dfeed}. Do not put a {df
(object) Specifies how data searches are split into time chunks.
See <<ml-datafeed-chunking-config>>.
`delayed_data_check_config`::
(object) Specifies whether the data feed checks for missing data and
the size of the window. See
<<ml-datafeed-delayed-data-check-config>>.
`frequency`::
(time units) The interval at which scheduled queries are made while the {dfeed}
runs in real time. The default value is either the bucket span for short
@ -82,10 +87,6 @@ IMPORTANT: You must use {kib} or this API to create a {dfeed}. Do not put a {df
(unsigned integer) The `size` parameter that is used in {es} searches.
The default value is `1000`.
`delayed_data_check_config`::
(object) Specifies if and with how large a window should the data feed check
for missing data. See <<ml-datafeed-delayed-data-check-config>>.
For more information about these properties,
see <<ml-datafeed-resource>>.

View File

@ -14,7 +14,10 @@ Updates certain properties of a {dfeed}.
`POST _ml/datafeeds/<feed_id>/_update`
//===== Description
===== Description
NOTE: If you update the `delayed_data_check_config` property, you must stop and
start the {dfeed} for the change to be applied.
==== Path Parameters
@ -32,6 +35,10 @@ The following properties can be updated after the {dfeed} is created:
`chunking_config`::
(object) Specifies how data searches are split into time chunks.
See <<ml-datafeed-chunking-config>>.
`delayed_data_check_config`::
(object) Specifies whether the data feed checks for missing data and
the size of the window. See <<ml-datafeed-delayed-data-check-config>>.
`frequency`::
(time units) The interval at which scheduled queries are made while the

View File

@ -17,6 +17,8 @@ image::monitoring/images/metricbeat.png[Example monitoring architecture]
To learn about monitoring in general, see
{stack-ov}/xpack-monitoring.html[Monitoring the {stack}].
//NOTE: The tagged regions are re-used in the Stack Overview.
. Enable the collection of monitoring data. Set
`xpack.monitoring.collection.enabled` to `true` on each node in the production
cluster. By default, it is disabled (`false`).
@ -71,13 +73,13 @@ PUT _cluster/settings
Leave `xpack.monitoring.enabled` set to its default value (`true`).
--
. On each {es} node in the production cluster:
. {metricbeat-ref}/metricbeat-installation.html[Install {metricbeat}] on each
{es} node in the production cluster.
.. {metricbeat-ref}/metricbeat-installation.html[Install {metricbeat}].
.. Enable the {es} module in {metricbeat}. +
. Enable the {es} {xpack} module in {metricbeat} on each {es} node. +
+
--
// tag::enable-es-module[]
For example, to enable the default configuration in the `modules.d` directory,
run the following command:
@ -89,39 +91,57 @@ metricbeat modules enable elasticsearch-xpack
For more information, see
{metricbeat-ref}/configuration-metricbeat.html[Specify which modules to run] and
{metricbeat-ref}/metricbeat-module-elasticsearch.html[{es} module].
// end::enable-es-module[]
--
.. By default the module will collect {es} monitoring metrics from `http://localhost:9200`.
If the local {es} node has a different address, you must specify it via the `hosts` setting
in the `modules.d/elasticsearch-xpack.yml` file.
.. If Elastic {security-features} are enabled, you must also provide a user ID
and password so that {metricbeat} can collect metrics successfully.
... Create a user on the production cluster that has the
{stack-ov}/built-in-roles.html[`remote_monitoring_collector` built-in role].
Alternatively, use the {stack-ov}/built-in-users.html[`remote_monitoring_user` built-in user].
... Add the `username` and `password` settings to the {es} module configuration
file.
. Configure the {es} {xpack} module in {metricbeat} on each {es} node. +
+
--
For example, add the following settings in the `modules.d/elasticsearch-xpack.yml` file:
// tag::configure-es-module[]
The `modules.d/elasticsearch-xpack.yml` file contains the following settings:
[source,yaml]
----------------------------------
- module: elasticsearch
...
username: remote_monitoring_user
password: YOUR_PASSWORD
- module: elasticsearch
metricsets:
- ccr
- cluster_stats
- index
- index_recovery
- index_summary
- ml_job
- node_stats
- shard
period: 10s
hosts: ["http://localhost:9200"]
#username: "user"
#password: "secret"
xpack.enabled: true
----------------------------------
By default, the module collects {es} monitoring metrics from
`http://localhost:9200`. If that host and port number are not correct, you must
update the `hosts` setting. If you configured {es} to use encrypted
communications, you must access it via HTTPS. For example, use a `hosts` setting
like `https://localhost:9200`.
// end::configure-es-module[]
// tag::remote-monitoring-user[]
If Elastic {security-features} are enabled, you must also provide a user ID
and password so that {metricbeat} can collect metrics successfully:
.. Create a user on the production cluster that has the
{stack-ov}/built-in-roles.html[`remote_monitoring_collector` built-in role].
Alternatively, use the
{stack-ov}/built-in-users.html[`remote_monitoring_user` built-in user].
.. Add the `username` and `password` settings to the {es} module configuration
file.
// end::remote-monitoring-user[]
--
.. If you configured {es} to use <<configuring-tls,encrypted communications>>,
you must access it via HTTPS. For example, use a `hosts` setting like
`https://localhost:9200` in the `modules.d/elasticsearch-xpack.yml` file.
.. Identify where to send the monitoring data. +
. Identify where to send the monitoring data. +
+
--
TIP: In production environments, we strongly recommend using a separate cluster
@ -136,48 +156,43 @@ configuration file (`metricbeat.yml`):
[source,yaml]
----------------------------------
output.elasticsearch:
# Array of hosts to connect to.
hosts: ["http://es-mon-1:9200", "http://es-mon2:9200"] <1>
# Optional protocol and basic auth credentials.
#protocol: "https"
#username: "elastic"
#password: "changeme"
----------------------------------
<1> In this example, the data is stored on a monitoring cluster with nodes
`es-mon-1` and `es-mon-2`.
If you configured the monitoring cluster to use encrypted communications, you
must access it via HTTPS. For example, use a `hosts` setting like
`https://es-mon-1:9200`.
IMPORTANT: The {es} {monitor-features} use ingest pipelines, therefore the
cluster that stores the monitoring data must have at least one
<<ingest,ingest node>>.
For more information about these configuration options, see
{metricbeat-ref}/elasticsearch-output.html[Configure the {es} output].
--
If {es} {security-features} are enabled on the monitoring cluster, you must
provide a valid user ID and password so that {metricbeat} can send metrics
successfully:
.. If {es} {security-features} are enabled on the monitoring cluster, you
must provide a valid user ID and password so that {metricbeat} can send metrics
successfully.
... Create a user on the monitoring cluster that has the
.. Create a user on the monitoring cluster that has the
{stack-ov}/built-in-roles.html[`remote_monitoring_agent` built-in role].
Alternatively, use the
{stack-ov}/built-in-users.html[`remote_monitoring_user` built-in user].
... Add the `username` and `password` settings to the {es} output information in
the {metricbeat} configuration file (`metricbeat.yml`):
+
--
[source,yaml]
----------------------------------
output.elasticsearch:
...
username: remote_monitoring_user
password: YOUR_PASSWORD
----------------------------------
.. Add the `username` and `password` settings to the {es} output information in
the {metricbeat} configuration file.
For more information about these configuration options, see
{metricbeat-ref}/elasticsearch-output.html[Configure the {es} output].
--
.. If you configured the monitoring cluster to use
<<configuring-tls,encrypted communications>>, you must access it via
HTTPS. For example, use a `hosts` setting like `https://es-mon-1:9200` in the
`metricbeat.yml` file.
. <<starting-elasticsearch,Start {es}>> on each node.
. <<starting-elasticsearch,Start {es}>>.
. {metricbeat-ref}/metricbeat-starting.html[Start {metricbeat}].
. {metricbeat-ref}/metricbeat-starting.html[Start {metricbeat}] on each node.
. {kibana-ref}/monitoring-data.html[View the monitoring data in {kib}].

View File

@ -1,8 +1,6 @@
[[release-notes-7.0.0-rc1]]
== {es} version 7.0.0-rc1
coming[7.0.0-rc1]
Also see <<breaking-changes-7.0,Breaking changes in 7.0.0-rc1>>.
[[breaking-7.0.0-rc1]]

View File

@ -1,8 +1,6 @@
[[release-notes-7.1.1]]
== {es} version 7.1.1
coming[7.1.1]
Also see <<breaking-changes-7.1,Breaking changes in 7.1>>.
[[bug-7.1.1]]

View File

@ -4,8 +4,6 @@
<titleabbrev>7.0.0</titleabbrev>
++++
coming[7.0.0]
//NOTE: The notable-highlights tagged regions are re-used in the
//Installation and Upgrade Guide

View File

@ -5,7 +5,7 @@
experimental[]
Most {rollup} endpoints have the following base:
Most rollup endpoints have the following base:
[source,js]
----

View File

@ -9,7 +9,7 @@
* <<rollup-put-job,Create Job>>, <<rollup-delete-job,Delete Job>>,
* <<rollup-start-job,Start Job>>, <<rollup-stop-job,Stop Job>>,
* <<rollup-get-job,Get Job, List all Jobs>>
* <<rollup-get-job,Get Job+++,+++ List all Jobs>>
* <<rollup-job-config,Job configuration details>>
[float]

View File

@ -33,7 +33,7 @@ GET /_search
// CONSOLE
// TEST[setup:sales]
Script fields can work on fields that are not stored (`my_field_name` in
Script fields can work on fields that are not stored (`price` in
the above case), and allow custom values to be returned (the
evaluated value of the script).

View File

@ -332,7 +332,7 @@ data through a bind-mount:
As a last resort, you can also force the container to mutate the ownership of
any bind-mounts used for the <<path-settings,data and log dirs>> through the
environment variable `TAKE_FILE_OWNERSHIP`. Inn this case, they will be owned by
environment variable `TAKE_FILE_OWNERSHIP`. In this case, they will be owned by
uid:gid `1000:0` providing read/write access to the {es} process as required.
--

View File

@ -181,7 +181,7 @@ public class PercolateQueryBuilder extends AbstractQueryBuilder<PercolateQueryBu
this.documentSupplier = null;
}
private PercolateQueryBuilder(String field, String documentType, Supplier<BytesReference> documentSupplier) {
protected PercolateQueryBuilder(String field, String documentType, Supplier<BytesReference> documentSupplier) {
if (field == null) {
throw new IllegalArgumentException("[field] is a required argument");
}
@ -519,8 +519,12 @@ public class PercolateQueryBuilder extends AbstractQueryBuilder<PercolateQueryBu
if (source == null) {
return this; // not executed yet
} else {
return new PercolateQueryBuilder(field, documentType, Collections.singletonList(source),
XContentHelper.xContentType(source));
PercolateQueryBuilder rewritten = new PercolateQueryBuilder(field, documentType,
Collections.singletonList(source), XContentHelper.xContentType(source));
if (name != null) {
rewritten.setName(name);
}
return rewritten;
}
}
GetRequest getRequest;
@ -555,7 +559,12 @@ public class PercolateQueryBuilder extends AbstractQueryBuilder<PercolateQueryBu
listener.onResponse(null);
}, listener::onFailure));
});
return new PercolateQueryBuilder(field, documentType, documentSupplier::get);
PercolateQueryBuilder rewritten = new PercolateQueryBuilder(field, documentType, documentSupplier::get);
if (name != null) {
rewritten.setName(name);
}
return rewritten;
}
@Override
@ -654,6 +663,10 @@ public class PercolateQueryBuilder extends AbstractQueryBuilder<PercolateQueryBu
return documentXContentType;
}
public String getQueryName() {
return name;
}
static IndexSearcher createMultiDocumentSearcher(Analyzer analyzer, Collection<ParsedDocument> docs) {
RAMDirectory ramDirectory = new RAMDirectory();
try (IndexWriter indexWriter = new IndexWriter(ramDirectory, new IndexWriterConfig(analyzer))) {

View File

@ -54,6 +54,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.hamcrest.Matchers.equalTo;
@ -331,4 +332,29 @@ public class PercolateQueryBuilderTests extends AbstractQueryTestCase<PercolateQ
assertEquals(query.getVerifiedMatchesQuery(), aliasQuery.getVerifiedMatchesQuery());
}
public void testSettingNameWhileRewriting() {
String testName = "name1";
QueryShardContext shardContext = createShardContext();
PercolateQueryBuilder percolateQueryBuilder = doCreateTestQueryBuilder(true);
percolateQueryBuilder.setName(testName);
QueryBuilder rewrittenQueryBuilder = percolateQueryBuilder.doRewrite(shardContext);
assertEquals(testName, ((PercolateQueryBuilder) rewrittenQueryBuilder).getQueryName());
assertNotEquals(rewrittenQueryBuilder, percolateQueryBuilder);
}
public void testSettingNameWhileRewritingWhenDocumentSupplierAndSourceNotNull() {
Supplier<BytesReference> supplier = () -> new BytesArray("{\"test\": \"test\"}");
String testName = "name1";
QueryShardContext shardContext = createShardContext();
PercolateQueryBuilder percolateQueryBuilder = new PercolateQueryBuilder(queryField, null, supplier);
percolateQueryBuilder.setName(testName);
QueryBuilder rewrittenQueryBuilder = percolateQueryBuilder.doRewrite(shardContext);
assertEquals(testName, ((PercolateQueryBuilder) rewrittenQueryBuilder).getQueryName());
assertNotEquals(rewrittenQueryBuilder, percolateQueryBuilder);
}
}

View File

@ -125,7 +125,7 @@ if (Os.isFamily(Os.FAMILY_WINDOWS)) {
dependsOn unzip
executable = new File(project.runtimeJavaHome, 'bin/java')
env 'CLASSPATH', "${ -> project.configurations.oldesFixture.asPath }"
env 'JAVA_HOME', getJavaHome(it, 8)
env 'JAVA_HOME', "${ -> getJavaHome(it, 8)}"
args 'oldes.OldElasticsearch',
baseDir,
unzip.temporaryDir,

View File

@ -57,9 +57,6 @@ public class URLBlobStore implements BlobStore {
new ByteSizeValue(100, ByteSizeUnit.KB)).getBytes();
}
/**
* {@inheritDoc}
*/
@Override
public String toString() {
return path.toString();
@ -83,9 +80,6 @@ public class URLBlobStore implements BlobStore {
return this.bufferSizeInBytes;
}
/**
* {@inheritDoc}
*/
@Override
public BlobContainer blobContainer(BlobPath path) {
try {
@ -95,17 +89,6 @@ public class URLBlobStore implements BlobStore {
}
}
/**
* This operation is not supported by URL Blob Store
*/
@Override
public void delete(BlobPath path) {
throw new UnsupportedOperationException("URL repository is read only");
}
/**
* {@inheritDoc}
*/
@Override
public void close() {
// nothing to do here...

View File

@ -1 +0,0 @@
a244722975cddaef5f9bbd45e7a44d0db5f058d8

View File

@ -0,0 +1 @@
7f2db0921dd57df4db076229830ab09bba713aeb

View File

@ -1 +0,0 @@
b86f6b9eedbe38d6fa0bbbefa961d566e293e13e

View File

@ -0,0 +1 @@
8462116d327bb3d1ec24258071f2e7345a73dbfc

View File

@ -1 +0,0 @@
f7a38b0a3ee2fff3d9dd2bb44f5e16140b70b354

View File

@ -0,0 +1 @@
62b73d439dbddf3c0dde092b048580139695ab46

View File

@ -1 +0,0 @@
c776487b782046e1399b00cd40c63ef51d26e953

View File

@ -0,0 +1 @@
f6f38fde652a70ea579897edc80e52353e487ae6

View File

@ -1 +0,0 @@
b23efe31416942718ac46ad00bb3e91e4b3f6ab7

View File

@ -0,0 +1 @@
1c38a5920a10c01b1cce4cdc964447ec76abf1b5

View File

@ -1 +0,0 @@
d60c4f4e12f0703dff477c9bf595f37a41ecacbc

View File

@ -0,0 +1 @@
e4d243fbf4e6837fa294f892bf97149e18129100

View File

@ -1 +0,0 @@
526b2646885c57adb54e2f89b2e2b80bebce3962

View File

@ -0,0 +1 @@
8546e6be47be587acab86bbd106ca023678f07d9

View File

@ -71,3 +71,24 @@ testClusters {
keystore 'azure.client.integration_test.key', 'azure_key'
}
}
String azureAccount = System.getenv("azure_storage_account")
String azureKey = System.getenv("azure_storage_key")
String azureContainer = System.getenv("azure_storage_container")
String azureBasePath = System.getenv("azure_storage_base_path")
test {
exclude '**/AzureStorageCleanupThirdPartyTests.class'
}
task thirdPartyTest(type: Test) {
include '**/AzureStorageCleanupThirdPartyTests.class'
systemProperty 'test.azure.account', azureAccount ? azureAccount : ""
systemProperty 'test.azure.key', azureKey ? azureKey : ""
systemProperty 'test.azure.container', azureContainer ? azureContainer : ""
systemProperty 'test.azure.base', azureBasePath ? azureBasePath : ""
}
if (azureAccount || azureKey || azureContainer || azureBasePath) {
check.dependsOn(thirdPartyTest)
}

View File

@ -22,8 +22,6 @@ package org.elasticsearch.repositories.azure;
import com.microsoft.azure.storage.LocationMode;
import com.microsoft.azure.storage.StorageException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
@ -40,8 +38,6 @@ import java.util.Map;
import static java.util.Collections.emptyMap;
public class AzureBlobStore implements BlobStore {
private static final Logger logger = LogManager.getLogger(AzureBlobStore.class);
private final AzureStorageService service;
@ -49,8 +45,7 @@ public class AzureBlobStore implements BlobStore {
private final String container;
private final LocationMode locationMode;
public AzureBlobStore(RepositoryMetaData metadata, AzureStorageService service)
throws URISyntaxException, StorageException {
public AzureBlobStore(RepositoryMetaData metadata, AzureStorageService service) {
this.container = Repository.CONTAINER_SETTING.get(metadata.settings());
this.clientName = Repository.CLIENT_NAME.get(metadata.settings());
this.service = service;
@ -73,26 +68,11 @@ public class AzureBlobStore implements BlobStore {
return locationMode;
}
public String getClientName() {
return clientName;
}
@Override
public BlobContainer blobContainer(BlobPath path) {
return new AzureBlobContainer(path, this);
}
@Override
public void delete(BlobPath path) throws IOException {
final String keyPath = path.buildAsString();
try {
service.deleteFiles(clientName, container, keyPath);
} catch (URISyntaxException | StorageException e) {
logger.warn("cannot access [{}] in container {{}}: {}", keyPath, container, e.getMessage());
throw new IOException(e);
}
}
@Override
public void close() {
}

View File

@ -115,20 +115,16 @@ public class AzureRepository extends BlobStoreRepository {
}
}
// only use for testing
@Override
protected BlobStore getBlobStore() {
return super.getBlobStore();
}
/**
* {@inheritDoc}
*/
@Override
protected AzureBlobStore createBlobStore() throws URISyntaxException, StorageException {
protected AzureBlobStore createBlobStore() {
final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService);
logger.debug((org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage(
logger.debug(() -> new ParameterizedMessage(
"using container [{}], chunk_size [{}], compress [{}], base_path [{}]",
blobStore, chunkSize, isCompress(), basePath));
return blobStore;
@ -139,9 +135,6 @@ public class AzureRepository extends BlobStoreRepository {
return basePath;
}
/**
* {@inheritDoc}
*/
@Override
protected ByteSizeValue chunkSize() {
return chunkSize;

View File

@ -98,7 +98,7 @@ public class AzureStorageService {
}
}
protected CloudBlobClient buildClient(AzureStorageSettings azureStorageSettings) throws InvalidKeyException, URISyntaxException {
private static CloudBlobClient buildClient(AzureStorageSettings azureStorageSettings) throws InvalidKeyException, URISyntaxException {
final CloudBlobClient client = createClient(azureStorageSettings);
// Set timeout option if the user sets cloud.azure.storage.timeout or
// cloud.azure.storage.xxx.timeout (it's negative by default)
@ -116,12 +116,12 @@ public class AzureStorageService {
return client;
}
protected CloudBlobClient createClient(AzureStorageSettings azureStorageSettings) throws InvalidKeyException, URISyntaxException {
private static CloudBlobClient createClient(AzureStorageSettings azureStorageSettings) throws InvalidKeyException, URISyntaxException {
final String connectionString = azureStorageSettings.buildConnectionString();
return CloudStorageAccount.parse(connectionString).createCloudBlobClient();
}
protected OperationContext buildOperationContext(AzureStorageSettings azureStorageSettings) {
private static OperationContext buildOperationContext(AzureStorageSettings azureStorageSettings) {
final OperationContext context = new OperationContext();
context.setProxy(azureStorageSettings.getProxy());
return context;
@ -147,24 +147,6 @@ public class AzureStorageService {
return SocketAccess.doPrivilegedException(() -> blobContainer.exists(null, null, client.v2().get()));
}
public void deleteFiles(String account, String container, String path) throws URISyntaxException, StorageException {
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
// container name must be lower case.
logger.trace(() -> new ParameterizedMessage("delete files container [{}], path [{}]", container, path));
SocketAccess.doPrivilegedVoidException(() -> {
// list the blobs using a flat blob listing mode
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
for (final ListBlobItem blobItem : blobContainer.listBlobs(path, true, EnumSet.noneOf(BlobListingDetails.class), null,
client.v2().get())) {
final String blobName = blobNameFromUri(blobItem.getUri());
logger.trace(() -> new ParameterizedMessage("removing blob [{}] full URI was [{}]", blobName, blobItem.getUri()));
// don't call {@code #deleteBlob}, use the same client
final CloudBlockBlob azureBlob = blobContainer.getBlockBlobReference(blobName);
azureBlob.delete(DeleteSnapshotsOption.NONE, null, null, client.v2().get());
}
});
}
/**
* Extract the blob name from a URI like https://myservice.azure.net/container/path/to/myfile
* It should remove the container part (first part of the path) and give path/to/myfile

View File

@ -129,14 +129,6 @@ final class AzureStorageSettings {
this.locationMode = LocationMode.PRIMARY_ONLY;
}
public String getKey() {
return key;
}
public String getAccount() {
return account;
}
public String getEndpointSuffix() {
return endpointSuffix;
}
@ -207,7 +199,7 @@ final class AzureStorageSettings {
// pkg private for tests
/** Parse settings for a single client. */
static AzureStorageSettings getClientSettings(Settings settings, String clientName) {
private static AzureStorageSettings getClientSettings(Settings settings, String clientName) {
try (SecureString account = getConfigValue(settings, clientName, ACCOUNT_SETTING);
SecureString key = getConfigValue(settings, clientName, KEY_SETTING)) {
return new AzureStorageSettings(account.toString(), key.toString(),
@ -226,7 +218,7 @@ final class AzureStorageSettings {
return concreteSetting.get(settings);
}
public static <T> T getValue(Settings settings, String groupName, Setting<T> setting) {
private static <T> T getValue(Settings settings, String groupName, Setting<T> setting) {
final Setting.AffixKey k = (Setting.AffixKey) setting.getRawKey();
final String fullKey = k.toConcreteKey(groupName).toString();
return setting.getConcreteSetting(fullKey).get(settings);

View File

@ -48,7 +48,7 @@ public final class SocketAccess {
}
}
public static <T> T doPrivilegedException(PrivilegedExceptionAction<T> operation) throws StorageException, URISyntaxException {
public static <T> T doPrivilegedException(PrivilegedExceptionAction<T> operation) throws StorageException {
SpecialPermission.check();
try {
return AccessController.doPrivileged(operation);

View File

@ -19,24 +19,17 @@
package org.elasticsearch.repositories.azure;
import com.microsoft.azure.storage.StorageException;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.ESBlobStoreContainerTestCase;
import java.io.IOException;
import java.net.URISyntaxException;
public class AzureBlobStoreContainerTests extends ESBlobStoreContainerTestCase {
@Override
protected BlobStore newBlobStore() throws IOException {
try {
RepositoryMetaData repositoryMetaData = new RepositoryMetaData("azure", "ittest", Settings.EMPTY);
AzureStorageServiceMock client = new AzureStorageServiceMock();
return new AzureBlobStore(repositoryMetaData, client);
} catch (URISyntaxException | StorageException e) {
throw new IOException(e);
}
protected BlobStore newBlobStore() {
RepositoryMetaData repositoryMetaData = new RepositoryMetaData("azure", "ittest", Settings.EMPTY);
AzureStorageServiceMock client = new AzureStorageServiceMock();
return new AzureBlobStore(repositoryMetaData, client);
}
}

View File

@ -18,25 +18,17 @@
*/
package org.elasticsearch.repositories.azure;
import com.microsoft.azure.storage.StorageException;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.ESBlobStoreTestCase;
import java.io.IOException;
import java.net.URISyntaxException;
public class AzureBlobStoreTests extends ESBlobStoreTestCase {
@Override
protected BlobStore newBlobStore() throws IOException {
try {
RepositoryMetaData repositoryMetaData = new RepositoryMetaData("azure", "ittest", Settings.EMPTY);
AzureStorageServiceMock client = new AzureStorageServiceMock();
return new AzureBlobStore(repositoryMetaData, client);
} catch (URISyntaxException | StorageException e) {
throw new IOException(e);
}
protected BlobStore newBlobStore() {
RepositoryMetaData repositoryMetaData = new RepositoryMetaData("azure", "ittest", Settings.EMPTY);
AzureStorageServiceMock client = new AzureStorageServiceMock();
return new AzureBlobStore(repositoryMetaData, client);
}
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.azure;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.SecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase;
import java.util.Collection;
import static org.hamcrest.Matchers.blankOrNullString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
public class AzureStorageCleanupThirdPartyTests extends AbstractThirdPartyRepositoryTestCase {
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return pluginList(AzureRepositoryPlugin.class);
}
@Override
protected SecureSettings credentials() {
assertThat(System.getProperty("test.azure.account"), not(blankOrNullString()));
assertThat(System.getProperty("test.azure.key"), not(blankOrNullString()));
assertThat(System.getProperty("test.azure.container"), not(blankOrNullString()));
assertThat(System.getProperty("test.azure.base"), not(blankOrNullString()));
MockSecureSettings secureSettings = new MockSecureSettings();
secureSettings.setString("azure.client.default.account", System.getProperty("test.azure.account"));
secureSettings.setString("azure.client.default.key", System.getProperty("test.azure.key"));
return secureSettings;
}
@Override
protected void createRepository(String repoName) {
AcknowledgedResponse putRepositoryResponse = client().admin().cluster().preparePutRepository(repoName)
.setType("azure")
.setSettings(Settings.builder()
.put("container", System.getProperty("test.azure.container"))
.put("base_path", System.getProperty("test.azure.base"))
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
}
}

View File

@ -34,7 +34,6 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketPermission;
import java.net.URISyntaxException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.security.AccessController;
@ -61,21 +60,13 @@ public class AzureStorageServiceMock extends AzureStorageService {
return true;
}
@Override
public void deleteFiles(String account, String container, String path) throws URISyntaxException, StorageException {
final Map<String, BlobMetaData> blobs = listBlobsByPrefix(account, container, path, null);
for (String key : blobs.keySet()) {
deleteBlob(account, container, key);
}
}
@Override
public boolean blobExists(String account, String container, String blob) {
return blobs.containsKey(blob);
}
@Override
public void deleteBlob(String account, String container, String blob) throws URISyntaxException, StorageException {
public void deleteBlob(String account, String container, String blob) throws StorageException {
if (blobs.remove(blob) == null) {
throw new StorageException("BlobNotFound", "[" + blob + "] does not exist.", 404, null, null);
}
@ -109,8 +100,7 @@ public class AzureStorageServiceMock extends AzureStorageService {
@Override
public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize,
boolean failIfAlreadyExists)
throws URISyntaxException, StorageException, FileAlreadyExistsException {
boolean failIfAlreadyExists) throws StorageException, FileAlreadyExistsException {
if (failIfAlreadyExists && blobs.containsKey(blobName)) {
throw new FileAlreadyExistsException(blobName);
}

View File

@ -1,3 +1,5 @@
import java.nio.file.Files
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
@ -122,3 +124,22 @@ check {
// also execute the QA tests when testing the plugin
dependsOn 'qa:google-cloud-storage:check'
}
String gcsServiceAccount = System.getenv("google_storage_service_account")
String gcsBucket = System.getenv("google_storage_bucket")
String gcsBasePath = System.getenv("google_storage_base_path")
test {
exclude '**/GoogleCloudStorageThirdPartyTests.class'
}
task thirdPartyTest(type: Test) {
include '**/GoogleCloudStorageThirdPartyTests.class'
systemProperty 'test.google.account', gcsServiceAccount ? Base64.encoder.encodeToString(Files.readAllBytes(file(gcsServiceAccount).toPath())) : ""
systemProperty 'test.google.bucket', gcsBucket ? gcsBucket : ""
systemProperty 'test.google.base', gcsBasePath ? gcsBasePath : "/"
}
if (gcsServiceAccount || gcsBucket || gcsBasePath) {
check.dependsOn(thirdPartyTest)
}

View File

@ -50,11 +50,11 @@ import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.util.Map;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@ -91,11 +91,6 @@ class GoogleCloudStorageBlobStore implements BlobStore {
return new GoogleCloudStorageBlobContainer(path, this);
}
@Override
public void delete(BlobPath path) throws IOException {
deleteBlobsByPrefix(path.buildAsString());
}
@Override
public void close() {
}
@ -291,15 +286,6 @@ class GoogleCloudStorageBlobStore implements BlobStore {
}
}
/**
* Deletes multiple blobs from the specific bucket all of which have prefixed names
*
* @param prefix prefix of the blobs to delete
*/
private void deleteBlobsByPrefix(String prefix) throws IOException {
deleteBlobsIgnoringIfNotExists(listBlobsByPrefix("", prefix).keySet());
}
/**
* Deletes multiple blobs from the specific bucket using a batch request
*
@ -343,6 +329,7 @@ class GoogleCloudStorageBlobStore implements BlobStore {
if (e != null) {
throw new IOException("Exception when deleting blobs [" + failedBlobs + "]", e);
}
assert failedBlobs.isEmpty();
}
private static String buildKey(String keyPath, String s) {

View File

@ -0,0 +1,64 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.gcs;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.SecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase;
import java.util.Base64;
import java.util.Collection;
import static org.hamcrest.Matchers.blankOrNullString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
public class GoogleCloudStorageThirdPartyTests extends AbstractThirdPartyRepositoryTestCase {
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return pluginList(GoogleCloudStoragePlugin.class);
}
@Override
protected SecureSettings credentials() {
assertThat(System.getProperty("test.google.account"), not(blankOrNullString()));
assertThat(System.getProperty("test.google.bucket"), not(blankOrNullString()));
MockSecureSettings secureSettings = new MockSecureSettings();
secureSettings.setFile("gcs.client.default.credentials_file",
Base64.getDecoder().decode(System.getProperty("test.google.account")));
return secureSettings;
}
@Override
protected void createRepository(final String repoName) {
AcknowledgedResponse putRepositoryResponse = client().admin().cluster().preparePutRepository("test-repo")
.setType("gcs")
.setSettings(Settings.builder()
.put("bucket", System.getProperty("test.google.bucket"))
.put("base_path", System.getProperty("test.google.base", "/"))
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
}
}

View File

@ -66,14 +66,6 @@ final class HdfsBlobStore implements BlobStore {
});
}
@Override
public void delete(BlobPath path) throws IOException {
execute((Operation<Void>) fc -> {
fc.delete(translateToHdfsPath(path), true);
return null;
});
}
@Override
public String toString() {
return root.toUri().toString();

View File

@ -75,6 +75,7 @@ test {
// these are tested explicitly in separate test tasks
exclude '**/*CredentialsTests.class'
exclude '**/S3BlobStoreRepositoryTests.class'
exclude '**/S3RepositoryThirdPartyTests.class'
}
boolean useFixture = false
@ -134,6 +135,14 @@ if (!s3EC2Bucket && !s3EC2BasePath && !s3ECSBucket && !s3ECSBasePath) {
throw new IllegalArgumentException("not all options specified to run EC2/ECS tests are present")
}
task thirdPartyTest(type: Test) {
include '**/S3RepositoryThirdPartyTests.class'
systemProperty 'test.s3.account', s3PermanentAccessKey
systemProperty 'test.s3.key', s3PermanentSecretKey
systemProperty 'test.s3.bucket', s3PermanentBucket
systemProperty 'test.s3.base', s3PermanentBasePath
}
if (useFixture) {
apply plugin: 'elasticsearch.test.fixtures'
task writeDockerFile {
@ -151,6 +160,32 @@ if (useFixture) {
dependsOn(writeDockerFile)
}
def minioAddress = {
int minioPort = postProcessFixture.ext."test.fixtures.minio-fixture.tcp.9000"
assert minioPort > 0
return 'http://127.0.0.1:' + minioPort
}
File minioAddressFile = new File(project.buildDir, 'generated-resources/s3Fixture.address')
// We can't lazy evaluate a system property for the Minio address passed to JUnit so we write it to a resource file
// and pass its name instead.
task writeMinioAddress {
dependsOn tasks.bundlePlugin, tasks.postProcessFixture
outputs.file(minioAddressFile)
doLast {
file(minioAddressFile).text = "${ -> minioAddress.call() }"
}
}
thirdPartyTest {
dependsOn writeMinioAddress
inputs.file(minioAddressFile)
systemProperty 'test.s3.endpoint', minioAddressFile.name
}
BuildPlugin.requireDocker(tasks.thirdPartyTest)
task integTestMinio(type: RestIntegTestTask) {
description = "Runs REST tests using the Minio repository."
dependsOn tasks.bundlePlugin, tasks.postProcessFixture
@ -169,11 +204,7 @@ if (useFixture) {
testClusters.integTestMinio {
keystore 's3.client.integration_test_permanent.access_key', s3PermanentAccessKey
keystore 's3.client.integration_test_permanent.secret_key', s3PermanentSecretKey
setting 's3.client.integration_test_permanent.endpoint', {
int minioPort = postProcessFixture.ext."test.fixtures.minio-fixture.tcp.9000"
assert minioPort > 0
return 'http://127.0.0.1:' + minioPort
}
setting 's3.client.integration_test_permanent.endpoint', minioAddress
plugin file(tasks.bundlePlugin.archiveFile)
}
@ -191,6 +222,8 @@ if (useFixture) {
}
}
check.dependsOn(thirdPartyTest)
File parentFixtures = new File(project.buildDir, "fixtures")
File s3FixtureFile = new File(parentFixtures, 's3Fixture.properties')

View File

@ -25,6 +25,7 @@ import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
@ -34,6 +35,7 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobMetaData;
@ -50,6 +52,8 @@ import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static org.elasticsearch.repositories.s3.S3Repository.MAX_FILE_SIZE;
import static org.elasticsearch.repositories.s3.S3Repository.MAX_FILE_SIZE_USING_MULTIPART;
@ -127,12 +131,13 @@ class S3BlobContainer extends AbstractBlobContainer {
if (blobNames.isEmpty()) {
return;
}
final Set<String> outstanding = blobNames.stream().map(this::buildKey).collect(Collectors.toSet());
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
// S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
final List<DeleteObjectsRequest> deleteRequests = new ArrayList<>();
final List<String> partition = new ArrayList<>();
for (String blob : blobNames) {
partition.add(buildKey(blob));
for (String key : outstanding) {
partition.add(key);
if (partition.size() == MAX_BULK_DELETES ) {
deleteRequests.add(bulkDelete(blobStore.bucket(), partition));
partition.clear();
@ -144,23 +149,32 @@ class S3BlobContainer extends AbstractBlobContainer {
SocketAccess.doPrivilegedVoid(() -> {
AmazonClientException aex = null;
for (DeleteObjectsRequest deleteRequest : deleteRequests) {
List<String> keysInRequest =
deleteRequest.getKeys().stream().map(DeleteObjectsRequest.KeyVersion::getKey).collect(Collectors.toList());
try {
clientReference.client().deleteObjects(deleteRequest);
outstanding.removeAll(keysInRequest);
} catch (MultiObjectDeleteException e) {
// We are sending quiet mode requests so we can't use the deleted keys entry on the exception and instead
// first remove all keys that were sent in the request and then add back those that ran into an exception.
outstanding.removeAll(keysInRequest);
outstanding.addAll(
e.getErrors().stream().map(MultiObjectDeleteException.DeleteError::getKey).collect(Collectors.toSet()));
aex = ExceptionsHelper.useOrSuppress(aex, e);
} catch (AmazonClientException e) {
if (aex == null) {
aex = e;
} else {
aex.addSuppressed(e);
}
// The AWS client threw an unexpected exception and did not execute the request at all so we do not
// remove any keys from the outstanding deletes set.
aex = ExceptionsHelper.useOrSuppress(aex, e);
}
}
if (aex != null) {
throw aex;
}
});
} catch (final AmazonClientException e) {
throw new IOException("Exception when deleting blobs [" + blobNames + "]", e);
} catch (Exception e) {
throw new IOException("Failed to delete blobs [" + outstanding + "]", e);
}
assert outstanding.isEmpty();
}
private static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs) {
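A standalone sketch of the partitioning idea in the hunk above (assumptions: plain Java, no AWS SDK; names are illustrative). S3's multi-object delete accepts at most 1000 keys per request, so the keys are grouped into chunks before one delete request is built per chunk:

[source,java]
----------------------------------
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class BulkDeletePartitioner {
    static final int MAX_BULK_DELETES = 1000;

    // Group the keys into batches of at most MAX_BULK_DELETES, mirroring the
    // loop over `outstanding` in the hunk above.
    static List<List<String>> partition(Collection<String> keys) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (String key : keys) {
            current.add(key);
            if (current.size() == MAX_BULK_DELETES) {
                batches.add(current);
                current = new ArrayList<>();
            }
        }
        if (current.isEmpty() == false) {
            batches.add(current); // final partial batch
        }
        return batches;
    }
}
----------------------------------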

View File

@ -20,10 +20,6 @@
package org.elasticsearch.repositories.s3;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.StorageClass;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.blobstore.BlobContainer;
@ -33,7 +29,6 @@ import org.elasticsearch.common.blobstore.BlobStoreException;
import org.elasticsearch.common.unit.ByteSizeValue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Locale;
class S3BlobStore implements BlobStore {
@ -90,50 +85,6 @@ class S3BlobStore implements BlobStore {
return new S3BlobContainer(path, this);
}
@Override
public void delete(BlobPath path) {
try (AmazonS3Reference clientReference = clientReference()) {
ObjectListing prevListing = null;
// From
// http://docs.amazonwebservices.com/AmazonS3/latest/dev/DeletingMultipleObjectsUsingJava.html
// we can do at most 1K objects per delete
// We don't know the bucket name until first object listing
DeleteObjectsRequest multiObjectDeleteRequest = null;
final ArrayList<KeyVersion> keys = new ArrayList<>();
while (true) {
ObjectListing list;
if (prevListing != null) {
final ObjectListing finalPrevListing = prevListing;
list = SocketAccess.doPrivileged(() -> clientReference.client().listNextBatchOfObjects(finalPrevListing));
} else {
list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(bucket, path.buildAsString()));
multiObjectDeleteRequest = new DeleteObjectsRequest(list.getBucketName());
}
for (final S3ObjectSummary summary : list.getObjectSummaries()) {
keys.add(new KeyVersion(summary.getKey()));
// Every 500 objects batch the delete request
if (keys.size() > 500) {
multiObjectDeleteRequest.setKeys(keys);
final DeleteObjectsRequest finalMultiObjectDeleteRequest = multiObjectDeleteRequest;
SocketAccess.doPrivilegedVoid(() -> clientReference.client().deleteObjects(finalMultiObjectDeleteRequest));
multiObjectDeleteRequest = new DeleteObjectsRequest(list.getBucketName());
keys.clear();
}
}
if (list.isTruncated()) {
prevListing = list;
} else {
break;
}
}
if (!keys.isEmpty()) {
multiObjectDeleteRequest.setKeys(keys);
final DeleteObjectsRequest finalMultiObjectDeleteRequest = multiObjectDeleteRequest;
SocketAccess.doPrivilegedVoid(() -> clientReference.client().deleteObjects(finalMultiObjectDeleteRequest));
}
}
}
@Override
public void close() throws IOException {
this.service.close();

View File

@ -0,0 +1,73 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.s3;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.SecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase;
import org.elasticsearch.test.StreamsUtils;
import java.io.IOException;
import java.util.Collection;
import static org.hamcrest.Matchers.blankOrNullString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
public class S3RepositoryThirdPartyTests extends AbstractThirdPartyRepositoryTestCase {
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return pluginList(S3RepositoryPlugin.class);
}
@Override
protected SecureSettings credentials() {
assertThat(System.getProperty("test.s3.account"), not(blankOrNullString()));
assertThat(System.getProperty("test.s3.key"), not(blankOrNullString()));
assertThat(System.getProperty("test.s3.bucket"), not(blankOrNullString()));
MockSecureSettings secureSettings = new MockSecureSettings();
secureSettings.setString("s3.client.default.access_key", System.getProperty("test.s3.account"));
secureSettings.setString("s3.client.default.secret_key", System.getProperty("test.s3.key"));
return secureSettings;
}
@Override
protected void createRepository(String repoName) {
Settings.Builder settings = Settings.builder()
.put("bucket", System.getProperty("test.s3.bucket"))
.put("base_path", System.getProperty("test.s3.base", "/"));
final String endpointPath = System.getProperty("test.s3.endpoint");
if (endpointPath != null) {
try {
settings = settings.put("endpoint", StreamsUtils.copyToStringFromClasspath("/" + endpointPath));
} catch (IOException e) {
throw new AssertionError(e);
}
}
AcknowledgedResponse putRepositoryResponse = client().admin().cluster().preparePutRepository("test-repo")
.setType("s3")
.setSettings(settings).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
}
}

View File

@ -1 +0,0 @@
a244722975cddaef5f9bbd45e7a44d0db5f058d8

View File

@ -0,0 +1 @@
7f2db0921dd57df4db076229830ab09bba713aeb

View File

@ -1 +0,0 @@
b86f6b9eedbe38d6fa0bbbefa961d566e293e13e

View File

@ -0,0 +1 @@
8462116d327bb3d1ec24258071f2e7345a73dbfc

View File

@ -1 +0,0 @@
f7a38b0a3ee2fff3d9dd2bb44f5e16140b70b354

View File

@ -0,0 +1 @@
62b73d439dbddf3c0dde092b048580139695ab46

View File

@ -1 +0,0 @@
c776487b782046e1399b00cd40c63ef51d26e953

View File

@ -0,0 +1 @@
f6f38fde652a70ea579897edc80e52353e487ae6

View File

@ -1 +0,0 @@
b23efe31416942718ac46ad00bb3e91e4b3f6ab7

View File

@ -0,0 +1 @@
1c38a5920a10c01b1cce4cdc964447ec76abf1b5

View File

@ -1 +0,0 @@
d60c4f4e12f0703dff477c9bf595f37a41ecacbc

View File

@ -0,0 +1 @@
e4d243fbf4e6837fa294f892bf97149e18129100

View File

@ -1 +0,0 @@
526b2646885c57adb54e2f89b2e2b80bebce3962

View File

@ -0,0 +1 @@
8546e6be47be587acab86bbd106ca023678f07d9

View File

@ -1042,7 +1042,7 @@ public class FullClusterRestartIT extends AbstractFullClusterRestartTestCase {
if (randomBoolean()) {
numDocs = between(1, 100);
for (int i = 0; i < numDocs; i++) {
final Request request = new Request("POST", "/" + index + "/_doc/" + i);
final Request request = new Request("POST", "/" + index + "/" + type + "/" + i);
request.setJsonEntity(Strings.toString(JsonXContent.contentBuilder().startObject().field("field", "v1").endObject()));
assertOK(client().performRequest(request));
if (rarely()) {

View File

@@ -33,6 +33,7 @@ import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.document.RestIndexAction;
import org.elasticsearch.rest.action.document.RestUpdateAction;
import org.elasticsearch.test.rest.yaml.ObjectPath;
import org.hamcrest.Matcher;
@@ -572,4 +573,23 @@ public class RecoveryIT extends AbstractRollingTestCase {
});
}, 60, TimeUnit.SECONDS);
}
/** Ensures that update requests can always be executed, regardless of the version of the cluster */
public void testUpdateDoc() throws Exception {
final String index = "test_update_doc";
if (CLUSTER_TYPE == ClusterType.OLD) {
Settings.Builder settings = Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2);
createIndex(index, settings.build());
}
ensureGreen(index);
indexDocs(index, 0, 10);
for (int i = 0; i < 10; i++) {
Request update = new Request("POST", index + "/test/" + i + "/_update/");
update.setOptions(expectWarnings(RestUpdateAction.TYPES_DEPRECATION_MESSAGE));
update.setJsonEntity("{\"doc\": {\"f\": " + randomNonNegativeLong() + "}}");
client().performRequest(update);
}
}
}
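For contrast, a sketch of the typeless form of the same request, which in 7.x is POST {index}/_update/{id} and should not trigger the types deprecation warning (this assumes the same test scaffolding, i.e. index, i, and client() from the test above):

Request update = new Request("POST", "/" + index + "/_update/" + i);
update.setJsonEntity("{\"doc\": {\"f\": " + randomNonNegativeLong() + "}}");
client().performRequest(update);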

View File

@@ -175,12 +175,42 @@ public final class ExceptionsHelper {
return first;
}
private static final List<Class<? extends IOException>> CORRUPTION_EXCEPTIONS =
Arrays.asList(CorruptIndexException.class, IndexFormatTooOldException.class, IndexFormatTooNewException.class);
/**
* Looks at the given Throwable and its cause(s), as well as any suppressed exceptions on the Throwable and on its causes,
* and returns the first corruption-indicating exception (as defined by {@link #CORRUPTION_EXCEPTIONS}) it finds.
* @param t Throwable
* @return Corruption indicating exception if one is found, otherwise {@code null}
*/
public static IOException unwrapCorruption(Throwable t) {
return (IOException) unwrap(t, CorruptIndexException.class,
IndexFormatTooOldException.class,
IndexFormatTooNewException.class);
if (t != null) {
do {
for (Class<?> clazz : CORRUPTION_EXCEPTIONS) {
if (clazz.isInstance(t)) {
return (IOException) t;
}
}
for (Throwable suppressed : t.getSuppressed()) {
IOException corruptionException = unwrapCorruption(suppressed);
if (corruptionException != null) {
return corruptionException;
}
}
} while ((t = t.getCause()) != null);
}
return null;
}
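A usage sketch of the new behaviour: unlike the removed unwrap-based one-liner, the rewritten traversal also descends into suppressed exceptions, so corruption recorded only as a suppressed throwable is now surfaced (the wrapper exception below is illustrative; CorruptIndexException is Lucene's):

// corruption attached only as a *suppressed* exception is now found by unwrapCorruption
Exception wrapper = new RuntimeException("shard failure");
wrapper.addSuppressed(new org.apache.lucene.index.CorruptIndexException("checksum mismatch", "segment_1"));
java.io.IOException corruption = ExceptionsHelper.unwrapCorruption(wrapper);
assert corruption instanceof org.apache.lucene.index.CorruptIndexException;
// note: the plain unwrap(...) variant below deliberately keeps the old cause-chain-only behaviour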
/**
* Looks at the given Throwable and its cause(s) and returns the first Throwable that is of one of the given classes, or {@code null}
* if no matching Throwable is found. Unlike {@link #unwrapCorruption}, this method only checks the given Throwable and its causes;
* it does not look at any suppressed exceptions.
* @param t Throwable
* @param clazzes Classes to look for
* @return Matching Throwable if one is found, otherwise {@code null}
*/
public static Throwable unwrap(Throwable t, Class<?>... clazzes) {
if (t != null) {
do {

View File

@@ -174,7 +174,6 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
break;
case INIT:
case WAITING:
case STARTED:
stage = SnapshotIndexShardStage.STARTED;
break;
case SUCCESS:

View File

@@ -981,28 +981,15 @@ public abstract class TransportReplicationAction<
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
localCheckpoint = in.readZLong();
} else {
// 5.x used to read empty responses, which don't really read anything off the stream, so just do nothing.
localCheckpoint = SequenceNumbers.PRE_60_NODE_CHECKPOINT;
}
if (in.getVersion().onOrAfter(Version.V_6_0_0_rc1)) {
globalCheckpoint = in.readZLong();
} else {
globalCheckpoint = SequenceNumbers.PRE_60_NODE_CHECKPOINT;
}
localCheckpoint = in.readZLong();
globalCheckpoint = in.readZLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
out.writeZLong(localCheckpoint);
}
if (out.getVersion().onOrAfter(Version.V_6_0_0_rc1)) {
out.writeZLong(globalCheckpoint);
}
out.writeZLong(localCheckpoint);
out.writeZLong(globalCheckpoint);
}
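Since the wire version here can no longer be older than 6.0, both version gates were always true and collapse into an unconditional, symmetric read/write pair. A round-trip sketch of the ZLong encoding used (BytesStreamOutput and StreamInput are the codebase's in-memory stream types):

BytesStreamOutput out = new BytesStreamOutput();
out.writeZLong(localCheckpoint);          // zig-zag varint encoding, compact for small magnitudes
out.writeZLong(globalCheckpoint);
StreamInput in = out.bytes().streamInput();
assert in.readZLong() == localCheckpoint; // values must be read back in write order
assert in.readZLong() == globalCheckpoint;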
@Override

View File

@@ -325,7 +325,8 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
public String toString() {
StringBuilder sb = new StringBuilder();
final String TAB = " ";
sb.append("cluster uuid: ").append(metaData.clusterUUID()).append("\n");
sb.append("cluster uuid: ").append(metaData.clusterUUID())
.append(" [committed: ").append(metaData.clusterUUIDCommitted()).append("]").append("\n");
sb.append("version: ").append(version).append("\n");
sb.append("state uuid: ").append(stateUUID).append("\n");
sb.append("from_diff: ").append(wasReadFromDiff).append("\n");

View File

@@ -42,6 +42,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
* Meta data about snapshots that are currently executing
@@ -53,12 +54,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SnapshotsInProgress that = (SnapshotsInProgress) o;
if (!entries.equals(that.entries)) return false;
return true;
return entries.equals(((SnapshotsInProgress) o).entries);
}
@Override
@@ -208,18 +204,11 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
return snapshot.toString();
}
// package private for testing
ImmutableOpenMap<String, List<ShardId>> findWaitingIndices(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
private ImmutableOpenMap<String, List<ShardId>> findWaitingIndices(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
Map<String, List<ShardId>> waitingIndicesMap = new HashMap<>();
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> entry : shards) {
if (entry.value.state() == State.WAITING) {
final String indexName = entry.key.getIndexName();
List<ShardId> waitingShards = waitingIndicesMap.get(indexName);
if (waitingShards == null) {
waitingShards = new ArrayList<>();
waitingIndicesMap.put(indexName, waitingShards);
}
waitingShards.add(entry.key);
if (entry.value.state() == ShardState.WAITING) {
waitingIndicesMap.computeIfAbsent(entry.key.getIndexName(), k -> new ArrayList<>()).add(entry.key);
}
}
if (waitingIndicesMap.isEmpty()) {
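The get-or-create idiom above was replaced with Map.computeIfAbsent, the standard grouping pattern; a minimal standalone sketch (waitingShardIds is a hypothetical input collection):

Map<String, List<ShardId>> byIndex = new HashMap<>();
for (ShardId shardId : waitingShardIds) {
    // creates the list on first sight of an index name, then appends
    byIndex.computeIfAbsent(shardId.getIndexName(), k -> new ArrayList<>()).add(shardId);
}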
@@ -241,28 +230,27 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
*/
public static boolean completed(ObjectContainer<ShardSnapshotStatus> shards) {
for (ObjectCursor<ShardSnapshotStatus> status : shards) {
if (status.value.state().completed() == false) {
if (status.value.state().completed == false) {
return false;
}
}
return true;
}
public static class ShardSnapshotStatus {
private final State state;
private final ShardState state;
private final String nodeId;
private final String reason;
public ShardSnapshotStatus(String nodeId) {
this(nodeId, State.INIT);
this(nodeId, ShardState.INIT);
}
public ShardSnapshotStatus(String nodeId, State state) {
public ShardSnapshotStatus(String nodeId, ShardState state) {
this(nodeId, state, null);
}
public ShardSnapshotStatus(String nodeId, State state, String reason) {
public ShardSnapshotStatus(String nodeId, ShardState state, String reason) {
this.nodeId = nodeId;
this.state = state;
this.reason = reason;
@@ -272,11 +260,11 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
public ShardSnapshotStatus(StreamInput in) throws IOException {
nodeId = in.readOptionalString();
state = State.fromValue(in.readByte());
state = ShardState.fromValue(in.readByte());
reason = in.readOptionalString();
}
public State state() {
public ShardState state() {
return state;
}
@@ -298,14 +286,9 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ShardSnapshotStatus status = (ShardSnapshotStatus) o;
return Objects.equals(nodeId, status.nodeId) && Objects.equals(reason, status.reason) && state == status.state;
if (nodeId != null ? !nodeId.equals(status.nodeId) : status.nodeId != null) return false;
if (reason != null ? !reason.equals(status.reason) : status.reason != null) return false;
if (state != status.state) return false;
return true;
}
@Override
@@ -331,11 +314,11 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
MISSING((byte) 5, true, true),
WAITING((byte) 6, false, false);
private byte value;
private final byte value;
private boolean completed;
private final boolean completed;
private boolean failed;
private final boolean failed;
State(byte value, boolean completed, boolean failed) {
this.value = value;
@@ -379,7 +362,6 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
private final List<Entry> entries;
public SnapshotsInProgress(List<Entry> entries) {
this.entries = entries;
}
@@ -437,7 +419,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
builder.put(shardId, new ShardSnapshotStatus(in));
} else {
String nodeId = in.readOptionalString();
State shardState = State.fromValue(in.readByte());
ShardState shardState = ShardState.fromValue(in.readByte());
// Workaround for https://github.com/elastic/elasticsearch/issues/25878
// Some old snapshots might still have null in shard failure reasons
String reason = shardState.failed() ? "" : null;
@@ -484,7 +466,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
shardEntry.value.writeTo(out);
} else {
out.writeOptionalString(shardEntry.value.nodeId());
out.writeByte(shardEntry.value.state().value());
out.writeByte(shardEntry.value.state().value);
}
}
out.writeLong(entry.repositoryStateId);
@@ -555,4 +537,52 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
builder.endArray();
builder.endObject();
}
public enum ShardState {
INIT((byte) 0, false, false),
SUCCESS((byte) 2, true, false),
FAILED((byte) 3, true, true),
ABORTED((byte) 4, false, true),
MISSING((byte) 5, true, true),
WAITING((byte) 6, false, false);
private final byte value;
private final boolean completed;
private final boolean failed;
ShardState(byte value, boolean completed, boolean failed) {
this.value = value;
this.completed = completed;
this.failed = failed;
}
public boolean completed() {
return completed;
}
public boolean failed() {
return failed;
}
public static ShardState fromValue(byte value) {
switch (value) {
case 0:
return INIT;
case 2:
return SUCCESS;
case 3:
return FAILED;
case 4:
return ABORTED;
case 5:
return MISSING;
case 6:
return WAITING;
default:
throw new IllegalArgumentException("No shard snapshot state for value [" + value + "]");
}
}
}
}
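ShardState keeps the byte values of the shard-level states from the old shared State enum (hence the apparent gap at value 1), so nothing changes on the wire; a round-trip sketch:

// the wire byte written by writeTo round-trips through fromValue
assert SnapshotsInProgress.ShardState.fromValue((byte) 4) == SnapshotsInProgress.ShardState.ABORTED;
assert SnapshotsInProgress.ShardState.ABORTED.failed();             // aborted counts as failed...
assert SnapshotsInProgress.ShardState.ABORTED.completed() == false; // ...but not yet completed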

View File

@@ -729,7 +729,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
assert followersChecker.getFastResponseState().term == getCurrentTerm() : followersChecker.getFastResponseState();
assert followersChecker.getFastResponseState().mode == getMode() : followersChecker.getFastResponseState();
assert (applierState.nodes().getMasterNodeId() == null) == applierState.blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID);
assert applierState.nodes().getMasterNodeId() == null || applierState.metaData().clusterUUIDCommitted();
assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse())
: preVoteCollector + " vs " + getPreVoteResponse();
@@ -1198,9 +1197,13 @@
*/
boolean cancelCommittedPublication() {
synchronized (mutex) {
if (currentPublication.isPresent() && currentPublication.get().isCommitted()) {
currentPublication.get().cancel("cancelCommittedPublication");
return true;
if (currentPublication.isPresent()) {
final CoordinatorPublication publication = currentPublication.get();
if (publication.isCommitted()) {
publication.cancel("cancelCommittedPublication");
logger.debug("Cancelled publication of [{}].", publication);
return true;
}
}
return false;
}

View File

@@ -110,13 +110,20 @@ public class AllocateEmptyPrimaryAllocationCommand extends BasePrimaryAllocation
return explainOrThrowMissingRoutingNode(allocation, explain, discoNode);
}
final ShardRouting shardRouting;
try {
shardRouting = allocation.routingTable().shardRoutingTable(index, shardId).primaryShard();
allocation.routingTable().shardRoutingTable(index, shardId).primaryShard();
} catch (IndexNotFoundException | ShardNotFoundException e) {
return explainOrThrowRejectedCommand(explain, allocation, e);
}
if (shardRouting.unassigned() == false) {
ShardRouting shardRouting = null;
for (ShardRouting shard : allocation.routingNodes().unassigned()) {
if (shard.getIndexName().equals(index) && shard.getId() == shardId && shard.primary()) {
shardRouting = shard;
break;
}
}
if (shardRouting == null) {
return explainOrThrowRejectedCommand(explain, allocation, "primary [" + index + "][" + shardId + "] is already assigned");
}

View File

@@ -23,7 +23,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
@@ -35,6 +34,7 @@ import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
@@ -101,20 +101,34 @@ public class AllocateReplicaAllocationCommand extends AbstractAllocateAllocation
return explainOrThrowMissingRoutingNode(allocation, explain, discoNode);
}
final ShardRouting primaryShardRouting;
try {
primaryShardRouting = allocation.routingTable().shardRoutingTable(index, shardId).primaryShard();
allocation.routingTable().shardRoutingTable(index, shardId).primaryShard();
} catch (IndexNotFoundException | ShardNotFoundException e) {
return explainOrThrowRejectedCommand(explain, allocation, e);
}
if (primaryShardRouting.unassigned()) {
ShardRouting primaryShardRouting = null;
for (RoutingNode node : allocation.routingNodes()) {
for (ShardRouting shard : node) {
if (shard.getIndexName().equals(index) && shard.getId() == shardId && shard.primary()) {
primaryShardRouting = shard;
break;
}
}
}
if (primaryShardRouting == null) {
return explainOrThrowRejectedCommand(explain, allocation,
"trying to allocate a replica shard [" + index + "][" + shardId +
"], while corresponding primary shard is still unassigned");
}
List<ShardRouting> replicaShardRoutings =
allocation.routingTable().shardRoutingTable(index, shardId).replicaShardsWithState(ShardRoutingState.UNASSIGNED);
List<ShardRouting> replicaShardRoutings = new ArrayList<>();
for (ShardRouting shard : allocation.routingNodes().unassigned()) {
if (shard.getIndexName().equals(index) && shard.getId() == shardId && shard.primary() == false) {
replicaShardRoutings.add(shard);
}
}
ShardRouting shardRouting;
if (replicaShardRoutings.isEmpty()) {
return explainOrThrowRejectedCommand(explain, allocation,

View File

@@ -108,13 +108,20 @@ public class AllocateStalePrimaryAllocationCommand extends BasePrimaryAllocation
return explainOrThrowMissingRoutingNode(allocation, explain, discoNode);
}
final ShardRouting shardRouting;
try {
shardRouting = allocation.routingTable().shardRoutingTable(index, shardId).primaryShard();
allocation.routingTable().shardRoutingTable(index, shardId).primaryShard();
} catch (IndexNotFoundException | ShardNotFoundException e) {
return explainOrThrowRejectedCommand(explain, allocation, e);
}
if (shardRouting.unassigned() == false) {
ShardRouting shardRouting = null;
for (ShardRouting shard : allocation.routingNodes().unassigned()) {
if (shard.getIndexName().equals(index) && shard.getId() == shardId && shard.primary()) {
shardRouting = shard;
break;
}
}
if (shardRouting == null) {
return explainOrThrowRejectedCommand(explain, allocation, "primary [" + index + "][" + shardId + "] is already assigned");
}
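The same scan of the unassigned iterator now appears in all three allocation commands above; factored out, the shared shape is roughly the hypothetical helper below (not part of the commit):

// sketch: find the unassigned primary for (index, shardId), or null if it is already assigned
private static ShardRouting findUnassignedPrimary(RoutingAllocation allocation, String index, int shardId) {
    for (ShardRouting shard : allocation.routingNodes().unassigned()) {
        if (shard.getIndexName().equals(index) && shard.getId() == shardId && shard.primary()) {
            return shard;
        }
    }
    return null; // callers then report "primary [...] is already assigned"
}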

View File

@@ -19,7 +19,6 @@
package org.elasticsearch.common.blobstore;
import java.io.Closeable;
import java.io.IOException;
/**
* An interface for storing blobs.
@@ -30,10 +29,4 @@ public interface BlobStore extends Closeable {
* Get a blob container instance for storing blobs at the given {@link BlobPath}.
*/
BlobContainer blobContainer(BlobPath path);
/**
* Delete the blob store at the given {@link BlobPath}.
*/
void delete(BlobPath path) throws IOException;
}

View File

@@ -19,7 +19,6 @@
package org.elasticsearch.common.blobstore.fs;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
@@ -72,16 +71,6 @@ public class FsBlobStore implements BlobStore {
}
}
@Override
public void delete(BlobPath path) throws IOException {
assert readOnly == false : "should not delete anything from a readonly repository: " + path;
//noinspection ConstantConditions in case assertions are disabled
if (readOnly) {
throw new ElasticsearchException("unexpectedly deleting [" + path + "] from a readonly repository");
}
IOUtils.rm(buildPath(path));
}
@Override
public void close() {
// nothing to do here...

View File

@@ -93,7 +93,7 @@ public abstract class BaseGatewayShardAllocator {
* Builds decisions for all nodes in the cluster, so that the explain API can provide information on
* allocation decisions for each node, while still waiting to allocate the shard (e.g. due to fetching shard data).
*/
protected List<NodeAllocationResult> buildDecisionsForAllNodes(ShardRouting shard, RoutingAllocation allocation) {
protected static List<NodeAllocationResult> buildDecisionsForAllNodes(ShardRouting shard, RoutingAllocation allocation) {
List<NodeAllocationResult> results = new ArrayList<>();
for (RoutingNode node : allocation.routingNodes()) {
Decision decision = allocation.deciders().canAllocate(shard, node, allocation);

View File

@@ -22,6 +22,7 @@ package org.elasticsearch.gateway;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.IndexGraveyard;
@@ -163,14 +164,14 @@ public class DanglingIndicesState implements ClusterStateListener {
}
try {
allocateDangledIndices.allocateDangled(Collections.unmodifiableCollection(new ArrayList<>(danglingIndices.values())),
new LocalAllocateDangledIndices.Listener() {
new ActionListener<LocalAllocateDangledIndices.AllocateDangledResponse>() {
@Override
public void onResponse(LocalAllocateDangledIndices.AllocateDangledResponse response) {
logger.trace("allocated dangled");
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
logger.info("failed to send allocated dangled", e);
}
}
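With the bespoke Listener interface replaced by ActionListener, call sites can also drop the anonymous class in favour of ActionListener.wrap; a sketch assuming the same allocateDangled signature:

allocateDangledIndices.allocateDangled(indices, ActionListener.wrap(
    response -> logger.trace("allocated dangled"),          // onResponse
    e -> logger.info("failed to send allocated dangled", e) // onFailure
));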

View File

@@ -31,7 +31,6 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndicesService;
import java.util.Arrays;
import java.util.function.Function;
@@ -45,12 +44,9 @@ public class Gateway {
private final TransportNodesListGatewayMetaState listGatewayMetaState;
private final int minimumMasterNodes;
private final IndicesService indicesService;
public Gateway(final Settings settings, final ClusterService clusterService,
final TransportNodesListGatewayMetaState listGatewayMetaState,
final IndicesService indicesService) {
this.indicesService = indicesService;
final TransportNodesListGatewayMetaState listGatewayMetaState) {
this.clusterService = clusterService;
this.listGatewayMetaState = listGatewayMetaState;
this.minimumMasterNodes = ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings);

View File

@@ -26,14 +26,6 @@ import java.io.IOException;
public class GatewayException extends ElasticsearchException {
public GatewayException(String msg) {
super(msg);
}
public GatewayException(String msg, Throwable cause) {
super(msg, cause);
}
public GatewayException(StreamInput in) throws IOException {
super(in);
}

View File

@@ -44,9 +44,7 @@ import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.MetaDataUpgrader;
import org.elasticsearch.transport.TransportService;
@@ -76,11 +74,9 @@ import java.util.function.UnaryOperator;
public class GatewayMetaState implements ClusterStateApplier, CoordinationState.PersistedState {
protected static final Logger logger = LogManager.getLogger(GatewayMetaState.class);
private final NodeEnvironment nodeEnv;
private final MetaStateService metaStateService;
private final Settings settings;
private final ClusterService clusterService;
private final IndicesService indicesService;
private final TransportService transportService;
//there is a single thread executing updateClusterState calls, hence no volatile modifier
@@ -88,16 +84,13 @@ public class GatewayMetaState implements ClusterStateApplier, CoordinationState.
protected ClusterState previousClusterState;
protected boolean incrementalWrite;
public GatewayMetaState(Settings settings, NodeEnvironment nodeEnv, MetaStateService metaStateService,
public GatewayMetaState(Settings settings, MetaStateService metaStateService,
MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataUpgrader metaDataUpgrader,
TransportService transportService, ClusterService clusterService,
IndicesService indicesService) throws IOException {
TransportService transportService, ClusterService clusterService) throws IOException {
this.settings = settings;
this.nodeEnv = nodeEnv;
this.metaStateService = metaStateService;
this.transportService = transportService;
this.clusterService = clusterService;
this.indicesService = indicesService;
upgradeMetaData(metaDataIndexUpgradeService, metaDataUpgrader);
initializeClusterState(ClusterName.CLUSTER_NAME_SETTING.get(settings));
@@ -184,7 +177,7 @@ public class GatewayMetaState implements ClusterStateApplier, CoordinationState.
}
}
protected boolean isMasterOrDataNode() {
private boolean isMasterOrDataNode() {
return DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings);
}
@@ -312,13 +305,12 @@ public class GatewayMetaState implements ClusterStateApplier, CoordinationState.
}
}
long writeManifestAndCleanup(String reason, Manifest manifest) throws WriteStateException {
void writeManifestAndCleanup(String reason, Manifest manifest) throws WriteStateException {
assert finished == false : FINISHED_MSG;
try {
long generation = metaStateService.writeManifestAndCleanup(reason, manifest);
metaStateService.writeManifestAndCleanup(reason, manifest);
commitCleanupActions.forEach(Runnable::run);
finished = true;
return generation;
} catch (WriteStateException e) {
// if Manifest write results in dirty WriteStateException it's not safe to remove
// new metadata files, because if Manifest was actually written to disk and its deletion
@@ -346,7 +338,7 @@ public class GatewayMetaState implements ClusterStateApplier, CoordinationState.
*
* @throws WriteStateException if an exception occurs. See also {@link WriteStateException#isDirty()}.
*/
protected void updateClusterState(ClusterState newState, ClusterState previousState)
private void updateClusterState(ClusterState newState, ClusterState previousState)
throws WriteStateException {
MetaData newMetaData = newState.metaData();
@@ -406,7 +398,7 @@ public class GatewayMetaState implements ClusterStateApplier, CoordinationState.
}
private static boolean isDataOnlyNode(ClusterState state) {
return ((state.nodes().getLocalNode().isMasterNode() == false) && state.nodes().getLocalNode().isDataNode());
return state.nodes().getLocalNode().isMasterNode() == false && state.nodes().getLocalNode().isDataNode();
}
/**
@@ -535,8 +527,7 @@ public class GatewayMetaState implements ClusterStateApplier, CoordinationState.
}
private static Set<Index> getRelevantIndicesForMasterEligibleNode(ClusterState state) {
Set<Index> relevantIndices;
relevantIndices = new HashSet<>();
Set<Index> relevantIndices = new HashSet<>();
// we have to iterate over the metadata to make sure we also capture closed indices
for (IndexMetaData indexMetaData : state.metaData()) {
relevantIndices.add(indexMetaData.getIndex());

View File

@@ -41,7 +41,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
@@ -87,14 +86,14 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
private final Runnable recoveryRunnable;
private final AtomicBoolean recovered = new AtomicBoolean();
private final AtomicBoolean recoveryInProgress = new AtomicBoolean();
private final AtomicBoolean scheduledRecovery = new AtomicBoolean();
@Inject
public GatewayService(final Settings settings, final AllocationService allocationService, final ClusterService clusterService,
final ThreadPool threadPool,
final TransportNodesListGatewayMetaState listGatewayMetaState,
final IndicesService indicesService, final Discovery discovery) {
final Discovery discovery) {
this.allocationService = allocationService;
this.clusterService = clusterService;
this.threadPool = threadPool;
@@ -125,7 +124,7 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
recoveryRunnable = () ->
clusterService.submitStateUpdateTask("local-gateway-elected-state", new RecoverStateUpdateTask());
} else {
final Gateway gateway = new Gateway(settings, clusterService, listGatewayMetaState, indicesService);
final Gateway gateway = new Gateway(settings, clusterService, listGatewayMetaState);
recoveryRunnable = () ->
gateway.performStateRecovery(new GatewayRecoveryListener());
}
@@ -215,7 +214,7 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
@Override
protected void doRun() {
if (recovered.compareAndSet(false, true)) {
if (recoveryInProgress.compareAndSet(false, true)) {
logger.info("recover_after_time [{}] elapsed. performing state recovery...", recoverAfterTime);
recoveryRunnable.run();
}
@@ -223,7 +222,7 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
}, recoverAfterTime, ThreadPool.Names.GENERIC);
}
} else {
if (recovered.compareAndSet(false, true)) {
if (recoveryInProgress.compareAndSet(false, true)) {
threadPool.generic().execute(new AbstractRunnable() {
@Override
public void onFailure(final Exception e) {
@@ -241,7 +240,7 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
}
private void resetRecoveredFlags() {
recovered.set(false);
recoveryInProgress.set(false);
scheduledRecovery.set(false);
}
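The rename from recovered to recoveryInProgress reflects that the flag is now re-armed once recovery completes (see clusterStateProcessed below); the compareAndSet guard is the usual at-most-one-runner idiom, sketched here with a hypothetical caller:

private final AtomicBoolean recoveryInProgress = new AtomicBoolean();

void maybeRunRecovery(Runnable recoveryRunnable) {
    if (recoveryInProgress.compareAndSet(false, true)) {
        recoveryRunnable.run(); // only the thread that wins the CAS triggers recovery
    }
    // resetRecoveredFlags() later sets the flag back to false, re-arming the guard
}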
@@ -260,6 +259,9 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
@Override
public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) {
logger.info("recovered [{}] indices into cluster_state", newState.metaData().indices().size());
// reset the flag even though state recovery completed, to ensure that if we subsequently become leader again based on a
// not-recovered state, we perform state recovery again.
resetRecoveredFlags();
}
@Override

View File

@@ -23,6 +23,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks;
@@ -76,7 +77,7 @@ public class LocalAllocateDangledIndices {
new AllocateDangledRequestHandler());
}
public void allocateDangled(Collection<IndexMetaData> indices, final Listener listener) {
public void allocateDangled(Collection<IndexMetaData> indices, ActionListener<AllocateDangledResponse> listener) {
ClusterState clusterState = clusterService.state();
DiscoveryNode masterNode = clusterState.nodes().getMasterNode();
if (masterNode == null) {
@@ -110,12 +111,6 @@ public class LocalAllocateDangledIndices {
});
}
public interface Listener {
void onResponse(AllocateDangledResponse response);
void onFailure(Throwable e);
}
class AllocateDangledRequestHandler implements TransportRequestHandler<AllocateDangledRequest> {
@Override
public void messageReceived(final AllocateDangledRequest request, final TransportChannel channel, Task task) throws Exception {
@@ -257,10 +252,6 @@ public class LocalAllocateDangledIndices {
this.ack = ack;
}
public boolean ack() {
return ack;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);

View File

@@ -198,12 +198,11 @@ public class MetaStateService {
*
* @throws WriteStateException if an exception occurs when writing state. See also {@link WriteStateException#isDirty()}
*/
public long writeManifestAndCleanup(String reason, Manifest manifest) throws WriteStateException {
public void writeManifestAndCleanup(String reason, Manifest manifest) throws WriteStateException {
logger.trace("[_meta] writing state, reason [{}]", reason);
try {
long generation = MANIFEST_FORMAT.writeAndCleanup(manifest, nodeEnv.nodeDataPaths());
logger.trace("[_meta] state written (generation: {})", generation);
return generation;
} catch (WriteStateException ex) {
throw new WriteStateException(ex.isDirty(), "[_meta]: failed to write meta state", ex);
}

View File

@@ -297,10 +297,10 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
/**
* Split the list of node shard states into groups yes/no/throttle based on allocation deciders
*/
private NodesToAllocate buildNodesToAllocate(RoutingAllocation allocation,
List<NodeGatewayStartedShards> nodeShardStates,
ShardRouting shardRouting,
boolean forceAllocate) {
private static NodesToAllocate buildNodesToAllocate(RoutingAllocation allocation,
List<NodeGatewayStartedShards> nodeShardStates,
ShardRouting shardRouting,
boolean forceAllocate) {
List<DecidedNode> yesNodeShards = new ArrayList<>();
List<DecidedNode> throttledNodeShards = new ArrayList<>();
List<DecidedNode> noNodeShards = new ArrayList<>();

View File

@@ -56,11 +56,11 @@ public abstract class PriorityComparator implements Comparator<ShardRouting> {
return cmp;
}
private int priority(Settings settings) {
private static int priority(Settings settings) {
return IndexMetaData.INDEX_PRIORITY_SETTING.get(settings);
}
private long timeCreated(Settings settings) {
private static long timeCreated(Settings settings) {
return settings.getAsLong(IndexMetaData.SETTING_CREATION_DATE, -1L);
}

View File

@@ -243,8 +243,8 @@ public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
* YES or THROTTLE). If in explain mode, also returns the node-level explanations as the second element
* in the returned tuple.
*/
private Tuple<Decision, Map<String, NodeAllocationResult>> canBeAllocatedToAtLeastOneNode(ShardRouting shard,
RoutingAllocation allocation) {
private static Tuple<Decision, Map<String, NodeAllocationResult>> canBeAllocatedToAtLeastOneNode(ShardRouting shard,
RoutingAllocation allocation) {
Decision madeDecision = Decision.NO;
final boolean explain = allocation.debugDecision();
Map<String, NodeAllocationResult> nodeDecisions = explain ? new HashMap<>() : null;
@@ -260,7 +260,7 @@ public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
if (explain) {
madeDecision = decision;
} else {
return Tuple.tuple(decision, nodeDecisions);
return Tuple.tuple(decision, null);
}
} else if (madeDecision.type() == Decision.Type.NO && decision.type() == Decision.Type.THROTTLE) {
madeDecision = decision;
@@ -276,8 +276,8 @@ public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
* Takes the store info for nodes that have a shard store and adds them to the node decisions,
* leaving the node explanations untouched for those nodes that do not have any store information.
*/
private List<NodeAllocationResult> augmentExplanationsWithStoreInfo(Map<String, NodeAllocationResult> nodeDecisions,
Map<String, NodeAllocationResult> withShardStores) {
private static List<NodeAllocationResult> augmentExplanationsWithStoreInfo(Map<String, NodeAllocationResult> nodeDecisions,
Map<String, NodeAllocationResult> withShardStores) {
if (nodeDecisions == null || withShardStores == null) {
return null;
}
@@ -295,8 +295,8 @@ public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
/**
* Finds the store for the assigned shard in the fetched data, returns null if none is found.
*/
private TransportNodesListShardStoreMetaData.StoreFilesMetaData findStore(ShardRouting shard, RoutingAllocation allocation,
AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> data) {
private static TransportNodesListShardStoreMetaData.StoreFilesMetaData findStore(ShardRouting shard, RoutingAllocation allocation,
AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> data) {
assert shard.currentNodeId() != null;
DiscoveryNode primaryNode = allocation.nodes().get(shard.currentNodeId());
if (primaryNode == null) {

View File

@@ -94,23 +94,10 @@ public class TransportNodesListGatewayMetaState extends TransportNodesAction<Tra
public Request(String... nodesIds) {
super(nodesIds);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
}
public static class NodesGatewayMetaState extends BaseNodesResponse<NodeGatewayMetaState> {
NodesGatewayMetaState() {
}
public NodesGatewayMetaState(ClusterName clusterName, List<NodeGatewayMetaState> nodes, List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
}
@@ -135,15 +122,6 @@ public class TransportNodesListGatewayMetaState extends TransportNodesAction<Tra
super(nodeId);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
}
public static class NodeGatewayMetaState extends BaseNodeResponse {

View File

@@ -51,6 +51,7 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
/**
* This transport action is used to fetch the shard version from each node during primary allocation in {@link GatewayAllocator}.
@@ -318,14 +319,8 @@ public class TransportNodesListGatewayStartedShards extends
NodeGatewayStartedShards that = (NodeGatewayStartedShards) o;
if (primary != that.primary) {
return false;
}
if (allocationId != null ? !allocationId.equals(that.allocationId) : that.allocationId != null) {
return false;
}
return storeException != null ? storeException.equals(that.storeException) : that.storeException == null;
return primary == that.primary && Objects.equals(allocationId, that.allocationId)
&& Objects.equals(storeException, that.storeException);
}
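Reassembled, the rewritten equals reads as one null-safe expression; this is the shape of the Objects.equals idiom that replaced the ternary chains:

@Override
public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    NodeGatewayStartedShards that = (NodeGatewayStartedShards) o;
    return primary == that.primary
            && Objects.equals(allocationId, that.allocationId)
            && Objects.equals(storeException, that.storeException);
}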
@Override

View File

@@ -911,7 +911,7 @@ public abstract class Engine implements Closeable {
map.put(extension, length);
}
if (useCompoundFile && directory != null) {
if (useCompoundFile) {
try {
directory.close();
} catch (IOException e) {
@@ -954,8 +954,7 @@
// now, correlate or add the committed ones...
if (lastCommittedSegmentInfos != null) {
SegmentInfos infos = lastCommittedSegmentInfos;
for (SegmentCommitInfo info : infos) {
for (SegmentCommitInfo info : lastCommittedSegmentInfos) {
Segment segment = segments.get(info.info.name);
if (segment == null) {
segment = new Segment(info.info.name);
@@ -1783,11 +1782,8 @@
CommitId commitId = (CommitId) o;
if (!Arrays.equals(id, commitId.id)) {
return false;
}
return Arrays.equals(id, commitId.id);
return true;
}
@Override

View File

@@ -563,7 +563,7 @@ public class InternalEngine extends Engine {
/**
* Reads the current stored history ID from the IW commit data.
*/
private String loadHistoryUUID(final IndexWriter writer) throws IOException {
private String loadHistoryUUID(final IndexWriter writer) {
final String uuid = commitDataAsMap(writer).get(HISTORY_UUID_KEY);
if (uuid == null) {
throw new IllegalStateException("commit doesn't contain history uuid");
@@ -635,9 +635,8 @@
if (operation != null) {
// in the case of an already pruned translog generation we might get null here - yet very unlikely
final Translog.Index index = (Translog.Index) operation;
TranslogLeafReader reader = new TranslogLeafReader(index, engineConfig
.getIndexSettings().getIndexVersionCreated());
return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader), reader::close),
TranslogLeafReader reader = new TranslogLeafReader(index);
return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader), reader),
new VersionsAndSeqNoResolver.DocIdAndVersion(0, index.version(), index.seqNo(), index.primaryTerm(),
reader, 0));
}
@@ -756,7 +755,7 @@
+ index.getAutoGeneratedIdTimestamp();
switch (index.origin()) {
case PRIMARY:
assertPrimaryCanOptimizeAddDocument(index);
assert assertPrimaryCanOptimizeAddDocument(index);
return true;
case PEER_RECOVERY:
case REPLICA:
@@ -782,7 +781,7 @@
private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) {
if (origin == Operation.Origin.PRIMARY) {
assertPrimaryIncomingSequenceNumber(origin, seqNo);
assert assertPrimaryIncomingSequenceNumber(origin, seqNo);
} else {
// sequence number should be set when operation origin is not primary
assert seqNo >= 0 : "recovery or replica ops should have an assigned seq no.; origin: " + origin;
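The recurring change in this file wraps boolean assertion helpers in an assert statement, so both the check and its argument construction are elided when assertions are disabled; the shape of the pattern (signature abbreviated from the surrounding code):

// helper returns true so it can live inside an assert; with -ea disabled it never runs
private boolean assertNonPrimaryOrigin(final Operation operation) {
    assert operation.origin() != Operation.Origin.PRIMARY : "planning as primary but got " + operation.origin();
    return true;
}

// call site
assert assertNonPrimaryOrigin(index);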
@@ -923,7 +922,7 @@
}
protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException {
assertNonPrimaryOrigin(index);
assert assertNonPrimaryOrigin(index);
final IndexingStrategy plan;
final boolean appendOnlyRequest = canOptimizeAddDocument(index);
if (appendOnlyRequest && mayHaveBeenIndexedBefore(index) == false && index.seqNo() > maxSeqNoOfNonAppendOnlyOperations.get()) {
@@ -978,13 +977,13 @@
}
}
protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
assert index.origin() == Operation.Origin.PRIMARY : "planning as primary but origin isn't; got " + index.origin();
final IndexingStrategy plan;
// resolve an external operation into an internal one which is safe to replay
if (canOptimizeAddDocument(index)) {
if (mayHaveBeenIndexedBefore(index)) {
plan = IndexingStrategy.overrideExistingAsIfNotThere(1L);
plan = IndexingStrategy.overrideExistingAsIfNotThere();
versionMap.enforceSafeAccess();
} else {
plan = IndexingStrategy.optimizedAppendOnly(1L);
@@ -1006,7 +1005,7 @@
if (index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && versionValue == null) {
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, index.id(),
index.getIfSeqNo(), index.getIfPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion, getPrimaryTerm());
plan = IndexingStrategy.skipDueToVersionConflict(e, true, currentVersion, getPrimaryTerm());
} else if (index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && (
versionValue.seqNo != index.getIfSeqNo() || versionValue.term != index.getIfPrimaryTerm()
)) {
@@ -1164,9 +1163,9 @@
true, false, versionForIndexing, null);
}
static IndexingStrategy overrideExistingAsIfNotThere(long versionForIndexing) {
static IndexingStrategy overrideExistingAsIfNotThere() {
return new IndexingStrategy(true, true, true,
false, versionForIndexing, null);
false, 1L, null);
}
public static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long versionForIndexing) {
@@ -1285,7 +1284,7 @@
}
protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException {
assertNonPrimaryOrigin(delete);
assert assertNonPrimaryOrigin(delete);
maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(delete.seqNo(), curr));
assert maxSeqNoOfNonAppendOnlyOperations.get() >= delete.seqNo() : "max_seqno of non-append-only was not updated;" +
"max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of delete [" + delete.seqNo() + "]";
@@ -1305,7 +1304,7 @@
} else {
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete);
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
plan = DeletionStrategy.processAsStaleOp(softDeleteEnabled, false, delete.version());
plan = DeletionStrategy.processAsStaleOp(softDeleteEnabled, delete.version());
} else {
plan = DeletionStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, delete.version());
}
@@ -1318,7 +1317,7 @@
return true;
}
protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException {
private DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException {
assert delete.origin() == Operation.Origin.PRIMARY : "planning as primary but got " + delete.origin();
// resolve operation from external to internal
final VersionValue versionValue = resolveDocVersion(delete, delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO);
@@ -1336,7 +1335,7 @@
if (delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && versionValue == null) {
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete.id(),
delete.getIfSeqNo(), delete.getIfPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, getPrimaryTerm(), currentlyDeleted);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, getPrimaryTerm(), true);
} else if (delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && (
versionValue.seqNo != delete.getIfSeqNo() || versionValue.term != delete.getIfPrimaryTerm()
)) {
@@ -1428,8 +1427,8 @@
return new DeletionStrategy(false, false, currentlyDeleted, versionOfDeletion, null);
}
static DeletionStrategy processAsStaleOp(boolean addStaleOpToLucene, boolean currentlyDeleted, long versionOfDeletion) {
return new DeletionStrategy(false, addStaleOpToLucene, currentlyDeleted, versionOfDeletion, null);
static DeletionStrategy processAsStaleOp(boolean addStaleOpToLucene, long versionOfDeletion) {
return new DeletionStrategy(false, addStaleOpToLucene, false, versionOfDeletion, null);
}
}

View File

@@ -234,7 +234,7 @@ final class LiveVersionMap implements ReferenceManager.RefreshListener, Accounta
/**
* Tracks bytes used by tombstones (deletes)
*/
final AtomicLong ramBytesUsedTombstones = new AtomicLong();
private final AtomicLong ramBytesUsedTombstones = new AtomicLong();
@Override
public void beforeRefresh() throws IOException {

View File

@@ -188,8 +188,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
int readerIndex = 0;
CombinedDocValues combinedDocValues = null;
LeafReaderContext leaf = null;
for (int i = 0; i < scoreDocs.length; i++) {
ScoreDoc scoreDoc = scoreDocs[i];
for (ScoreDoc scoreDoc : scoreDocs) {
if (scoreDoc.doc >= docBase + maxDoc) {
do {
leaf = leaves.get(readerIndex++);

View File

@@ -457,8 +457,8 @@ public class ReadOnlyEngine extends Engine {
}
protected void processReaders(IndexReader reader, IndexReader previousReader) {
searcherFactory.processReaders(reader, previousReader);
protected void processReader(IndexReader reader) {
searcherFactory.processReaders(reader, null);
}
@Override

View File

@@ -1,65 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.store.Store;
import java.util.concurrent.atomic.AtomicInteger;
/**
* RecoveryCounter keeps track of the number of ongoing recoveries for a
* particular {@link Store}
*/
public class RecoveryCounter implements Releasable {
private final Store store;
RecoveryCounter(Store store) {
this.store = store;
}
private final AtomicInteger onGoingRecoveries = new AtomicInteger();
void startRecovery() {
store.incRef();
onGoingRecoveries.incrementAndGet();
}
public int get() {
return onGoingRecoveries.get();
}
/**
* End the recovery counter by decrementing the store's ref and the ongoing recovery counter
* @return number of ongoing recoveries remaining
*/
int endRecovery() {
store.decRef();
int left = onGoingRecoveries.decrementAndGet();
assert onGoingRecoveries.get() >= 0 : "ongoingRecoveries must be >= 0 but was: " + onGoingRecoveries.get();
return left;
}
@Override
public void close() {
endRecovery();
}
}

View File

@@ -58,8 +58,7 @@ final class RecoverySourcePruneMergePolicy extends OneMergeWrappingMergePolicy {
});
}
// pkg private for testing
static CodecReader wrapReader(String recoverySourceField, CodecReader reader, Supplier<Query> retainSourceQuerySupplier)
private static CodecReader wrapReader(String recoverySourceField, CodecReader reader, Supplier<Query> retainSourceQuerySupplier)
throws IOException {
NumericDocValues recoverySource = reader.getNumericDocValues(recoverySourceField);
if (recoverySource == null || recoverySource.nextDoc() == DocIdSetIterator.NO_MORE_DOCS) {

View File

@@ -40,6 +40,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class Segment implements Streamable {
@@ -94,10 +95,6 @@ public class Segment implements Streamable {
return new ByteSizeValue(sizeInBytes);
}
public long getSizeInBytes() {
return this.sizeInBytes;
}
public org.apache.lucene.util.Version getVersion() {
return version;
}
@@ -145,9 +142,8 @@
Segment segment = (Segment) o;
if (name != null ? !name.equals(segment.name) : segment.name != null) return false;
return Objects.equals(name, segment.name);
return true;
}
@Override
@@ -220,7 +216,7 @@
}
}
Sort readSegmentSort(StreamInput in) throws IOException {
private Sort readSegmentSort(StreamInput in) throws IOException {
int size = in.readVInt();
if (size == 0) {
return null;
@@ -271,7 +267,7 @@
return new Sort(fields);
}
void writeSegmentSort(StreamOutput out, Sort sort) throws IOException {
private void writeSegmentSort(StreamOutput out, Sort sort) throws IOException {
if (sort == null) {
out.writeVInt(0);
return;
@@ -311,14 +307,14 @@
}
}
Accountable readRamTree(StreamInput in) throws IOException {
private Accountable readRamTree(StreamInput in) throws IOException {
final String name = in.readString();
final long bytes = in.readVLong();
int numChildren = in.readVInt();
if (numChildren == 0) {
return Accountables.namedAccountable(name, bytes);
}
List<Accountable> children = new ArrayList(numChildren);
List<Accountable> children = new ArrayList<>(numChildren);
while (numChildren-- > 0) {
children.add(readRamTree(in));
}
@@ -326,7 +322,7 @@
}
// the ram tree is written recursively since the depth is fairly low (5 or 6)
void writeRamTree(StreamOutput out, Accountable tree) throws IOException {
private void writeRamTree(StreamOutput out, Accountable tree) throws IOException {
out.writeString(tree.toString());
out.writeVLong(tree.ramBytesUsed());
Collection<Accountable> children = tree.getChildResources();

View File

@@ -30,7 +30,6 @@ import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Iterator;
public class SegmentsStats implements Streamable, Writeable, ToXContentFragment {
@@ -54,7 +53,7 @@ public class SegmentsStats implements Streamable, Writeable, ToXContentFragment
* Ideally this should be kept in sync with what the current version of Lucene is using, but it's harmless to leave extensions out;
* they'll just miss a proper description in the stats
*/
private static ImmutableOpenMap<String, String> fileDescriptions = ImmutableOpenMap.<String, String>builder()
private static final ImmutableOpenMap<String, String> FILE_DESCRIPTIONS = ImmutableOpenMap.<String, String>builder()
.fPut("si", "Segment Info")
.fPut("fnm", "Fields")
.fPut("fdx", "Field Index")
@@ -150,8 +149,7 @@ public class SegmentsStats implements Streamable, Writeable, ToXContentFragment
public void addFileSizes(ImmutableOpenMap<String, Long> fileSizes) {
ImmutableOpenMap.Builder<String, Long> map = ImmutableOpenMap.builder(this.fileSizes);
for (Iterator<ObjectObjectCursor<String, Long>> it = fileSizes.iterator(); it.hasNext();) {
ObjectObjectCursor<String, Long> entry = it.next();
for (ObjectObjectCursor<String, Long> entry : fileSizes) {
if (map.containsKey(entry.key)) {
Long oldValue = map.get(entry.key);
map.put(entry.key, oldValue + entry.value);
@@ -206,7 +204,7 @@ public class SegmentsStats implements Streamable, Writeable, ToXContentFragment
return this.termsMemoryInBytes;
}
public ByteSizeValue getTermsMemory() {
private ByteSizeValue getTermsMemory() {
return new ByteSizeValue(termsMemoryInBytes);
}
@@ -217,7 +215,7 @@ public class SegmentsStats implements Streamable, Writeable, ToXContentFragment
return this.storedFieldsMemoryInBytes;
}
public ByteSizeValue getStoredFieldsMemory() {
private ByteSizeValue getStoredFieldsMemory() {
return new ByteSizeValue(storedFieldsMemoryInBytes);
}
@@ -228,7 +226,7 @@ public class SegmentsStats implements Streamable, Writeable, ToXContentFragment
return this.termVectorsMemoryInBytes;
}
public ByteSizeValue getTermVectorsMemory() {
private ByteSizeValue getTermVectorsMemory() {
return new ByteSizeValue(termVectorsMemoryInBytes);
}
@@ -239,7 +237,7 @@ public class SegmentsStats implements Streamable, Writeable, ToXContentFragment
return this.normsMemoryInBytes;
}
public ByteSizeValue getNormsMemory() {
private ByteSizeValue getNormsMemory() {
return new ByteSizeValue(normsMemoryInBytes);
}
@@ -250,7 +248,7 @@ public class SegmentsStats implements Streamable, Writeable, ToXContentFragment
return this.pointsMemoryInBytes;
}
public ByteSizeValue getPointsMemory() {
private ByteSizeValue getPointsMemory() {
return new ByteSizeValue(pointsMemoryInBytes);
}
@@ -261,7 +259,7 @@ public class SegmentsStats implements Streamable, Writeable, ToXContentFragment
return this.docValuesMemoryInBytes;
}
public ByteSizeValue getDocValuesMemory() {
private ByteSizeValue getDocValuesMemory() {
return new ByteSizeValue(docValuesMemoryInBytes);
}
@@ -326,11 +324,10 @@ public class SegmentsStats implements Streamable, Writeable, ToXContentFragment
builder.humanReadableField(Fields.FIXED_BIT_SET_MEMORY_IN_BYTES, Fields.FIXED_BIT_SET, getBitsetMemory());
builder.field(Fields.MAX_UNSAFE_AUTO_ID_TIMESTAMP, maxUnsafeAutoIdTimestamp);
builder.startObject(Fields.FILE_SIZES);
for (Iterator<ObjectObjectCursor<String, Long>> it = fileSizes.iterator(); it.hasNext();) {
ObjectObjectCursor<String, Long> entry = it.next();
for (ObjectObjectCursor<String, Long> entry : fileSizes) {
builder.startObject(entry.key);
builder.humanReadableField(Fields.SIZE_IN_BYTES, Fields.SIZE, new ByteSizeValue(entry.value));
builder.field(Fields.DESCRIPTION, fileDescriptions.getOrDefault(entry.key, "Others"));
builder.field(Fields.DESCRIPTION, FILE_DESCRIPTIONS.getOrDefault(entry.key, "Others"));
builder.endObject();
}
builder.endObject();
@@ -391,7 +388,7 @@ public class SegmentsStats implements Streamable, Writeable, ToXContentFragment
out.writeVInt(fileSizes.size());
for (ObjectObjectCursor<String, Long> entry : fileSizes) {
out.writeString(entry.key);
out.writeLong(entry.value.longValue());
out.writeLong(entry.value);
}
}

View File

@@ -20,17 +20,12 @@
package org.elasticsearch.index.engine;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
public class SnapshotFailedEngineException extends EngineException {
public SnapshotFailedEngineException(ShardId shardId, Throwable cause) {
super(shardId, "Snapshot failed", cause);
}
public SnapshotFailedEngineException(StreamInput in) throws IOException{
super(in);
}
}
}

Some files were not shown because too many files have changed in this diff.