[SNAPSHOT] Add repository validation

Fixes #7096
This commit is contained in:
Igor Motov 2014-09-10 21:49:15 -04:00
parent 09ff3724ee
commit 555bfcb02b
40 changed files with 1398 additions and 44 deletions

View File

@ -71,7 +71,7 @@ on all data and master nodes. The following settings are supported:
using size value notation, i.e. 1g, 10m, 5k. Defaults to `null` (unlimited chunk size).
`max_restore_bytes_per_sec`:: Throttles per node restore rate. Defaults to `20mb` per second.
`max_snapshot_bytes_per_sec`:: Throttles per node snapshot rate. Defaults to `20mb` per second.
`verify`:: Verify repository upon creation. Defaults to `true`.
[float]
===== Read-only URL Repository
@ -92,6 +92,21 @@ Other repository backends are available in these official plugins:
* https://github.com/elasticsearch/elasticsearch-hadoop/tree/master/repository-hdfs[HDFS Plugin] for Hadoop environments
* https://github.com/elasticsearch/elasticsearch-cloud-azure#azure-repository[Azure Cloud Plugin] for Azure storage repositories
===== Repository Verification
added[1.4.0]
When a repository is registered, it is immediately verified on all master and data nodes to make sure that it is
functional on all nodes currently present in the cluster. The verification process can also be executed manually by
running the following command:
[source,js]
-----------------------------------
$ curl -XPOST 'http://localhost:9200/_snapshot/my_backup/_verify'
-----------------------------------
It returns a list of nodes on which the repository was successfully verified.
[float]
=== Snapshot

View File

@ -20,6 +20,10 @@
"timeout": {
"type" : "time",
"description" : "Explicit operation timeout"
},
"verify": {
"type" : "boolean",
"description" : "Whether to verify the repository after creation"
}
}
},

View File

@ -0,0 +1,28 @@
{
"snapshot.verify_repository": {
"documentation": "http://www.elasticsearch.org/guide/en/elasticsearch/reference/master/modules-snapshots.html",
"methods": ["POST"],
"url": {
"path": "/_snapshot/{repository}/_verify",
"paths": ["/_snapshot/{repository}/_verify"],
"parts": {
"repository": {
"type": "string",
"required" : true,
"description": "A repository name"
}
},
"params": {
"master_timeout": {
"type" : "time",
"description" : "Explicit operation timeout for connection to master node"
},
"timeout": {
"type" : "time",
"description" : "Explicit operation timeout"
}
}
},
"body": null
}
}

View File

@ -48,3 +48,11 @@ setup:
- is_true: test_repo1
- is_true: test_repo2
---
"Verify created repository":
- do:
snapshot.verify_repository:
repository: test_repo2
- is_true: nodes

View File

