api for synced flush

closes #11098
This commit is contained in:
Britta Weber 2015-05-05 18:12:40 +02:00
parent e6f5fb82f0
commit c628d67f9e
16 changed files with 837 additions and 23 deletions

View File

@ -0,0 +1,17 @@
{
"indices.seal": {
"documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/master/indices-seal.html",
"methods": ["POST", "GET"],
"url": {
"path": "/_seal",
"paths": ["/_seal", "/{index}/_seal"],
"parts": {
"index": {
"type" : "list",
"description" : "A comma-separated list of index names; use `_all` or empty string for all indices"
}
}
},
"body": null
}
}

View File

@ -0,0 +1,16 @@
---
"Index seal rest test":
- do:
indices.create:
index: testing
- do:
cluster.health:
wait_for_status: green
- do:
indices.seal:
index: testing
- do:
indices.stats: {level: shards}
- is_true: indices.testing.shards.0.0.commit.user_data.sync_id

View File

@ -103,6 +103,8 @@ import org.elasticsearch.action.admin.indices.settings.put.TransportUpdateSettin
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.admin.indices.stats.TransportIndicesStatsAction;
import org.elasticsearch.action.admin.indices.seal.SealIndicesAction;
import org.elasticsearch.action.admin.indices.seal.TransportSealIndicesAction;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateAction;
import org.elasticsearch.action.admin.indices.template.delete.TransportDeleteIndexTemplateAction;
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesAction;
@ -254,6 +256,7 @@ public class ActionModule extends AbstractModule {
registerAction(ValidateQueryAction.INSTANCE, TransportValidateQueryAction.class);
registerAction(RefreshAction.INSTANCE, TransportRefreshAction.class);
registerAction(FlushAction.INSTANCE, TransportFlushAction.class);
registerAction(SealIndicesAction.INSTANCE, TransportSealIndicesAction.class);
registerAction(OptimizeAction.INSTANCE, TransportOptimizeAction.class);
registerAction(ClearIndicesCacheAction.INSTANCE, TransportClearIndicesCacheAction.class);
registerAction(PutWarmerAction.INSTANCE, TransportPutWarmerAction.class);

View File

@ -0,0 +1,45 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.seal;
import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;
/**
*/
public class SealIndicesAction extends Action<SealIndicesRequest, SealIndicesResponse, SealIndicesRequestBuilder> {
public static final SealIndicesAction INSTANCE = new SealIndicesAction();
public static final String NAME = "indices:admin/sealindices";
private SealIndicesAction() {
super(NAME);
}
@Override
public SealIndicesResponse newResponse() {
return new SealIndicesResponse();
}
@Override
public SealIndicesRequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new SealIndicesRequestBuilder(client, this);
}
}

View File

@ -0,0 +1,99 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.seal;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.Arrays;
/**
* A request to seal one or more indices.
*/
public class SealIndicesRequest extends ActionRequest implements IndicesRequest.Replaceable {
private String[] indices;
private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpenAndForbidClosed();
SealIndicesRequest() {
}
/**
* Constructs a seal request against one or more indices. If nothing is provided, all indices will
* be sealed.
*/
public SealIndicesRequest(String... indices) {
this.indices = indices;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArrayNullable(indices);
indicesOptions.writeIndicesOptions(out);
}
@Override
public String toString() {
return "SealIndicesRequest{" +
"indices=" + Arrays.toString(indices) +
", indicesOptions=" + indicesOptions +
'}';
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
indices = in.readStringArray();
indicesOptions = IndicesOptions.readIndicesOptions(in);
}
@Override
public SealIndicesRequest indices(String[] indices) {
this.indices = indices;
return this;
}
public String[] indices() {
return indices;
}
@Override
public IndicesOptions indicesOptions() {
return indicesOptions;
}
@SuppressWarnings("unchecked")
public final SealIndicesRequest indicesOptions(IndicesOptions indicesOptions) {
this.indicesOptions = indicesOptions;
return this;
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.seal;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
/**
*
*/
public class SealIndicesRequestBuilder extends ActionRequestBuilder<SealIndicesRequest, SealIndicesResponse, SealIndicesRequestBuilder> {
public SealIndicesRequestBuilder(ElasticsearchClient client, SealIndicesAction action) {
super(client, action, new SealIndicesRequest());
}
public SealIndicesRequestBuilder indices(String ... indices) {
request.indices(indices);
return this;
}
}

View File

@ -0,0 +1,122 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.seal;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.indices.SyncedFlushService;
import java.io.IOException;
import java.util.*;
/**
* A response to a seal action on several indices.
*/
public class SealIndicesResponse extends ActionResponse implements ToXContent {
private Set<SyncedFlushService.SyncedFlushResult> results;
SealIndicesResponse() {
}
SealIndicesResponse(Set<SyncedFlushService.SyncedFlushResult> results) {
this.results = results;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
int size = in.readVInt();
results = new HashSet<>();
for (int i = 0; i < size; i++) {
SyncedFlushService.SyncedFlushResult syncedFlushResult = new SyncedFlushService.SyncedFlushResult();
syncedFlushResult.readFrom(in);
results.add(syncedFlushResult);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(results.size());
for (SyncedFlushService.SyncedFlushResult syncedFlushResult : results) {
syncedFlushResult.writeTo(out);
}
}
public Set<SyncedFlushService.SyncedFlushResult> results() {
return results;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
Map<String, Map<Integer, Object>> allResults = new HashMap<>();
// first, sort everything by index and shard id
for (SyncedFlushService.SyncedFlushResult result : results) {
String indexName = result.getShardId().index().name();
int shardId = result.getShardId().getId();
if (allResults.get(indexName) == null) {
// no results yet for this index
allResults.put(indexName, new TreeMap<Integer, Object>());
}
if (result.shardResponses().size() > 0) {
Map<ShardRouting, SyncedFlushService.SyncedFlushResponse> shardResponses = new HashMap<>();
for (Map.Entry<ShardRouting, SyncedFlushService.SyncedFlushResponse> shardResponse : result.shardResponses().entrySet()) {
shardResponses.put(shardResponse.getKey(), shardResponse.getValue());
}
allResults.get(indexName).put(shardId, shardResponses);
} else {
allResults.get(indexName).put(shardId, result.failureReason());
}
}
for (Map.Entry<String, Map<Integer, Object>> result : allResults.entrySet()) {
builder.startArray(result.getKey());
for (Map.Entry<Integer, Object> shardResponse : result.getValue().entrySet()) {
builder.startObject();
builder.field("shard_id", shardResponse.getKey());
if (shardResponse.getValue() instanceof Map) {
builder.startObject("responses");
Map<ShardRouting, SyncedFlushService.SyncedFlushResponse> results = (Map<ShardRouting, SyncedFlushService.SyncedFlushResponse>) shardResponse.getValue();
boolean success = true;
for (Map.Entry<ShardRouting, SyncedFlushService.SyncedFlushResponse> shardCopy : results.entrySet()) {
builder.field(shardCopy.getKey().currentNodeId(), shardCopy.getValue().success() ? "success" : shardCopy.getValue().failureReason());
if (shardCopy.getValue().success() == false) {
success = false;
}
}
builder.endObject();
builder.field("message", success ? "success" : "failed on some copies");
} else {
builder.field("message", shardResponse.getValue()); // must be a string
}
builder.endObject();
}
builder.endArray();
}
return builder;
}
}

View File

@ -0,0 +1,88 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.seal;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.SyncedFlushService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.Set;
/**
*/
public class TransportSealIndicesAction extends HandledTransportAction<SealIndicesRequest, SealIndicesResponse> {
final private SyncedFlushService syncedFlushService;
final private ClusterService clusterService;
@Inject
public TransportSealIndicesAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, SyncedFlushService syncedFlushService, ClusterService clusterService) {
super(settings, SealIndicesAction.NAME, threadPool, transportService, actionFilters, SealIndicesRequest.class);
this.syncedFlushService = syncedFlushService;
this.clusterService = clusterService;
}
@Override
protected void doExecute(final SealIndicesRequest request, final ActionListener<SealIndicesResponse> listener) {
ClusterState state = clusterService.state();
String[] concreteIndices = state.metaData().concreteIndices(request.indicesOptions(), request.indices());
GroupShardsIterator primaries = state.routingTable().activePrimaryShardsGrouped(concreteIndices, false);
final CountDown countDown = new CountDown(primaries.size());
final Set<SyncedFlushService.SyncedFlushResult> results = ConcurrentCollections.newConcurrentSet();
for (final ShardIterator shard : primaries) {
final ShardId shardId = shard.shardId();
syncedFlushService.attemptSyncedFlush(shardId, new ActionListener<SyncedFlushService.SyncedFlushResult>() {
@Override
public void onResponse(SyncedFlushService.SyncedFlushResult syncedFlushResult) {
results.add(syncedFlushResult);
if (countDown.countDown()) {
listener.onResponse(new SealIndicesResponse(results));
}
}
@Override
public void onFailure(Throwable e) {
logger.debug("{} unexpected error while executing synced flush", shardId);
results.add(new SyncedFlushService.SyncedFlushResult(shardId, e.getMessage()));
if (countDown.countDown()) {
listener.onResponse(new SealIndicesResponse(results));
}
}
});
}
}
}

View File

@ -84,6 +84,9 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRespons
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.seal.SealIndicesRequest;
import org.elasticsearch.action.admin.indices.seal.SealIndicesRequestBuilder;
import org.elasticsearch.action.admin.indices.seal.SealIndicesResponse;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequestBuilder;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateResponse;
@ -359,6 +362,27 @@ public interface IndicesAdminClient extends ElasticsearchClient {
*/
FlushRequestBuilder prepareFlush(String... indices);
/**
* Explicitly sync flush one or more indices
*
* @param request The seal indices request
* @return A result future
*/
ActionFuture<SealIndicesResponse> sealIndices(SealIndicesRequest request);
/**
* Explicitly sync flush one or more indices
*
* @param request The seal indices request
* @param listener A listener to be notified with a result
*/
void sealIndices(SealIndicesRequest request, ActionListener<SealIndicesResponse> listener);
/**
* Explicitly seal one or more indices
*/
SealIndicesRequestBuilder prepareSealIndices(String... indices);
/**
* Explicitly optimize one or more indices into a the number of segments.
*
@ -723,4 +747,5 @@ public interface IndicesAdminClient extends ElasticsearchClient {
* @see #getSettings(org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest)
*/
GetSettingsRequestBuilder prepareGetSettings(String... indices);
}

View File

@ -180,6 +180,10 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.seal.SealIndicesAction;
import org.elasticsearch.action.admin.indices.seal.SealIndicesRequest;
import org.elasticsearch.action.admin.indices.seal.SealIndicesRequestBuilder;
import org.elasticsearch.action.admin.indices.seal.SealIndicesResponse;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateAction;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequestBuilder;
@ -1323,6 +1327,21 @@ public abstract class AbstractClient extends AbstractComponent implements Client
return new FlushRequestBuilder(this, FlushAction.INSTANCE).setIndices(indices);
}
@Override
public ActionFuture<SealIndicesResponse> sealIndices(SealIndicesRequest request) {
return execute(SealIndicesAction.INSTANCE, request);
}
@Override
public void sealIndices(SealIndicesRequest request, ActionListener<SealIndicesResponse> listener) {
execute(SealIndicesAction.INSTANCE, request, listener);
}
@Override
public SealIndicesRequestBuilder prepareSealIndices(String... indices) {
return new SealIndicesRequestBuilder(this, SealIndicesAction.INSTANCE).indices(indices);
}
@Override
public void getMappings(GetMappingsRequest request, ActionListener<GetMappingsResponse> listener) {
execute(GetMappingsAction.INSTANCE, request, listener);

View File

@ -18,17 +18,13 @@
*/
package org.elasticsearch.indices;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.cluster.node.liveness.LivenessRequest;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.support.DelegatingActionListener;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ImmutableShardRouting;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -39,7 +35,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.IndexService;
@ -56,10 +51,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class SyncedFlushService extends AbstractComponent {
@ -135,7 +128,6 @@ public class SyncedFlushService extends AbstractComponent {
if (inflight != 1) {
actionListener.onResponse(new SyncedFlushResult(shardId, "operation counter on primary is non zero [" + inflight + "]"));
}
String syncId = Strings.base64UUID();
sendSyncRequests(syncId, activeShards, state, commitIds, shardId, actionListener);
} catch (Throwable t) {
@ -155,6 +147,7 @@ public class SyncedFlushService extends AbstractComponent {
}
final AtomicInteger result = new AtomicInteger(-1);
final CountDownLatch latch = new CountDownLatch(1);
logger.trace("{} retrieving in flight operation count", shardId);
transportService.sendRequest(primaryNode, IN_FLIGHT_OPS_ACTION_NAME, new InFlightOpsRequest(shardId),
new BaseTransportResponseHandler<InFlightOpsResponse>() {
@Override
@ -340,21 +333,25 @@ public class SyncedFlushService extends AbstractComponent {
return new InFlightOpsResponse(opCount);
}
public static class SyncedFlushResult {
private final String failureReason;
private final Map<ShardRouting, SyncedFlushResponse> shardResponses;
private final String syncId;
public static class SyncedFlushResult extends TransportResponse {
private String failureReason;
private Map<ShardRouting, SyncedFlushResponse> shardResponses;
private String syncId;
private ShardId shardId;
public SyncedFlushResult() {
}
public ShardId getShardId() {
return shardId;
}
private final ShardId shardId;
/**
* failure constructor
*/
SyncedFlushResult(ShardId shardId, String failureReason) {
public SyncedFlushResult(ShardId shardId, String failureReason) {
this.syncId = null;
this.failureReason = failureReason;
this.shardResponses = new HashMap<>();
@ -364,7 +361,8 @@ public class SyncedFlushService extends AbstractComponent {
/**
* success constructor
*/
SyncedFlushResult(ShardId shardId, String syncId, Map<ShardRouting, SyncedFlushResponse> shardResponses) {
public SyncedFlushResult(ShardId shardId, String syncId, Map<ShardRouting, SyncedFlushResponse> shardResponses) {
this.failureReason = null;
this.shardResponses = shardResponses;
this.syncId = syncId;
@ -404,6 +402,38 @@ public class SyncedFlushService extends AbstractComponent {
return shardResponses;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalString(failureReason);
out.writeOptionalString(syncId);
out.writeVInt(shardResponses.size());
for (Map.Entry<ShardRouting, SyncedFlushResponse> result : shardResponses.entrySet()) {
result.getKey().writeTo(out);
result.getValue().writeTo(out);
}
shardId.writeTo(out);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
failureReason = in.readOptionalString();
syncId = in.readOptionalString();
int size = in.readVInt();
shardResponses = new HashMap<>();
for (int i = 0; i < size; i++) {
ImmutableShardRouting shardRouting = ImmutableShardRouting.readShardRoutingEntry(in);
SyncedFlushResponse syncedFlushRsponse = new SyncedFlushResponse();
syncedFlushRsponse.readFrom(in);
shardResponses.put(shardRouting, syncedFlushRsponse);
}
shardId = ShardId.readShardId(in);
}
public ShardId shardId() {
return shardId;
}
}
final static class PreSyncedFlushRequest extends TransportRequest {
@ -521,7 +551,7 @@ public class SyncedFlushService extends AbstractComponent {
}
}
static final class SyncedFlushResponse extends TransportResponse {
public static final class SyncedFlushResponse extends TransportResponse {
/**
* a non null value indicates a failure to sync flush. null means success

View File

@ -23,6 +23,9 @@ import com.google.common.collect.Lists;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.action.admin.indices.seal.RestSealIndicesAction;
import org.elasticsearch.rest.action.admin.indices.upgrade.RestUpgradeAction;
import org.elasticsearch.rest.action.admin.cluster.repositories.verify.RestVerifyRepositoryAction;
import org.elasticsearch.rest.action.admin.cluster.health.RestClusterHealthAction;
import org.elasticsearch.rest.action.admin.cluster.node.hotthreads.RestNodesHotThreadsAction;
import org.elasticsearch.rest.action.admin.cluster.node.info.RestNodesInfoAction;
@ -30,7 +33,6 @@ import org.elasticsearch.rest.action.admin.cluster.node.stats.RestNodesStatsActi
import org.elasticsearch.rest.action.admin.cluster.repositories.delete.RestDeleteRepositoryAction;
import org.elasticsearch.rest.action.admin.cluster.repositories.get.RestGetRepositoriesAction;
import org.elasticsearch.rest.action.admin.cluster.repositories.put.RestPutRepositoryAction;
import org.elasticsearch.rest.action.admin.cluster.repositories.verify.RestVerifyRepositoryAction;
import org.elasticsearch.rest.action.admin.cluster.reroute.RestClusterRerouteAction;
import org.elasticsearch.rest.action.admin.cluster.settings.RestClusterGetSettingsAction;
import org.elasticsearch.rest.action.admin.cluster.settings.RestClusterUpdateSettingsAction;
@ -73,7 +75,6 @@ import org.elasticsearch.rest.action.admin.indices.template.delete.RestDeleteInd
import org.elasticsearch.rest.action.admin.indices.template.get.RestGetIndexTemplateAction;
import org.elasticsearch.rest.action.admin.indices.template.head.RestHeadIndexTemplateAction;
import org.elasticsearch.rest.action.admin.indices.template.put.RestPutIndexTemplateAction;
import org.elasticsearch.rest.action.admin.indices.upgrade.RestUpgradeAction;
import org.elasticsearch.rest.action.admin.indices.validate.query.RestValidateQueryAction;
import org.elasticsearch.rest.action.admin.indices.warmer.delete.RestDeleteWarmerAction;
import org.elasticsearch.rest.action.admin.indices.warmer.get.RestGetWarmerAction;
@ -182,6 +183,7 @@ public class RestActionModule extends AbstractModule {
bind(RestRefreshAction.class).asEagerSingleton();
bind(RestFlushAction.class).asEagerSingleton();
bind(RestSealIndicesAction.class).asEagerSingleton();
bind(RestOptimizeAction.class).asEagerSingleton();
bind(RestUpgradeAction.class).asEagerSingleton();
bind(RestClearIndicesCacheAction.class).asEagerSingleton();

View File

@ -0,0 +1,67 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.indices.seal;
import org.elasticsearch.action.admin.indices.seal.SealIndicesAction;
import org.elasticsearch.action.admin.indices.seal.SealIndicesRequest;
import org.elasticsearch.action.admin.indices.seal.SealIndicesResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestBuilderListener;
import static org.elasticsearch.rest.RestRequest.Method.GET;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.RestStatus.OK;
/**
*
*/
public class RestSealIndicesAction extends BaseRestHandler {
@Inject
public RestSealIndicesAction(Settings settings, RestController controller, Client client) {
super(settings, controller, client);
controller.registerHandler(POST, "/_seal", this);
controller.registerHandler(POST, "/{index}/_seal", this);
controller.registerHandler(GET, "/_seal", this);
controller.registerHandler(GET, "/{index}/_seal", this);
}
@Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
SealIndicesRequest sealIndicesRequest = new SealIndicesRequest(indices);
client.admin().indices().execute(SealIndicesAction.INSTANCE, sealIndicesRequest, new RestBuilderListener<SealIndicesResponse>(channel) {
@Override
public RestResponse buildResponse(SealIndicesResponse response, XContentBuilder builder) throws Exception {
builder.startObject();
builder = response.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
return new BytesRestResponse(OK, builder);
}
});
}
}

View File

@ -0,0 +1,117 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.seal;
import org.elasticsearch.cluster.routing.ImmutableShardRouting;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.SyncedFlushService;
import org.elasticsearch.test.ElasticsearchTestCase;
import java.io.IOException;
import java.util.*;
import static org.elasticsearch.test.XContentTestUtils.convertToMap;
import static org.hamcrest.Matchers.equalTo;
public class SealIndicesTests extends ElasticsearchTestCase {
public void testSealIndicesResponseStreaming() throws IOException {
Set<SyncedFlushService.SyncedFlushResult> shardResults = new HashSet<>();
// add one result where one shard failed and one succeeded
SyncedFlushService.SyncedFlushResult syncedFlushResult = createSyncedFlushResult(0, "test");
shardResults.add(syncedFlushResult);
// add one result where all failed
syncedFlushResult = new SyncedFlushService.SyncedFlushResult(new ShardId("test", 1), "all failed :(");
shardResults.add(syncedFlushResult);
SealIndicesResponse sealIndicesResponse = new SealIndicesResponse(shardResults);
BytesStreamOutput out = new BytesStreamOutput();
sealIndicesResponse.writeTo(out);
out.close();
StreamInput in = new BytesStreamInput(out.bytes());
SealIndicesResponse readResponse = new SealIndicesResponse();
readResponse.readFrom(in);
Map<String, Object> asMap = convertToMap(readResponse);
assertResponse(asMap);
}
public void testXContentResponse() throws IOException {
Set<SyncedFlushService.SyncedFlushResult> shardResults = new HashSet<>();
// add one result where one shard failed and one succeeded
SyncedFlushService.SyncedFlushResult syncedFlushResult = createSyncedFlushResult(0, "test");
shardResults.add(syncedFlushResult);
// add one result where all failed
syncedFlushResult = new SyncedFlushService.SyncedFlushResult(new ShardId("test", 1), "all failed :(");
shardResults.add(syncedFlushResult);
SealIndicesResponse sealIndicesResponse = new SealIndicesResponse(shardResults);
Map<String, Object> asMap = convertToMap(sealIndicesResponse);
assertResponse(asMap);
}
protected void assertResponse(Map<String, Object> asMap) {
assertNotNull(asMap.get("test"));
assertThat((Integer) (((HashMap) ((ArrayList) asMap.get("test")).get(0)).get("shard_id")), equalTo(0));
assertThat((String) (((HashMap) ((ArrayList) asMap.get("test")).get(0)).get("message")), equalTo("failed on some copies"));
HashMap<String, String> shardResponses = (HashMap<String, String>) ((HashMap) ((ArrayList) asMap.get("test")).get(0)).get("responses");
assertThat(shardResponses.get("node_1"), equalTo("failed for some reason"));
assertThat(shardResponses.get("node_2"), equalTo("success"));
HashMap<String, Object> failedShard = (HashMap<String, Object>) (((ArrayList) asMap.get("test")).get(1));
assertThat((Integer) (failedShard.get("shard_id")), equalTo(1));
assertThat((String) (failedShard.get("message")), equalTo("all failed :("));
}
public void testXContentResponseSortsShards() throws IOException {
Set<SyncedFlushService.SyncedFlushResult> shardResults = new HashSet<>();
// add one result where one shard failed and one succeeded
SyncedFlushService.SyncedFlushResult syncedFlushResult;
for (int i = 100000; i >= 0; i--) {
if (randomBoolean()) {
syncedFlushResult = createSyncedFlushResult(i, "test");
shardResults.add(syncedFlushResult);
} else {
syncedFlushResult = new SyncedFlushService.SyncedFlushResult(new ShardId("test", i), "all failed :(");
shardResults.add(syncedFlushResult);
}
}
SealIndicesResponse sealIndicesResponse = new SealIndicesResponse(shardResults);
Map<String, Object> asMap = convertToMap(sealIndicesResponse);
assertNotNull(asMap.get("test"));
for (int i = 0; i < 100000; i++) {
assertThat((Integer) (((HashMap) ((ArrayList) asMap.get("test")).get(i)).get("shard_id")), equalTo(i));
}
}
protected SyncedFlushService.SyncedFlushResult createSyncedFlushResult(int shardId, String index) {
Map<ShardRouting, SyncedFlushService.SyncedFlushResponse> responses = new HashMap<>();
ImmutableShardRouting shardRouting = new ImmutableShardRouting(index, shardId, "node_1", false, ShardRoutingState.RELOCATING, 2);
SyncedFlushService.SyncedFlushResponse syncedFlushResponse = new SyncedFlushService.SyncedFlushResponse("failed for some reason");
responses.put(shardRouting, syncedFlushResponse);
shardRouting = new ImmutableShardRouting(index, shardId, "node_2", false, ShardRoutingState.RELOCATING, 2);
syncedFlushResponse = new SyncedFlushService.SyncedFlushResponse();
responses.put(shardRouting, syncedFlushResponse);
return new SyncedFlushService.SyncedFlushResult(new ShardId(index, shardId), "some_sync_id", responses);
}
}

View File

@ -22,7 +22,12 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.admin.indices.seal.SealIndicesResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
@ -32,10 +37,14 @@ import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static java.lang.Thread.sleep;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
@ -98,8 +107,119 @@ public class FlushTest extends ElasticsearchIntegrationTest {
String syncId = result.syncId();
for (ShardStats shardStats : indexStats.getShards()) {
final String shardSyncId = shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID);
assertThat(shardSyncId, equalTo(syncId));
assertThat(shardSyncId, equalTo(syncId));
}
// now, start new node and relocate a shard there and see if sync id still there
String newNodeName = internalCluster().startNode();
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
ShardRouting shardRouting = clusterState.getRoutingTable().index("test").shard(0).iterator().next();
String currentNodeName = clusterState.nodes().resolveNode(shardRouting.currentNodeId()).name();
assertFalse(currentNodeName.equals(newNodeName));
internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(new ShardId("test", 0), currentNodeName, newNodeName)).get();
client().admin().cluster().prepareHealth()
.setWaitForRelocatingShards(0)
.get();
indexStats = client().admin().indices().prepareStats("test").get().getIndex("test");
for (ShardStats shardStats : indexStats.getShards()) {
assertNotNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
}
client().admin().indices().prepareUpdateSettings("test").setSettings(ImmutableSettings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).build()).get();
ensureGreen("test");
indexStats = client().admin().indices().prepareStats("test").get().getIndex("test");
for (ShardStats shardStats : indexStats.getShards()) {
assertNotNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
}
client().admin().indices().prepareUpdateSettings("test").setSettings(ImmutableSettings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, internalCluster().numDataNodes() - 1).build()).get();
ensureGreen("test");
indexStats = client().admin().indices().prepareStats("test").get().getIndex("test");
for (ShardStats shardStats : indexStats.getShards()) {
assertNotNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
}
}
@TestLogging("indices:TRACE")
public void testSyncedFlushWithApi() throws ExecutionException, InterruptedException, IOException {
createIndex("test");
ensureGreen();
IndexStats indexStats = client().admin().indices().prepareStats("test").get().getIndex("test");
for (ShardStats shardStats : indexStats.getShards()) {
assertNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
}
logger.info("--> trying sync flush");
SealIndicesResponse sealIndicesResponse = client().admin().indices().prepareSealIndices("test").get();
logger.info("--> sync flush done");
indexStats = client().admin().indices().prepareStats("test").get().getIndex("test");
for (ShardStats shardStats : indexStats.getShards()) {
assertNotNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
}
}
@TestLogging("indices:TRACE")
public void testSyncedFlushWithApiAndConcurrentIndexing() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(3);
createIndex("test");
client().admin().indices().prepareUpdateSettings("test").setSettings(
ImmutableSettings.builder().put("index.translog.disable_flush", true).put("index.refresh_interval", -1).put("index.number_of_replicas", internalCluster().numDataNodes() - 1))
.get();
ensureGreen();
final AtomicBoolean stop = new AtomicBoolean(false);
final AtomicInteger numDocs = new AtomicInteger(0);
Thread indexingThread = new Thread() {
@Override
public void run() {
while (stop.get() == false) {
client().prepareIndex().setIndex("test").setType("doc").setSource("{}").get();
numDocs.incrementAndGet();
}
}
};
indexingThread.start();
IndexStats indexStats = client().admin().indices().prepareStats("test").get().getIndex("test");
for (ShardStats shardStats : indexStats.getShards()) {
assertNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
}
logger.info("--> trying sync flush");
SealIndicesResponse sealIndicesResponse = client().admin().indices().prepareSealIndices("test").get();
logger.info("--> sync flush done");
stop.set(true);
indexingThread.join();
indexStats = client().admin().indices().prepareStats("test").get().getIndex("test");
for (ShardStats shardStats : indexStats.getShards()) {
assertFlushResponseEqualsShardStats(shardStats, sealIndicesResponse);
}
refresh();
assertThat(client().prepareCount().get().getCount(), equalTo((long) numDocs.get()));
logger.info("indexed {} docs", client().prepareCount().get().getCount());
logClusterState();
internalCluster().fullRestart();
ensureGreen();
assertThat(client().prepareCount().get().getCount(), equalTo((long) numDocs.get()));
}
private void assertFlushResponseEqualsShardStats(ShardStats shardStats, SealIndicesResponse sealIndicesResponse) {
for (SyncedFlushService.SyncedFlushResult shardResult : sealIndicesResponse.results()) {
if (shardStats.getShardRouting().getId() == shardResult.shardId().getId()) {
for (Map.Entry<ShardRouting, SyncedFlushService.SyncedFlushResponse> singleResponse : shardResult.shardResponses().entrySet()) {
if (singleResponse.getKey().currentNodeId().equals(shardStats.getShardRouting().currentNodeId())) {
if (singleResponse.getValue().success()) {
assertNotNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
logger.info("sync flushed {} on node {}", singleResponse.getKey().shardId(), singleResponse.getKey().currentNodeId());
} else {
assertNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
logger.info("sync flush failed for {} on node {}", singleResponse.getKey().shardId(), singleResponse.getKey().currentNodeId());
}
}
}
}
}
}
}

View File

@ -49,6 +49,7 @@ import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.optimize.OptimizeResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.elasticsearch.action.admin.indices.seal.SealIndicesResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
@ -1497,8 +1498,13 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
client().admin().indices().prepareRefresh(indices).setIndicesOptions(IndicesOptions.lenientExpandOpen()).execute(
new LatchedActionListener<RefreshResponse>(newLatch(inFlightAsyncOperations)));
} else if (maybeFlush && rarely()) {
client().admin().indices().prepareFlush(indices).setIndicesOptions(IndicesOptions.lenientExpandOpen()).execute(
new LatchedActionListener<FlushResponse>(newLatch(inFlightAsyncOperations)));
if (randomBoolean()) {
client().admin().indices().prepareFlush(indices).setIndicesOptions(IndicesOptions.lenientExpandOpen()).execute(
new LatchedActionListener<FlushResponse>(newLatch(inFlightAsyncOperations)));
} else {
client().admin().indices().prepareSealIndices(indices).execute(
new LatchedActionListener<SealIndicesResponse>(newLatch(inFlightAsyncOperations)));
}
} else if (rarely()) {
client().admin().indices().prepareOptimize(indices).setIndicesOptions(IndicesOptions.lenientExpandOpen()).setMaxNumSegments(between(1, 10)).setFlush(maybeFlush && randomBoolean()).execute(
new LatchedActionListener<OptimizeResponse>(newLatch(inFlightAsyncOperations)));