@ -39,6 +39,8 @@ import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesAc
import org.elasticsearch.action.admin.cluster.repositories.get.TransportGetRepositoriesAction;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryAction;
import org.elasticsearch.action.admin.cluster.repositories.put.TransportPutRepositoryAction;
import org.elasticsearch.action.admin.cluster.repositories.verify.TransportVerifyRepositoryAction;
import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryAction;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteAction;
import org.elasticsearch.action.admin.cluster.reroute.TransportClusterRerouteAction;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction;
@ -238,6 +240,7 @@ public class ActionModule extends AbstractModule {
registerAction(PutRepositoryAction.INSTANCE, TransportPutRepositoryAction.class);
registerAction(GetRepositoriesAction.INSTANCE, TransportGetRepositoriesAction.class);
registerAction(DeleteRepositoryAction.INSTANCE, TransportDeleteRepositoryAction.class);
registerAction(VerifyRepositoryAction.INSTANCE, TransportVerifyRepositoryAction.class);
registerAction(GetSnapshotsAction.INSTANCE, TransportGetSnapshotsAction.class);
registerAction(DeleteSnapshotAction.INSTANCE, TransportDeleteSnapshotAction.class);
registerAction(CreateSnapshotAction.INSTANCE, TransportCreateSnapshotAction.class);

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action.admin.cluster.repositories.put;
import org.elasticsearch.ElasticsearchGenerationException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.bytes.BytesReference;
@ -52,6 +53,8 @@ public class PutRepositoryRequest extends AcknowledgedRequest<PutRepositoryReque
private String type;
private boolean verify = true;
private Settings settings = EMPTY_SETTINGS;
PutRepositoryRequest() {
@ -178,6 +181,20 @@ public class PutRepositoryRequest extends AcknowledgedRequest<PutRepositoryReque
return this.settings;
}
/**
 * Sets whether or not the repository should be verified after creation.
 *
 * @param verify true to verify the repository after registration, false to skip verification
 * @return this request, for chaining
 */
public PutRepositoryRequest verify(boolean verify) {
this.verify = verify;
return this;
}
/**
 * Returns true if the repository should be verified after creation.
 *
 * @return true if verification is requested, false otherwise
 */
public boolean verify() {
return this.verify;
}
/**
* Parses repository definition.
@ -268,6 +285,12 @@ public class PutRepositoryRequest extends AcknowledgedRequest<PutRepositoryReque
type = in.readString();
settings = readSettingsFromStream(in);
readTimeout(in);
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
verify = in.readBoolean();
} else {
// we received this request from an older client that doesn't expect us to validate the request
verify = false;
}
}
@Override
@ -277,5 +300,8 @@ public class PutRepositoryRequest extends AcknowledgedRequest<PutRepositoryReque
out.writeString(type);
writeSettingsToStream(settings, out);
writeTimeout(out);
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeBoolean(verify);
}
}
}

View File

@ -116,6 +116,17 @@ public class PutRepositoryRequestBuilder extends AcknowledgedRequestBuilder<PutR
return this;
}
/**
* Sets whether or not repository should be verified after creation
*
* @param verify true if repository should be verified after registration, false otherwise
* @return this builder
*/
public PutRepositoryRequestBuilder setVerify(boolean verify) {
request.verify(verify);
return this;
}
@Override
protected void doExecute(ActionListener<PutRepositoryResponse> listener) {
client.putRepository(request, listener);

View File

@ -71,7 +71,9 @@ public class TransportPutRepositoryAction extends TransportMasterNodeOperationAc
@Override
protected void masterOperation(final PutRepositoryRequest request, ClusterState state, final ActionListener<PutRepositoryResponse> listener) throws ElasticsearchException {
repositoriesService.registerRepository(new RepositoriesService.RegisterRepositoryRequest("put_repository [" + request.name() + "]", request.name(), request.type())
repositoriesService.registerRepository(
new RepositoriesService.RegisterRepositoryRequest("put_repository [" + request.name() + "]",
request.name(), request.type(), request.verify())
.settings(request.settings())
.masterNodeTimeout(request.masterNodeTimeout())
.ackTimeout(request.timeout()), new ActionListener<ClusterStateUpdateResponse>() {

View File

@ -0,0 +1,93 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.repositories.verify;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
/**
 * Transport action for verifying repository operation.
 * <p/>
 * Runs on the master node and delegates to
 * {@link RepositoriesService#verifyRepository}, which fans the verification
 * out to the master and data nodes and collects the results.
 */
public class TransportVerifyRepositoryAction extends TransportMasterNodeOperationAction<VerifyRepositoryRequest, VerifyRepositoryResponse> {

    private final RepositoriesService repositoriesService;

    protected final ClusterName clusterName;

    @Inject
    public TransportVerifyRepositoryAction(Settings settings, ClusterName clusterName, TransportService transportService, ClusterService clusterService,
                                           RepositoriesService repositoriesService, ThreadPool threadPool, ActionFilters actionFilters) {
        super(settings, VerifyRepositoryAction.NAME, transportService, clusterService, threadPool, actionFilters);
        this.repositoriesService = repositoriesService;
        this.clusterName = clusterName;
    }

    @Override
    protected String executor() {
        // repository verification performs blocking store I/O; keep it off latency-sensitive pools
        return ThreadPool.Names.MANAGEMENT;
    }

    @Override
    protected VerifyRepositoryRequest newRequest() {
        return new VerifyRepositoryRequest();
    }

    @Override
    protected VerifyRepositoryResponse newResponse() {
        return new VerifyRepositoryResponse();
    }

    @Override
    protected ClusterBlockException checkBlock(VerifyRepositoryRequest request, ClusterState state) {
        // Repository verification is a cluster-level operation: check the global METADATA
        // block rather than an index-level block keyed by an empty index name.
        return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
    }

    @Override
    protected void masterOperation(final VerifyRepositoryRequest request, ClusterState state, final ActionListener<VerifyRepositoryResponse> listener) throws ElasticsearchException {
        repositoriesService.verifyRepository(request.name(), new ActionListener<RepositoriesService.VerifyResponse>() {
            @Override
            public void onResponse(RepositoriesService.VerifyResponse verifyResponse) {
                // A "failed" verify response means some node(s) could not use the repository;
                // surface that as a RepositoryVerificationException to the caller.
                if (verifyResponse.failed()) {
                    listener.onFailure(new RepositoryVerificationException(request.name(), verifyResponse.failureDescription()));
                } else {
                    listener.onResponse(new VerifyRepositoryResponse(clusterName, verifyResponse.nodes()));
                }
            }

            @Override
            public void onFailure(Throwable e) {
                listener.onFailure(e);
            }
        });
    }
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.repositories.verify;
import org.elasticsearch.action.admin.cluster.ClusterAction;
import org.elasticsearch.client.ClusterAdminClient;
/**
 * Verify repository action: checks that a registered snapshot repository is
 * functional on the nodes currently present in the cluster.
 */
public class VerifyRepositoryAction extends ClusterAction<VerifyRepositoryRequest, VerifyRepositoryResponse, VerifyRepositoryRequestBuilder> {
public static final VerifyRepositoryAction INSTANCE = new VerifyRepositoryAction();
public static final String NAME = "cluster:admin/repository/verify";
private VerifyRepositoryAction() {
super(NAME);
}
@Override
public VerifyRepositoryResponse newResponse() {
return new VerifyRepositoryResponse();
}
@Override
public VerifyRepositoryRequestBuilder newRequestBuilder(ClusterAdminClient client) {
return new VerifyRepositoryRequestBuilder(client);
}
}

View File

@ -0,0 +1,93 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.repositories.verify;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import static org.elasticsearch.action.ValidateActions.addValidationError;
/**
 * Verify repository request.
 * <p/>
 * Asks the cluster to check that the named snapshot repository is functional.
 * No data is written to or deleted from the repository besides temporary
 * verification blobs.
 */
public class VerifyRepositoryRequest extends AcknowledgedRequest<VerifyRepositoryRequest> {
// name of the repository to verify; required (see validate())
private String name;
VerifyRepositoryRequest() {
}
/**
 * Constructs a new verify repository request with the provided name.
 *
 * @param name name of the repository
 */
public VerifyRepositoryRequest(String name) {
this.name = name;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (name == null) {
validationException = addValidationError("name is missing", validationException);
}
return validationException;
}
/**
 * Sets the name of the repository to verify.
 *
 * @param name name of the repository
 */
public VerifyRepositoryRequest name(String name) {
this.name = name;
return this;
}
/**
 * The name of the repository.
 *
 * @return the name of the repository
 */
public String name() {
return this.name;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
name = in.readString();
readTimeout(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(name);
writeTimeout(out);
}
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.repositories.verify;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.ClusterAdminClient;
/**
 * Builder for verify repository request
 */
public class VerifyRepositoryRequestBuilder extends MasterNodeOperationRequestBuilder<VerifyRepositoryRequest, VerifyRepositoryResponse, VerifyRepositoryRequestBuilder, ClusterAdminClient> {
/**
 * Constructs verify repository request builder
 *
 * @param clusterAdminClient cluster admin client
 */
public VerifyRepositoryRequestBuilder(ClusterAdminClient clusterAdminClient) {
super(clusterAdminClient, new VerifyRepositoryRequest());
}
/**
 * Constructs verify repository request builder with specified repository name
 *
 * @param clusterAdminClient cluster admin client
 * @param name               the repository name
 */
public VerifyRepositoryRequestBuilder(ClusterAdminClient clusterAdminClient, String name) {
super(clusterAdminClient, new VerifyRepositoryRequest(name));
}
/**
 * Sets the repository name
 *
 * @param name the repository name
 * @return this builder
 */
public VerifyRepositoryRequestBuilder setName(String name) {
request.name(name);
return this;
}
@Override
protected void doExecute(ActionListener<VerifyRepositoryResponse> listener) {
client.verifyRepository(request, listener);
}
}

View File

@ -0,0 +1,100 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.repositories.verify;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.repositories.VerificationFailure;
import java.io.IOException;
import java.util.Arrays;
/**
 * Verify repository response: lists the nodes on which the repository was
 * successfully verified, together with the cluster name.
 */
public class VerifyRepositoryResponse extends ActionResponse implements ToXContent {
// nodes on which the repository verification succeeded
private DiscoveryNode[] nodes;
private ClusterName clusterName;
VerifyRepositoryResponse() {
}
public VerifyRepositoryResponse(ClusterName clusterName, DiscoveryNode[] nodes) {
this.clusterName = clusterName;
this.nodes = nodes;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
// wire format: cluster name, then a vInt node count followed by the nodes
clusterName = ClusterName.readClusterName(in);
nodes = new DiscoveryNode[in.readVInt()];
for (int i=0; i<nodes.length; i++){
nodes[i] = DiscoveryNode.readNode(in);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
clusterName.writeTo(out);
out.writeVInt(nodes.length);
for (DiscoveryNode node : nodes) {
node.writeTo(out);
}
}
public DiscoveryNode[] getNodes() {
return nodes;
}
public ClusterName getClusterName() {
return clusterName;
}
static final class Fields {
static final XContentBuilderString NODES = new XContentBuilderString("nodes");
static final XContentBuilderString NAME = new XContentBuilderString("name");
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
// renders {"nodes": {"<node id>": {"name": "<node name>"}, ...}}
builder.startObject(Fields.NODES);
for (DiscoveryNode node : nodes) {
builder.startObject(node.id(), XContentBuilder.FieldCaseConversion.NONE);
builder.field(Fields.NAME, node.name(), XContentBuilder.FieldCaseConversion.NONE);
builder.endObject();
}
builder.endObject();
return builder;
}
@Override
public String toString() {
return XContentHelper.toString(this);
}
}

View File

@ -47,6 +47,9 @@ import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRe
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequestBuilder;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryRequestBuilder;
import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequestBuilder;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
@ -359,6 +362,21 @@ public interface ClusterAdminClient extends ElasticsearchClient<ClusterAdminClie
*/
GetRepositoriesRequestBuilder prepareGetRepositories(String... name);
/**
* Verifies a repository.
*/
ActionFuture<VerifyRepositoryResponse> verifyRepository(VerifyRepositoryRequest request);
/**
* Verifies a repository.
*/
void verifyRepository(VerifyRepositoryRequest request, ActionListener<VerifyRepositoryResponse> listener);
/**
* Verifies a repository.
*/
VerifyRepositoryRequestBuilder prepareVerifyRepository(String name);
/**
* Creates a new snapshot.
*/

View File

@ -27,6 +27,7 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryRequest;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
@ -501,6 +502,16 @@ public class Requests {
return new DeleteRepositoryRequest(name);
}
/**
* Verifies snapshot repository
*
* @param name repository name
* @return repository verification request
*/
public static VerifyRepositoryRequest verifyRepositoryRequest(String name) {
return new VerifyRepositoryRequest(name);
}
/**
* Creates new snapshot

View File

@ -57,6 +57,10 @@ import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryActi
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequestBuilder;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryAction;
import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryRequestBuilder;
import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteAction;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequestBuilder;
@ -374,6 +378,21 @@ public abstract class AbstractClusterAdminClient implements ClusterAdminClient {
return new DeleteRepositoryRequestBuilder(this, name);
}
@Override
public ActionFuture<VerifyRepositoryResponse> verifyRepository(VerifyRepositoryRequest request) {
return execute(VerifyRepositoryAction.INSTANCE, request);
}
@Override
public void verifyRepository(VerifyRepositoryRequest request, ActionListener<VerifyRepositoryResponse> listener) {
execute(VerifyRepositoryAction.INSTANCE, request, listener);
}
@Override
public VerifyRepositoryRequestBuilder prepareVerifyRepository(String name) {
return new VerifyRepositoryRequestBuilder(this, name);
}
@Override
public ActionFuture<GetRepositoriesResponse> getRepositories(GetRepositoriesRequest request) {
return execute(GetRepositoriesAction.INSTANCE, request);

View File

@ -21,6 +21,7 @@ package org.elasticsearch.common.xcontent;
import com.google.common.base.Charsets;
import com.google.common.collect.Maps;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
@ -30,12 +31,15 @@ import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.xcontent.ToXContent.Params;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
/**
*
*/
@ -181,13 +185,52 @@ public class XContentHelper {
}
}
/**
 * Writes serialized toXContent to pretty-printed JSON string.
 *
 * @param toXContent object to be pretty printed
 * @return pretty-printed JSON serialization
 */
public static String toString(ToXContent toXContent) {
return toString(toXContent, EMPTY_PARAMS);
}
/**
 * Writes serialized toXContent to pretty-printed JSON string.
 *
 * @param toXContent object to be pretty printed
 * @param params serialization parameters
 * @return pretty-printed JSON serialization
 */
public static String toString(ToXContent toXContent, Params params) {
try {
XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
builder.startObject();
toXContent.toXContent(builder, params);
builder.endObject();
return builder.string();
} catch (IOException e) {
// serialization failed: fall back to a minimal JSON error document rather
// than propagating the IOException from a toString()-style helper
try {
XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
builder.startObject();
builder.field("error", e.getMessage());
builder.endObject();
return builder.string();
} catch (IOException e2) {
// attach the original failure (e) as the cause, not the secondary one (e2)
throw new ElasticsearchException("cannot generate error message for deserialization", e);
}
}
}
/**
* Updates the provided changes into the source. If the key exists in the changes, it overrides the one in source
* unless both are Maps, in which case it recursively updates it.
* @param source the original map to be updated
* @param changes the changes to update into updated
*
* @param source the original map to be updated
* @param changes the changes to update into updated
* @param checkUpdatesAreUnequal should this method check if updates to the same key (that are not both maps) are
* unequal? This is just a .equals check on the objects, but that can take some time on long strings.
* unequal? This is just a .equals check on the objects, but that can take some time on long strings.
* @return true if the source map was modified
*/
public static boolean update(Map<String, Object> source, Map<String, Object> changes, boolean checkUpdatesAreUnequal) {

View File

@ -69,4 +69,10 @@ public interface IndexShardRepository {
*/
IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, ShardId shardId);
/**
* Verifies repository settings on data node
* @param verificationToken value returned by {@link org.elasticsearch.repositories.Repository#startVerification()}
*/
void verify(String verificationToken);
}

View File

@ -28,7 +28,10 @@ import org.apache.lucene.store.*;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.*;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
@ -47,15 +50,14 @@ import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositoryVerificationException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.*;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import static com.google.common.collect.Lists.newArrayList;
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.testBlobPrefix;
/**
* Blob store based implementation of IndexShardRepository
@ -73,6 +75,8 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
private final IndicesService indicesService;
private final ClusterService clusterService;
private RateLimiter snapshotRateLimiter;
private RateLimiter restoreRateLimiter;
@ -84,10 +88,11 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
private static final String SNAPSHOT_PREFIX = "snapshot-";
@Inject
BlobStoreIndexShardRepository(Settings settings, RepositoryName repositoryName, IndicesService indicesService) {
public BlobStoreIndexShardRepository(Settings settings, RepositoryName repositoryName, IndicesService indicesService, ClusterService clusterService) {
super(settings);
this.repositoryName = repositoryName.name();
this.indicesService = indicesService;
this.clusterService = clusterService;
}
/**
@ -170,6 +175,21 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
return status;
}
/**
 * Verifies repository settings on this data node by checking that the store
 * is visible and writable from here.
 *
 * @param seed verification token returned by the master's startVerification()
 * @throws RepositoryVerificationException if the master's marker blob is not
 *         visible or a test blob cannot be written from this node
 */
@Override
public void verify(String seed) {
    // fixed: stray second semicolon after blobContainer(...)
    BlobContainer testBlobContainer = blobStore.blobContainer(basePath);
    DiscoveryNode localNode = clusterService.localNode();
    // presumably the "-master" blob is written by the master during
    // startVerification() — TODO confirm against Repository#startVerification
    if (testBlobContainer.blobExists(testBlobPrefix(seed) + "-master")) {
        // prove this node can also write to the shared store, tagged by node id
        try (OutputStream outputStream = testBlobContainer.createOutput(testBlobPrefix(seed) + "-" + localNode.getId())) {
            outputStream.write(Strings.toUTF8Bytes(seed));
        } catch (IOException exp) {
            throw new RepositoryVerificationException(repositoryName, "store location [" + blobStore + "] is not accessible on the node [" + localNode + "]", exp);
        }
    } else {
        throw new RepositoryVerificationException(repositoryName, "store location [" + blobStore + "] is not shared between node [" + localNode + "] and the master node");
    }
}
/**
* Delete shard snapshot
*

View File

@ -19,6 +19,7 @@
package org.elasticsearch.repositories;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.ActionListener;
@ -39,6 +40,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.List;
@ -58,10 +60,12 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
private final ClusterService clusterService;
private final VerifyNodeRepositoryAction verifyAction;
private volatile ImmutableMap<String, RepositoryHolder> repositories = ImmutableMap.of();
@Inject
public RepositoriesService(Settings settings, ClusterService clusterService, RepositoryTypesRegistry typesRegistry, Injector injector) {
public RepositoriesService(Settings settings, ClusterService clusterService, TransportService transportService, RepositoryTypesRegistry typesRegistry, Injector injector) {
super(settings);
this.typesRegistry = typesRegistry;
this.injector = injector;
@ -71,6 +75,7 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
if (DiscoveryNode.dataNode(settings) || DiscoveryNode.masterNode(settings)) {
clusterService.add(this);
}
this.verifyAction = new VerifyNodeRepositoryAction(settings, transportService, clusterService, this);
}
/**
@ -85,7 +90,14 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
public void registerRepository(final RegisterRepositoryRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
final RepositoryMetaData newRepositoryMetaData = new RepositoryMetaData(request.name, request.type, request.settings);
clusterService.submitStateUpdateTask(request.cause, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
final ActionListener<ClusterStateUpdateResponse> registrationListener;
if (request.verify) {
registrationListener = new VerifyingRegisterRepositoryListener(request.name, listener);
} else {
registrationListener = listener;
}
clusterService.submitStateUpdateTask(request.cause, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, registrationListener) {
@Override
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
return new ClusterStateUpdateResponse(acknowledged);
@ -141,7 +153,6 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
}
});
}
/**
* Unregisters repository in the cluster
* <p/>
@ -191,6 +202,47 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
});
}
/**
 * Verifies the given repository on all eligible nodes in the cluster.
 * <p/>
 * Asks the repository for a verification token via {@link Repository#startVerification()}; if a token is
 * returned, it is distributed to all master and data nodes for verification and the repository is asked to
 * clean up afterwards via {@link Repository#endVerification(String)}. A {@code null} token means the
 * repository does not support distributed verification, and an empty successful response is returned.
 *
 * @param repositoryName repository name
 * @param listener       notified with the verification result, or with the failure that prevented verification
 */
public void verifyRepository(final String repositoryName, final ActionListener<VerifyResponse> listener) {
    final Repository repository = repository(repositoryName);
    try {
        final String verificationToken = repository.startVerification();
        if (verificationToken != null) {
            try {
                verifyAction.verify(repositoryName, verificationToken, new ActionListener<VerifyResponse>() {
                    @Override
                    public void onResponse(VerifyResponse verifyResponse) {
                        try {
                            // Clean up the temporary verification data before reporting the result
                            repository.endVerification(verificationToken);
                        } catch (Throwable t) {
                            logger.warn("[{}] failed to finish repository verification", repositoryName, t);
                            listener.onFailure(t);
                            return;
                        }
                        listener.onResponse(verifyResponse);
                    }

                    @Override
                    public void onFailure(Throwable e) {
                        listener.onFailure(e);
                    }
                });
            } catch (Throwable t) {
                // Verification itself failed - still try to clean up before propagating the original failure
                try {
                    repository.endVerification(verificationToken);
                } catch (Throwable t1) {
                    // BUG FIX: log the cleanup failure (t1) - the original failure (t) is propagated below anyway
                    logger.warn("[{}] failed to finish repository verification", repositoryName, t1);
                }
                listener.onFailure(t);
            }
        } else {
            // Repository doesn't support verification - report success with no participating nodes
            listener.onResponse(new VerifyResponse(new DiscoveryNode[0], new VerificationFailure[0]));
        }
    } catch (Throwable t) {
        listener.onFailure(t);
    }
}
/**
* Checks if new repositories appeared in or disappeared from cluster metadata and updates current list of
* repositories accordingly.
@ -360,6 +412,47 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
}
}
/**
 * Listener that, once a repository registration has been acknowledged cluster-wide, runs a
 * verification round against the new repository before reporting success to the caller.
 */
private class VerifyingRegisterRepositoryListener implements ActionListener<ClusterStateUpdateResponse> {

    private final String name;

    private final ActionListener<ClusterStateUpdateResponse> listener;

    public VerifyingRegisterRepositoryListener(String name, final ActionListener<ClusterStateUpdateResponse> listener) {
        this.name = name;
        this.listener = listener;
    }

    @Override
    public void onResponse(final ClusterStateUpdateResponse clusterStateUpdateResponse) {
        if (clusterStateUpdateResponse.isAcknowledged() == false) {
            // Not all nodes acknowledged in time - skip verification and pass the result through
            listener.onResponse(clusterStateUpdateResponse);
            return;
        }
        // The response was acknowledged - all nodes should know about the new repository, let's verify them
        verifyRepository(name, new ActionListener<VerifyResponse>() {
            @Override
            public void onResponse(VerifyResponse verifyResponse) {
                if (verifyResponse.failed()) {
                    listener.onFailure(new RepositoryVerificationException(name, verifyResponse.failureDescription()));
                } else {
                    listener.onResponse(clusterStateUpdateResponse);
                }
            }

            @Override
            public void onFailure(Throwable e) {
                listener.onFailure(e);
            }
        });
    }

    @Override
    public void onFailure(Throwable e) {
        listener.onFailure(e);
    }
}
/**
* Internal data structure for holding repository with its configuration information and injector
*/
@ -391,6 +484,8 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
final String type;
final boolean verify;
Settings settings = EMPTY_SETTINGS;
/**
@ -399,11 +494,13 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
* @param cause repository registration cause
* @param name repository name
* @param type repository type
* @param verify verify repository after creation
*/
public RegisterRepositoryRequest(String cause, String name, String type) {
public RegisterRepositoryRequest(String cause, String name, String type, boolean verify) {
    this.cause = cause;
    this.name = name;
    this.type = type;
    // when true, the repository is verified on all master and data nodes right after registration
    this.verify = verify;
}
/**
@ -439,4 +536,39 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
}
}
/**
 * Verify repository response: the nodes on which verification was attempted and the
 * per-node failures (if any) that were reported.
 */
public static class VerifyResponse {

    private VerificationFailure[] failures;

    private DiscoveryNode[] nodes;

    public VerifyResponse(DiscoveryNode[] nodes, VerificationFailure[] failures) {
        this.nodes = nodes;
        this.failures = failures;
    }

    /** Per-node verification failures, empty if verification succeeded everywhere */
    public VerificationFailure[] failures() {
        return failures;
    }

    /** Nodes on which verification was performed */
    public DiscoveryNode[] nodes() {
        return nodes;
    }

    public boolean failed() {
        return failures.length > 0;
    }

    /**
     * Human-readable summary of all failures, e.g. {@code [[nodeId, 'cause'], ...]}
     */
    public String failureDescription() {
        // BUG FIX: new StringBuilder('[') invoked the StringBuilder(int capacity) constructor -
        // the char was widened to int 91 and used as capacity, so the leading "[" was never emitted.
        StringBuilder builder = new StringBuilder("[");
        Joiner.on(", ").appendTo(builder, failures);
        return builder.append(']').toString();
    }
}
}

View File

@ -22,6 +22,8 @@ import com.google.common.collect.ImmutableList;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotShardFailure;
@ -109,4 +111,23 @@ public interface Repository extends LifecycleComponent<Repository> {
long restoreThrottleTimeInNanos();
/**
* Verifies repository on the master node and returns the verification token.
*
* If the verification token is not null, it's passed to all data nodes for verification. If it's null - no
* additional verification is required
*
* @return verification token that should be passed to all Index Shard Repositories for additional verification or null
*/
String startVerification();
/**
 * Called at the end of repository verification process.
 *
 * This method should perform all necessary cleanup of the temporary files created in the repository
 *
 * @param verificationToken verification token generated by {@link #startVerification} command
 */
void endVerification(String verificationToken);
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories;
import org.elasticsearch.rest.RestStatus;
/**
 * Thrown when repository verification fails on the master node or on one of the data nodes.
 * Reported with {@link RestStatus#INTERNAL_SERVER_ERROR}.
 */
public class RepositoryVerificationException extends RepositoryException {

    public RepositoryVerificationException(String repository, String msg) {
        super(repository, msg);
    }

    public RepositoryVerificationException(String repository, String msg, Throwable t) {
        super(repository, msg, t);
    }

    @Override
    public RestStatus status() {
        return RestStatus.INTERNAL_SERVER_ERROR;
    }
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import java.io.IOException;
/**
 * Records a single node's failure during repository verification, pairing the node id with a
 * textual description of the cause. Serialized over the transport layer via {@link Streamable}.
 */
public class VerificationFailure implements Streamable {

    private String nodeId;

    private String cause;

    VerificationFailure() {
        // used only by deserialization, see readNode()
    }

    public VerificationFailure(String nodeId, String cause) {
        this.nodeId = nodeId;
        this.cause = cause;
    }

    /**
     * Reads a single failure entry from the stream.
     */
    public static VerificationFailure readNode(StreamInput in) throws IOException {
        final VerificationFailure result = new VerificationFailure();
        result.readFrom(in);
        return result;
    }

    public String nodeId() {
        return nodeId;
    }

    public String cause() {
        return cause;
    }

    @Override
    public void readFrom(StreamInput in) throws IOException {
        nodeId = in.readOptionalString();
        cause = in.readOptionalString();
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeOptionalString(nodeId);
        out.writeOptionalString(cause);
    }

    @Override
    public String toString() {
        return "[" + nodeId + ", '" + cause + "']";
    }
}

View File

@ -0,0 +1,172 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories;
import com.carrotsearch.hppc.ObjectContainer;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.repositories.RepositoriesService.VerifyResponse;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import static com.google.common.collect.Lists.newArrayList;
/**
 * Transport action that fans a repository verification request out to all eligible master and
 * data nodes in the cluster and collects per-node failures.
 */
public class VerifyNodeRepositoryAction extends AbstractComponent {

    public static final String ACTION_NAME = "internal:admin/repository/verify";

    private final TransportService transportService;

    private final ClusterService clusterService;

    private final RepositoriesService repositoriesService;

    public VerifyNodeRepositoryAction(Settings settings, TransportService transportService, ClusterService clusterService, RepositoriesService repositoriesService) {
        super(settings);
        this.transportService = transportService;
        this.clusterService = clusterService;
        this.repositoriesService = repositoriesService;
        // Handle verification requests that the master sends to this node
        transportService.registerHandler(ACTION_NAME, new VerifyNodeRepositoryRequestHandler());
    }

    public void close() {
        transportService.removeHandler(ACTION_NAME);
    }

    /**
     * Sends the verification token to every master and data node running v1.4.0 or later and
     * notifies the listener once all of them have responded.
     *
     * @param repository        repository name
     * @param verificationToken token generated on the master by the repository's startVerification()
     * @param listener          notified with the participating nodes and any per-node failures
     */
    public void verify(String repository, String verificationToken, final ActionListener<VerifyResponse> listener) {
        final DiscoveryNodes discoNodes = clusterService.state().nodes();
        final DiscoveryNode localNode = discoNodes.localNode();

        final ObjectContainer<DiscoveryNode> masterAndDataNodes = discoNodes.masterAndDataNodes().values();
        final List<DiscoveryNode> nodes = newArrayList();
        for (ObjectCursor<DiscoveryNode> cursor : masterAndDataNodes) {
            DiscoveryNode node = cursor.value;
            Version version = node.getVersion();
            // Verification wasn't supported before v1.4.0 - no reason to send verification request to these nodes
            if (version != null && version.onOrAfter(Version.V_1_4_0)) {
                nodes.add(node);
            }
        }
        final CopyOnWriteArrayList<VerificationFailure> errors = new CopyOnWriteArrayList<>();
        // Countdown of outstanding node responses; whichever response arrives last triggers finishVerification.
        // NOTE(review): if nodes ended up empty the listener would never fire - in practice the local
        // (master) node should always be eligible, but worth confirming.
        final AtomicInteger counter = new AtomicInteger(nodes.size());
        for (final DiscoveryNode node : nodes) {
            if (node.equals(localNode)) {
                // Verify directly on the local node instead of going through the transport layer
                try {
                    doVerify(repository, verificationToken);
                } catch (Throwable t) {
                    errors.add(new VerificationFailure(node.id(), ExceptionsHelper.detailedMessage(t)));
                }
                if (counter.decrementAndGet() == 0) {
                    finishVerification(listener, nodes, errors);
                }
            } else {
                transportService.sendRequest(node, ACTION_NAME, new VerifyNodeRepositoryRequest(repository, verificationToken), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
                    @Override
                    public void handleResponse(TransportResponse.Empty response) {
                        if (counter.decrementAndGet() == 0) {
                            finishVerification(listener, nodes, errors);
                        }
                    }

                    @Override
                    public void handleException(TransportException exp) {
                        // A transport-level failure counts as a verification failure for that node
                        errors.add(new VerificationFailure(node.id(), ExceptionsHelper.detailedMessage(exp)));
                        if (counter.decrementAndGet() == 0) {
                            finishVerification(listener, nodes, errors);
                        }
                    }
                });
            }
        }
    }

    public void finishVerification(ActionListener<VerifyResponse> listener, List<DiscoveryNode> nodes, CopyOnWriteArrayList<VerificationFailure> errors) {
        listener.onResponse(new RepositoriesService.VerifyResponse(nodes.toArray(new DiscoveryNode[nodes.size()]), errors.toArray(new VerificationFailure[errors.size()])));
    }

    // Runs the node-local part of the verification through the index shard repository
    private void doVerify(String repository, String verificationToken) {
        IndexShardRepository blobStoreIndexShardRepository = repositoriesService.indexShardRepository(repository);
        blobStoreIndexShardRepository.verify(verificationToken);
    }

    /**
     * Internal transport request carrying the repository name and verification token to a node.
     */
    private class VerifyNodeRepositoryRequest extends TransportRequest {

        private String repository;

        private String verificationToken;

        private VerifyNodeRepositoryRequest() {
        }

        private VerifyNodeRepositoryRequest(String repository, String verificationToken) {
            this.repository = repository;
            this.verificationToken = verificationToken;
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            repository = in.readString();
            verificationToken = in.readString();
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeString(repository);
            out.writeString(verificationToken);
        }
    }

    /**
     * Handles incoming verification requests on non-master nodes.
     */
    private class VerifyNodeRepositoryRequestHandler extends BaseTransportRequestHandler<VerifyNodeRepositoryRequest> {
        @Override
        public VerifyNodeRepositoryRequest newInstance() {
            return new VerifyNodeRepositoryRequest();
        }

        @Override
        public String executor() {
            // Runs on the transport thread
            // NOTE(review): assumes node-level verify is cheap enough for SAME - confirm for remote repositories
            return ThreadPool.Names.SAME;
        }

        @Override
        public void messageReceived(VerifyNodeRepositoryRequest request, TransportChannel channel) throws Exception {
            doVerify(request.repository, request.verificationToken);
            channel.sendResponse(TransportResponse.Empty.INSTANCE);
        }
    }
}

View File

@ -31,6 +31,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
@ -52,6 +53,7 @@ import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.snapshots.*;
import java.io.FileNotFoundException;
@ -117,6 +119,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
private static final String SNAPSHOTS_FILE = "index";
private static final String TESTS_FILE = "tests-";
private static final String METADATA_PREFIX = "metadata-";
private final BlobStoreIndexShardRepository indexShardRepository;
@ -397,7 +401,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
String blobName = snapshotBlobName(snapshotId);
int retryCount = 0;
while (true) {
try (InputStream blob = snapshotsBlobContainer.openInput(blobName)){
try (InputStream blob = snapshotsBlobContainer.openInput(blobName)) {
byte[] data = ByteStreams.toByteArray(blob);
// Because we are overriding snapshot during finalization, it's possible that
// we can get an empty or incomplete snapshot for a brief moment
@ -427,7 +431,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
private MetaData readSnapshotMetaData(SnapshotId snapshotId, ImmutableList<String> indices, boolean ignoreIndexErrors) {
MetaData metaData;
try (InputStream blob = snapshotsBlobContainer.openInput(metaDataBlobName(snapshotId))){
try (InputStream blob = snapshotsBlobContainer.openInput(metaDataBlobName(snapshotId))) {
byte[] data = ByteStreams.toByteArray(blob);
metaData = readMetaData(data);
} catch (FileNotFoundException | NoSuchFileException ex) {
@ -580,7 +584,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
* @throws IOException
*/
private void writeGlobalMetaData(MetaData metaData, OutputStream outputStream) throws IOException {
StreamOutput stream = new OutputStreamStreamOutput(outputStream) ;
StreamOutput stream = new OutputStreamStreamOutput(outputStream);
if (isCompress()) {
stream = CompressorFactory.defaultCompressor().streamOutput(stream);
}
@ -629,7 +633,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
* @throws IOException I/O errors
*/
protected ImmutableList<SnapshotId> readSnapshotList() throws IOException {
try (InputStream blob = snapshotsBlobContainer.openInput(SNAPSHOTS_FILE)){
try (InputStream blob = snapshotsBlobContainer.openInput(SNAPSHOTS_FILE)) {
final byte[] data = ByteStreams.toByteArray(blob);
ArrayList<SnapshotId> snapshots = new ArrayList<>();
try (XContentParser parser = XContentHelper.createParser(data, 0, data.length)) {
@ -669,4 +673,31 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
public long restoreThrottleTimeInNanos() {
return restoreRateLimitingTimeInNanos.count();
}
/**
 * Writes a small test blob with a random seed into the repository; the seed is the
 * verification token that data nodes later use for their own checks.
 */
@Override
public String startVerification() {
    try {
        final String seed = Strings.randomBase64UUID();
        final byte[] testBytes = Strings.toUTF8Bytes(seed);
        final String masterBlobName = testBlobPrefix(seed) + "-master";
        try (OutputStream outputStream = snapshotsBlobContainer.createOutput(masterBlobName)) {
            outputStream.write(testBytes);
        }
        return seed;
    } catch (IOException exp) {
        throw new RepositoryVerificationException(repositoryName, "path " + basePath() + " is not accessible on master node", exp);
    }
}
/**
 * Removes all test blobs written for the given verification seed.
 */
@Override
public void endVerification(String seed) {
    try {
        snapshotsBlobContainer.deleteBlobsByPrefix(testBlobPrefix(seed));
    } catch (IOException exp) {
        throw new RepositoryVerificationException(repositoryName, "cannot delete test data at " + basePath(), exp);
    }
}
/**
 * Name prefix of the temporary blobs created during repository verification.
 */
public static String testBlobPrefix(String seed) {
    return TESTS_FILE + seed;
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.uri;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.repositories.RepositoryName;
/**
 * Index shard repository backed by a read-only URL repository. Node-level verification is
 * currently a no-op (see TODO below).
 */
public class URLIndexShardRepository extends BlobStoreIndexShardRepository {

    @Inject
    URLIndexShardRepository(Settings settings, RepositoryName repositoryName, IndicesService indicesService, ClusterService clusterService) {
        super(settings, repositoryName, indicesService, clusterService);
    }

    @Override
    public void verify(String seed) {
        //TODO: Add verification that URL is accessible
    }
}

View File

@ -102,4 +102,16 @@ public class URLRepository extends BlobStoreRepository {
}
}
}
@Override
public String startVerification() {
    //TODO: #7831 Add check that URL exists and accessible
    // Returning null disables the distributed verification round for URL repositories
    return null;
}
@Override
public void endVerification(String seed) {
    // startVerification() always returns null, so the verification machinery never calls this
    throw new UnsupportedOperationException("shouldn't be called");
}
}

View File

@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.action.admin.cluster.repositories.verify.RestVerifyRepositoryAction;
import org.elasticsearch.rest.action.exists.RestExistsAction;
import org.elasticsearch.rest.action.admin.cluster.health.RestClusterHealthAction;
import org.elasticsearch.rest.action.admin.cluster.node.hotthreads.RestNodesHotThreadsAction;
@ -146,6 +147,7 @@ public class RestActionModule extends AbstractModule {
bind(RestPutRepositoryAction.class).asEagerSingleton();
bind(RestGetRepositoriesAction.class).asEagerSingleton();
bind(RestDeleteRepositoryAction.class).asEagerSingleton();
bind(RestVerifyRepositoryAction.class).asEagerSingleton();
bind(RestGetSnapshotsAction.class).asEagerSingleton();
bind(RestCreateSnapshotAction.class).asEagerSingleton();
bind(RestRestoreSnapshotAction.class).asEagerSingleton();

View File

@ -49,6 +49,7 @@ public class RestPutRepositoryAction extends BaseRestHandler {
PutRepositoryRequest putRepositoryRequest = putRepositoryRequest(request.param("repository"));
putRepositoryRequest.listenerThreaded(false);
putRepositoryRequest.source(request.content().toUtf8());
putRepositoryRequest.verify(request.paramAsBoolean("verify", true));
putRepositoryRequest.masterNodeTimeout(request.paramAsTime("master_timeout", putRepositoryRequest.masterNodeTimeout()));
putRepositoryRequest.timeout(request.paramAsTime("timeout", putRepositoryRequest.timeout()));
client.admin().cluster().putRepository(putRepositoryRequest, new AcknowledgedRestListener<PutRepositoryResponse>(channel));

View File

@ -0,0 +1,58 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.cluster.repositories.verify;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.AcknowledgedRestListener;
import org.elasticsearch.rest.action.support.RestToXContentListener;
import static org.elasticsearch.client.Requests.verifyRepositoryRequest;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.RestRequest.Method.PUT;
/**
 * Verifies a snapshot repository - REST endpoint {@code POST /_snapshot/{repository}/_verify}.
 * (Previous javadoc said "Registers repositories" - copy-paste from the put-repository action.)
 */
public class RestVerifyRepositoryAction extends BaseRestHandler {

    @Inject
    public RestVerifyRepositoryAction(Settings settings, RestController controller, Client client) {
        super(settings, controller, client);
        controller.registerHandler(POST, "/_snapshot/{repository}/_verify", this);
    }

    @Override
    public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
        VerifyRepositoryRequest verifyRepositoryRequest = verifyRepositoryRequest(request.param("repository"));
        verifyRepositoryRequest.listenerThreaded(false);
        verifyRepositoryRequest.masterNodeTimeout(request.paramAsTime("master_timeout", verifyRepositoryRequest.masterNodeTimeout()));
        verifyRepositoryRequest.timeout(request.paramAsTime("timeout", verifyRepositoryRequest.timeout()));
        client.admin().cluster().verifyRepository(verifyRepositoryRequest, new RestToXContentListener<VerifyRepositoryResponse>(channel));
    }
}

View File

@ -23,17 +23,24 @@ import com.google.common.collect.ImmutableList;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryResponse;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.RepositoryException;
import org.junit.After;
import org.junit.Before;
import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.snapshots.mockstore.MockRepositoryModule;
import org.junit.Test;
import java.io.File;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
@ -45,13 +52,23 @@ public class RepositoriesTests extends AbstractSnapshotTests {
public void testRepositoryCreation() throws Exception {
Client client = client();
File location = newTempDir(LifecycleScope.SUITE);
logger.info("--> creating repository");
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo-1")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder()
.put("location", newTempDir(LifecycleScope.SUITE))
.put("location", location)
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
logger.info("--> verify the repository");
int numberOfFiles = location.listFiles().length;
VerifyRepositoryResponse verifyRepositoryResponse = client.admin().cluster().prepareVerifyRepository("test-repo-1").get();
assertThat(verifyRepositoryResponse.getNodes().length, equalTo(cluster().numDataAndMasterNodes()));
logger.info("--> verify that we didn't leave any files as a result of verification");
assertThat(location.listFiles().length, equalTo(numberOfFiles));
logger.info("--> check that repository is really there");
ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().clear().setMetaData(true).get();
MetaData metaData = clusterStateResponse.getState().getMetaData();
@ -60,10 +77,10 @@ public class RepositoriesTests extends AbstractSnapshotTests {
assertThat(repositoriesMetaData.repository("test-repo-1"), notNullValue());
assertThat(repositoriesMetaData.repository("test-repo-1").type(), equalTo("fs"));
logger.info("--> creating anoter repository");
logger.info("--> creating another repository");
putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo-2")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder()
.put("location", newTempDir(LifecycleScope.SUITE))
.put("location", newTempDir(LifecycleScope.SUITE))
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
@ -120,13 +137,12 @@ public class RepositoriesTests extends AbstractSnapshotTests {
@Test
public void repositoryAckTimeoutTest() throws Exception {
logger.info("--> creating repository test-repo-1 with 0s timeout - shouldn't ack");
PutRepositoryResponse putRepositoryResponse = client().admin().cluster().preparePutRepository("test-repo-1")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder()
.put("location", newTempDir(LifecycleScope.SUITE))
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(5, 100))
.put("location", newTempDir(LifecycleScope.SUITE))
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(5, 100))
)
.setTimeout("0s").get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(false));
@ -134,9 +150,9 @@ public class RepositoriesTests extends AbstractSnapshotTests {
logger.info("--> creating repository test-repo-2 with standard timeout - should ack");
putRepositoryResponse = client().admin().cluster().preparePutRepository("test-repo-2")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder()
.put("location", newTempDir(LifecycleScope.SUITE))
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(5, 100))
.put("location", newTempDir(LifecycleScope.SUITE))
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(5, 100))
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
@ -150,5 +166,74 @@ public class RepositoriesTests extends AbstractSnapshotTests {
assertThat(deleteRepositoryResponse.isAcknowledged(), equalTo(true));
}
@Test
public void repositoryVerificationTest() throws Exception {
    Client client = client();

    // Mock repository configured so every control I/O operation fails
    Settings settings = ImmutableSettings.settingsBuilder()
            .put("location", newTempDir(LifecycleScope.SUITE))
            .put("random_control_io_exception_rate", 1.0).build();
    logger.info("--> creating repository that cannot write any files - should fail");
    assertThrows(client.admin().cluster().preparePutRepository("test-repo-1")
            .setType(MockRepositoryModule.class.getCanonicalName()).setSettings(settings),
            RepositoryVerificationException.class);

    // With verify=false the broken repository registers fine...
    logger.info("--> creating repository that cannot write any files, but suppress verification - should be acked");
    assertAcked(client.admin().cluster().preparePutRepository("test-repo-1")
            .setType(MockRepositoryModule.class.getCanonicalName()).setSettings(settings).setVerify(false));

    // ...but an explicit verify call still detects the failure
    logger.info("--> verifying repository");
    assertThrows(client.admin().cluster().prepareVerifyRepository("test-repo-1"), RepositoryVerificationException.class);

    File location = newTempDir(LifecycleScope.SUITE);

    logger.info("--> creating repository");
    try {
        // localize_location presumably gives each node its own private location (see MockRepository)
        // so the "location is not shared" check must trip - TODO confirm against MockRepository
        client.admin().cluster().preparePutRepository("test-repo-1")
                .setType(MockRepositoryModule.class.getCanonicalName())
                .setSettings(ImmutableSettings.settingsBuilder()
                        .put("location", location)
                        .put("localize_location", true)
                ).get();
        fail("RepositoryVerificationException wasn't generated");
    } catch (RepositoryVerificationException ex) {
        assertThat(ex.getMessage(), containsString("is not shared"));
    }
}
@Test
// NOTE(review): despite the name, this body is identical to
// repositoryVerificationTest and never configures or asserts any timeout —
// TODO confirm the intended timeout scenario and differentiate the test.
public void repositoryVerificationTimeoutTest() throws Exception {
    Client client = client();

    // 100% control-file I/O failure rate: every write attempted during
    // verification against this repository will fail.
    Settings settings = ImmutableSettings.settingsBuilder()
            .put("location", newTempDir(LifecycleScope.SUITE))
            .put("random_control_io_exception_rate", 1.0).build();
    logger.info("--> creating repository that cannot write any files - should fail");
    // With verification enabled (the default), registration itself must fail.
    assertThrows(client.admin().cluster().preparePutRepository("test-repo-1")
                    .setType(MockRepositoryModule.class.getCanonicalName()).setSettings(settings),
            RepositoryVerificationException.class);

    logger.info("--> creating repository that cannot write any files, but suppress verification - should be acked");
    // setVerify(false) skips the registration-time check, so the broken repo is accepted...
    assertAcked(client.admin().cluster().preparePutRepository("test-repo-1")
            .setType(MockRepositoryModule.class.getCanonicalName()).setSettings(settings).setVerify(false));

    logger.info("--> verifying repository");
    // ...but an explicit _verify call against it must still fail.
    assertThrows(client.admin().cluster().prepareVerifyRepository("test-repo-1"), RepositoryVerificationException.class);

    File location = newTempDir(LifecycleScope.SUITE);

    logger.info("--> creating repository");
    try {
        // localize_location gives each node a private, non-shared directory, so
        // cross-node verification is expected to reject the repository.
        client.admin().cluster().preparePutRepository("test-repo-1")
                .setType(MockRepositoryModule.class.getCanonicalName())
                .setSettings(ImmutableSettings.settingsBuilder()
                        .put("location", location)
                        .put("localize_location", true)
                ).get();
        fail("RepositoryVerificationException wasn't generated");
    } catch (RepositoryVerificationException ex) {
        assertThat(ex.getMessage(), containsString("is not shared"));
    }
}
}

View File

@ -26,6 +26,7 @@ import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
@ -406,7 +407,8 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
ImmutableSettings.settingsBuilder()
.put("location", newTempDir(LifecycleScope.TEST))
.put("random", randomAsciiOfLength(10))
.put("random_control_io_exception_rate", 0.2)));
.put("random_control_io_exception_rate", 0.2))
.setVerify(false));
createIndex("test-idx");
ensureGreen();
@ -1308,8 +1310,8 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
// Update settings to make sure that relocation is slow so we can start snapshot before relocation is finished
assertAcked(client.admin().indices().prepareUpdateSettings("test-idx").setSettings(ImmutableSettings.builder()
.put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "all")
.put(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, 100)
.put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "all")
.put(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, 100)
));
logger.info("--> start relocations");
@ -1324,7 +1326,7 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
// Update settings to back to normal
assertAcked(client.admin().indices().prepareUpdateSettings("test-idx").setSettings(ImmutableSettings.builder()
.put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "node")
.put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "node")
));
logger.info("--> wait for snapshot to complete");

View File

@ -20,6 +20,7 @@ package org.elasticsearch.snapshots;
import com.carrotsearch.randomizedtesting.LifecycleScope;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStatus;
@ -125,9 +126,11 @@ public class SnapshotBackwardsCompatibilityTest extends ElasticsearchBackwardsCo
client().admin().indices().prepareUpdateSettings(indices).setSettings(ImmutableSettings.builder().put(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, "all")).get();
logger.info("--> close indices");
client().admin().indices().prepareClose("index_before_*").get();
logger.info("--> verify repository");
client().admin().cluster().prepareVerifyRepository("test-repo").get();
logger.info("--> restore all indices from the snapshot");
RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-1").setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));

View File

@ -21,26 +21,27 @@ package org.elasticsearch.snapshots.mockstore;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.repositories.fs.FsRepository;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.io.*;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
/**
*/
public class MockRepository extends FsRepository {
@ -68,8 +69,8 @@ public class MockRepository extends FsRepository {
private volatile boolean blocked = false;
@Inject
public MockRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository) throws IOException {
super(name, repositorySettings, indexShardRepository);
public MockRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, ClusterService clusterService) throws IOException {
super(name, overrideSettings(repositorySettings, clusterService), indexShardRepository);
randomControlIOExceptionRate = repositorySettings.settings().getAsDouble("random_control_io_exception_rate", 0.0);
randomDataFileIOExceptionRate = repositorySettings.settings().getAsDouble("random_data_file_io_exception_rate", 0.0);
blockOnControlFiles = repositorySettings.settings().getAsBoolean("block_on_control", false);
@ -80,6 +81,22 @@ public class MockRepository extends FsRepository {
mockBlobStore = new MockBlobStore(super.blobStore());
}
/**
 * Returns the repository settings to pass to the parent {@code FsRepository}.
 * When the {@code localize_location} flag is set, the {@code location} setting
 * is rewritten per node (see {@link #localizeLocation}); otherwise the settings
 * are returned untouched.
 */
private static RepositorySettings overrideSettings(RepositorySettings repositorySettings, ClusterService clusterService) {
    boolean localize = repositorySettings.settings().getAsBoolean("localize_location", false);
    if (localize == false) {
        return repositorySettings;
    }
    Settings localized = localizeLocation(repositorySettings.settings(), clusterService);
    return new RepositorySettings(repositorySettings.globalSettings(), localized);
}
/**
 * Rewrites the {@code location} setting so that each node gets its own private
 * subdirectory (named after the local node id) under the configured location.
 * Used by tests to simulate a repository path that is not shared across nodes.
 */
private static Settings localizeLocation(Settings settings, ClusterService clusterService) {
    String nodeId = clusterService.localNode().getId();
    File perNodeLocation = new File(new File(settings.get("location")), nodeId);
    return settingsBuilder()
            .put(settings)
            .put("location", perNodeLocation.getAbsolutePath())
            .build();
}
// Records one injected failure; the counter lets tests observe how many
// simulated I/O faults this mock repository produced.
private void addFailure() {
    failureCounter.incrementAndGet();
}

View File

@ -210,6 +210,11 @@ public class CompositeTestCluster extends TestCluster {
return runningNodes().size() + cluster.numDataNodes();
}
@Override
public int numDataAndMasterNodes() {
    // Externally started nodes plus the wrapped cluster's data/master-eligible nodes.
    return runningNodes().size() + cluster.numDataAndMasterNodes();
}
@Override
public int numBenchNodes() {
return cluster.numBenchNodes();

View File

@ -63,6 +63,7 @@ public final class ExternalTestCluster extends TestCluster {
private final String clusterName;
private final int numDataNodes;
private final int numMasterAndDataNodes;
private final int numBenchNodes;
public ExternalTestCluster(TransportAddress... transportAddresses) {
@ -79,11 +80,15 @@ public final class ExternalTestCluster extends TestCluster {
this.clusterName = nodeInfos.getClusterName().value();
int dataNodes = 0;
int benchNodes = 0;
int masterAndDataNodes = 0;
for (int i = 0; i < nodeInfos.getNodes().length; i++) {
NodeInfo nodeInfo = nodeInfos.getNodes()[i];
httpAddresses[i] = ((InetSocketTransportAddress) nodeInfo.getHttp().address().publishAddress()).address();
if (nodeInfo.getSettings().getAsBoolean("node.data", true)) {
if (DiscoveryNode.dataNode(nodeInfo.getSettings())) {
dataNodes++;
masterAndDataNodes++;
} else if (DiscoveryNode.masterNode(nodeInfo.getSettings())) {
masterAndDataNodes++;
}
if (nodeInfo.getSettings().getAsBoolean("node.bench", false)) {
benchNodes++;
@ -91,6 +96,7 @@ public final class ExternalTestCluster extends TestCluster {
}
this.numDataNodes = dataNodes;
this.numBenchNodes = benchNodes;
this.numMasterAndDataNodes = masterAndDataNodes;
logger.info("Setup ExternalTestCluster [{}] made of [{}] nodes", nodeInfos.getClusterName().value(), size());
}
@ -114,6 +120,11 @@ public final class ExternalTestCluster extends TestCluster {
return numDataNodes;
}
@Override
public int numDataAndMasterNodes() {
    // Snapshot counted once at construction from the cluster's node-info response.
    return numMasterAndDataNodes;
}
@Override
public int numBenchNodes() {
return numBenchNodes;

View File

@ -1399,6 +1399,11 @@ public final class InternalTestCluster extends TestCluster {
return dataNodeAndClients().size();
}
@Override
public int numDataAndMasterNodes() {
    // Computed live by filtering the current node map (see dataAndMasterNodes()).
    return dataAndMasterNodes().size();
}
@Override
public int numBenchNodes() {
return benchNodeAndClients().size();
@ -1455,10 +1460,22 @@ public final class InternalTestCluster extends TestCluster {
return Collections2.filter(nodes.values(), new DataNodePredicate());
}
// Live view of the node map restricted to nodes that are data and/or
// master eligible; synchronized because the node map is mutated concurrently.
private synchronized Collection<NodeAndClient> dataAndMasterNodes() {
    return Collections2.filter(nodes.values(), new DataOrMasterNodePredicate());
}
private static final class DataNodePredicate implements Predicate<NodeAndClient> {
@Override
public boolean apply(NodeAndClient nodeAndClient) {
return nodeAndClient.node.settings().getAsBoolean("node.data", true) && nodeAndClient.node.settings().getAsBoolean("node.client", false) == false;
return DiscoveryNode.dataNode(nodeAndClient.node.settings());
}
}
// Selects nodes whose settings mark them as data nodes, master-eligible, or both.
private static final class DataOrMasterNodePredicate implements Predicate<NodeAndClient> {
    @Override
    public boolean apply(NodeAndClient nodeAndClient) {
        return DiscoveryNode.dataNode(nodeAndClient.node.settings()) ||
                DiscoveryNode.masterNode(nodeAndClient.node.settings());
    }
}
@ -1478,7 +1495,7 @@ public final class InternalTestCluster extends TestCluster {
private static final class ClientNodePredicate implements Predicate<NodeAndClient> {
@Override
public boolean apply(NodeAndClient nodeAndClient) {
return nodeAndClient.node.settings().getAsBoolean("node.client", false);
return DiscoveryNode.clientNode(nodeAndClient.node.settings());
}
}

View File

@ -97,6 +97,11 @@ public abstract class TestCluster implements Iterable<Client>, Closeable {
*/
public abstract int numDataNodes();
/**
* Returns the number of data and master eligible nodes in the cluster.
*/
public abstract int numDataAndMasterNodes();
/**
* Returns the number of bench nodes in the cluster.
*/

View File

@ -21,6 +21,7 @@ package org.elasticsearch.transport;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryAction;
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
import org.elasticsearch.action.bench.AbortBenchmarkAction;
import org.elasticsearch.action.bench.BenchmarkAction;
@ -34,6 +35,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.repositories.VerifyNodeRepositoryAction;
import org.elasticsearch.test.ElasticsearchBackwardsCompatIntegrationTest;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.threadpool.ThreadPool;
@ -138,5 +140,7 @@ public class ActionNamesBackwardsCompatibilityTest extends ElasticsearchBackward
actionsVersions.put(SearchServiceTransportAction.FREE_CONTEXT_SCROLL_ACTION_NAME, Version.V_1_4_0_Beta1);
actionsVersions.put(SearchServiceTransportAction.FETCH_ID_SCROLL_ACTION_NAME, Version.V_1_4_0_Beta1);
actionsVersions.put(VerifyRepositoryAction.NAME, Version.V_1_4_0);
actionsVersions.put(VerifyNodeRepositoryAction.ACTION_NAME, Version.V_1_4_0);
}
}

View File

@ -22,12 +22,14 @@ package org.elasticsearch.transport;
import com.google.common.collect.Lists;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryAction;
import org.elasticsearch.action.bench.AbortBenchmarkAction;
import org.elasticsearch.action.bench.BenchmarkAction;
import org.elasticsearch.action.bench.BenchmarkService;
import org.elasticsearch.action.bench.BenchmarkStatusAction;
import org.elasticsearch.action.exists.ExistsAction;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.repositories.VerifyNodeRepositoryAction;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
@ -142,5 +144,7 @@ public class ActionNamesTests extends ElasticsearchIntegrationTest {
post_1_4_actions.add(GetIndexAction.NAME);
post_1_4_actions.add(SearchServiceTransportAction.FREE_CONTEXT_SCROLL_ACTION_NAME);
post_1_4_actions.add(SearchServiceTransportAction.FETCH_ID_SCROLL_ACTION_NAME);
post_1_4_actions.add(VerifyRepositoryAction.NAME);
post_1_4_actions.add(VerifyNodeRepositoryAction.ACTION_NAME);
}
}