diff --git a/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java b/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java index b6666813ce7..942d512eac8 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java +++ b/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java @@ -21,7 +21,6 @@ package org.elasticsearch.cluster.routing; import com.carrotsearch.hppc.IntSet; import com.google.common.collect.*; -import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; @@ -255,35 +254,6 @@ public class RoutingTable implements Iterable { return new GroupShardsIterator(set); } - /** - * All the shard copies for the provided shard id grouped. Each group is a single element, consisting - * either of the primary shard of one replica. - * - * @param shardId the shard id for the copies we want - * @return All the shard copies (primary and replicas) for the shardId - * @throws IndexMissingException If an index passed does not exists - * @see IndexRoutingTable#groupByAllIt() - */ - public GroupShardsIterator allActiveShardCopiesGrouped(ShardId shardId) throws IndexMissingException { - // use list here since we need to maintain identity across shards - ArrayList set = new ArrayList<>(); - IndexRoutingTable indexRoutingTable = index(shardId.index().name()); - if (indexRoutingTable == null) { - throw new IndexMissingException(new Index(shardId.index().name())); - } - IndexShardRoutingTable copiesRoutingTable = indexRoutingTable.shard(shardId.id()); - if (copiesRoutingTable != null) { - for (ShardRouting shardRouting : copiesRoutingTable) { - if (shardRouting.active()) { - set.add(shardRouting.shardsIt()); - } - } - } else { - throw new ElasticsearchIllegalStateException(shardId + " does not exist"); - } - return new GroupShardsIterator(set); - } - public static Builder builder() { return new Builder(); } diff --git a/src/main/java/org/elasticsearch/index/engine/Engine.java b/src/main/java/org/elasticsearch/index/engine/Engine.java index c7bfdb11983..cd72d789c70 100644 --- a/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -215,7 +215,13 @@ public abstract class Engine implements Closeable { * @param expectedCommitId the expected value of * @return true if the sync commit was made, false o.w. */ - public abstract boolean syncFlushIfNoPendingChanges(String syncId, byte[] expectedCommitId) throws EngineException; + public abstract SyncedFlushResult syncFlushIfNoPendingChanges(String syncId, byte[] expectedCommitId) throws EngineException; + + public enum SyncedFlushResult { + SUCCESS, + FAILED_COMMIT_MISMATCH, + FAILED_PENDING_OPERATIONS + } final protected GetResult getFromSearcher(Get get) throws EngineException { final Searcher searcher = acquireSearcher("get"); diff --git a/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 7b05a7449d1..ac2fca4e3da 100644 --- a/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -598,26 +598,26 @@ public class InternalEngine extends Engine { } @Override - public boolean syncFlushIfNoPendingChanges(String syncId, byte[] expectedCommitId) throws EngineException { + public SyncedFlushResult syncFlushIfNoPendingChanges(String syncId, byte[] expectedCommitId) throws EngineException { // best effort attempt before we acquire locks ensureOpen(); if (indexWriter.hasUncommittedChanges()) { logger.trace("can't sync commit [{}]. have pending changes", syncId); - return false; + return SyncedFlushResult.FAILED_PENDING_OPERATIONS; } if (Arrays.equals(expectedCommitId, lastCommittedSegmentInfos.getId()) == false) { logger.trace("can't sync commit [{}]. current commit id is not equal to expected.", syncId); - return false; + return SyncedFlushResult.FAILED_COMMIT_MISMATCH; } try (ReleasableLock lock = writeLock.acquire()) { ensureOpen(); if (indexWriter.hasUncommittedChanges()) { logger.trace("can't sync commit [{}]. have pending changes", syncId); - return false; + return SyncedFlushResult.FAILED_PENDING_OPERATIONS; } if (Arrays.equals(expectedCommitId, lastCommittedSegmentInfos.getId()) == false) { logger.trace("can't sync commit [{}]. current commit id is not equal to expected.", syncId); - return false; + return SyncedFlushResult.FAILED_COMMIT_MISMATCH; } logger.trace("starting sync commit [{}]", syncId); long translogId = translog.currentId(); @@ -628,7 +628,7 @@ public class InternalEngine extends Engine { commitIndexWriter(indexWriter); logger.debug("successfully sync committed. sync id [{}].", syncId); lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); - return true; + return SyncedFlushResult.SUCCESS; } catch (IOException ex) { maybeFailEngine("sync commit", ex); throw new EngineException(shardId, "failed to sync commit", ex); diff --git a/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java b/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java index e3a1f60b75f..69887ea042e 100644 --- a/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java @@ -108,7 +108,7 @@ public class ShadowEngine extends Engine { } @Override - public boolean syncFlushIfNoPendingChanges(String syncId, byte[] expectedCommitId) { + public SyncedFlushResult syncFlushIfNoPendingChanges(String syncId, byte[] expectedCommitId) { throw new UnsupportedOperationException(shardId + " sync commit operation not allowed on shadow engine"); } diff --git a/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/src/main/java/org/elasticsearch/index/shard/IndexShard.java index dd1fb3cf619..c04ec70747d 100644 --- a/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -116,7 +116,6 @@ import java.io.IOException; import java.io.PrintStream; import java.nio.channels.ClosedByInterruptException; import java.util.Map; -import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -681,7 +680,7 @@ public class IndexShard extends AbstractIndexShardComponent { return completionStats; } - public boolean syncFlushIfNoPendingChanges(String syncId, byte[] expectedCommitId) { + public Engine.SyncedFlushResult syncFlushIfNoPendingChanges(String syncId, byte[] expectedCommitId) { verifyStartedOrRecovering(); logger.trace("trying to sync flush. sync id [{}]. expected commit id [{}]]", syncId, expectedCommitId); return engine().syncFlushIfNoPendingChanges(syncId, expectedCommitId); diff --git a/src/main/java/org/elasticsearch/indices/IndicesModule.java b/src/main/java/org/elasticsearch/indices/IndicesModule.java index a6080d5a4a1..785d2af7eca 100644 --- a/src/main/java/org/elasticsearch/indices/IndicesModule.java +++ b/src/main/java/org/elasticsearch/indices/IndicesModule.java @@ -38,7 +38,6 @@ import org.elasticsearch.indices.recovery.RecoverySource; import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; -import org.elasticsearch.indices.syncedflush.SyncedFlushService; import org.elasticsearch.indices.ttl.IndicesTTLService; /** diff --git a/src/main/java/org/elasticsearch/indices/SyncedFlushService.java b/src/main/java/org/elasticsearch/indices/SyncedFlushService.java new file mode 100644 index 00000000000..a0648c31b9a --- /dev/null +++ b/src/main/java/org/elasticsearch/indices/SyncedFlushService.java @@ -0,0 +1,649 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.indices; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.IndexShardMissingException; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardException; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.*; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public class SyncedFlushService extends AbstractComponent { + + // nocommmit: check these are ok + public static final String PRE_SYNCED_FLUSH_ACTION_NAME = "internal:indices/flush/synced/pre"; + public static final String SYNCED_FLUSH_ACTION_NAME = "internal:indices/flush/synced/sync"; + public static final String IN_FLIGHT_OPS_ACTION_NAME = "internal:indices/flush/synced/in_flight"; + + public static final String SETTING_PRE_SYNC_TIMEOUT = "indices.flush.synced.presync_timeout"; + public static final String SETTING_SYNC_TIMEOUT = "indices.flush.synced.sync_timeout"; + public static final String SETTING_IN_FLIGHT_OPS_TIMEOUT = "indices.flush.synced.in_flight_ops_timeout"; + + private final IndicesService indicesService; + private final ClusterService clusterService; + private final TransportService transportService; + private final TimeValue preSyncTimeout; + private final TimeValue syncTimeout; + private final TimeValue inflightOpsTimeout; + + @Inject + public SyncedFlushService(Settings settings, IndicesService indicesService, ClusterService clusterService, TransportService transportService) { + super(settings); + this.indicesService = indicesService; + this.clusterService = clusterService; + this.transportService = transportService; + + transportService.registerHandler(PRE_SYNCED_FLUSH_ACTION_NAME, new PreSyncedFlushTransportHandler()); + transportService.registerHandler(SYNCED_FLUSH_ACTION_NAME, new SyncedFlushTransportHandler()); + transportService.registerHandler(IN_FLIGHT_OPS_ACTION_NAME, new InFlightOpCountTransportHandler()); + preSyncTimeout = settings.getAsTime(SETTING_PRE_SYNC_TIMEOUT, TimeValue.timeValueMinutes(5)); + syncTimeout = settings.getAsTime(SETTING_SYNC_TIMEOUT, TimeValue.timeValueMinutes(5)); + inflightOpsTimeout = settings.getAsTime(SETTING_IN_FLIGHT_OPS_TIMEOUT, TimeValue.timeValueMinutes(5)); + } + + public SyncedFlushResult attemptSyncedFlush(ShardId shardId) throws ExecutionException, InterruptedException { + final ClusterState state = clusterService.state(); + final IndexRoutingTable indexRoutingTable = state.routingTable().index(shardId.index().name()); + if (indexRoutingTable == null) { + throw new IndexMissingException(shardId.index()); + } + final IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shardId.id()); + if (shardRoutingTable == null) { + throw new IndexShardMissingException(shardId); + } + final List activeShards = shardRoutingTable.activeShards(); + Map commitIds = sendPreSyncRequests(activeShards, state, shardId); + + if (commitIds.isEmpty()) { + return new SyncedFlushResult("all shards failed to commit on pre-sync"); + } + + int inflight = getInflightOpsCount(shardId, state, shardRoutingTable); + if (inflight < 0 || inflight > 0) { + return new SyncedFlushResult("operation counter on primary is non zero [" + inflight + "]"); + } + + String syncId = Strings.base64UUID(); + Map results = sendSyncRequests(syncId, activeShards, state, commitIds, shardId); + + return new SyncedFlushResult(syncId, results); + } + + /** returns the number of inflight operations on primary. -1 upon error. */ + protected int getInflightOpsCount(final ShardId shardId, ClusterState state, IndexShardRoutingTable shardRoutingTable) { + final ShardRouting primaryShard = shardRoutingTable.primaryShard(); + final DiscoveryNode primaryNode = state.nodes().get(primaryShard.currentNodeId()); + if (primaryNode == null) { + logger.trace("{} failed to resolve node for primary shard {}, skipping sync", shardId, primaryShard); + return -1; + } + final AtomicInteger result = new AtomicInteger(-1); + final CountDownLatch latch = new CountDownLatch(1); + transportService.sendRequest(primaryNode, IN_FLIGHT_OPS_ACTION_NAME, new InFlightOpsRequest(shardId), + new BaseTransportResponseHandler() { + @Override + public InFlightOpsResponse newInstance() { + return new InFlightOpsResponse(); + } + + @Override + public void handleResponse(InFlightOpsResponse response) { + result.set(response.opCount()); + latch.countDown(); + } + + @Override + public void handleException(TransportException exp) { + logger.debug("{} unexpected error while retrieving inflight op count", shardId); + result.set(-1); + latch.countDown(); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }); + try { + if (latch.await(inflightOpsTimeout.millis(), TimeUnit.MILLISECONDS) == false) { + logger.debug("{} in flight operation check timed out after [{}]", shardId, syncTimeout); + } + } catch (InterruptedException e) { + logger.debug("{} interrupted while waiting for in flight operation check", shardId); + } + + final int count = result.get(); + logger.trace("{} in flight operation count [{}]", shardId, count); + return count; + } + + + private Map sendSyncRequests(final String syncId, List shards, ClusterState state, Map expectedCommitIds, final ShardId shardId) { + final CountDownLatch countDownLatch = new CountDownLatch(shards.size()); + final Map results = ConcurrentCollections.newConcurrentMap(); + for (final ShardRouting shard : shards) { + final DiscoveryNode node = state.nodes().get(shard.currentNodeId()); + if (node == null) { + logger.trace("{} is assigned to an unknown node. skipping for sync id [{}]. shard routing {}", shardId, syncId, shard); + results.put(shard, new SyncedFlushResponse("unknown node")); + countDownLatch.countDown(); + continue; + } + final byte[] expectedCommitId = expectedCommitIds.get(shard.currentNodeId()); + if (expectedCommitId == null) { + logger.trace("{} can't resolve expected commit id for {}, skipping for sync id [{}]. shard routing {}", shardId, syncId, shard); + results.put(shard, new SyncedFlushResponse("no commit id from pre-sync flush")); + countDownLatch.countDown(); + continue; + } + logger.trace("{} sending synced flush request to {}. sync id [{}].", shardId, shard, syncId); + transportService.sendRequest(node, SYNCED_FLUSH_ACTION_NAME, new SyncedFlushRequest(shard.shardId(), syncId, expectedCommitId), + new BaseTransportResponseHandler() { + @Override + public SyncedFlushResponse newInstance() { + return new SyncedFlushResponse(); + } + + @Override + public void handleResponse(SyncedFlushResponse response) { + SyncedFlushResponse existing = results.put(shard, response); + assert existing == null : "got two answers for node [" + node + "]"; + // count after the assert so we won't decrement twice in handleException + countDownLatch.countDown(); + } + + @Override + public void handleException(TransportException exp) { + logger.trace("{} error while performing synced flush on [{}], skipping", exp, shardId, shard); + results.put(shard, new SyncedFlushResponse(exp.getMessage())); + countDownLatch.countDown(); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }); + } + try { + if (countDownLatch.await(syncTimeout.millis(), TimeUnit.MILLISECONDS) == false) { + logger.debug("{} waiting for synced flush with id [{}] timed out after [{}]. pending ops [{}]", shardId, syncId, syncTimeout, countDownLatch.getCount()); + } + } catch (InterruptedException e) { + logger.debug("{} interrupted while waiting for sync requests (sync id [{}])", shardId, syncId); + } + + return results; + } + + /** send presync requests to all started copies of the given shard */ + Map sendPreSyncRequests(final List shards, final ClusterState state, final ShardId shardId) { + final CountDownLatch countDownLatch = new CountDownLatch(shards.size()); + final Map commitIds = ConcurrentCollections.newConcurrentMap(); + for (final ShardRouting shard : shards) { + logger.trace("{} sending pre-synced flush request to {}", shardId, shard); + final DiscoveryNode node = state.nodes().get(shard.currentNodeId()); + if (node == null) { + logger.trace("{} shard routing {} refers to an unknown node. skipping.", shardId, shard); + countDownLatch.countDown(); + continue; + } + transportService.sendRequest(node, PRE_SYNCED_FLUSH_ACTION_NAME, new PreSyncedFlushRequest(shard.shardId()), new BaseTransportResponseHandler() { + @Override + public PreSyncedFlushResponse newInstance() { + return new PreSyncedFlushResponse(); + } + + @Override + public void handleResponse(PreSyncedFlushResponse response) { + byte[] existing = commitIds.put(node.id(), response.commitId()); + assert existing == null : "got two answers for node [" + node + "]"; + // count after the assert so we won't decrement twice in handleException + countDownLatch.countDown(); + } + + @Override + public void handleException(TransportException exp) { + logger.trace("{} error while performing pre synced flush on [{}], skipping", shardId, exp, shard); + countDownLatch.countDown(); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }); + } + try { + if (countDownLatch.await(preSyncTimeout.millis(), TimeUnit.MILLISECONDS) == false) { + logger.debug("{} waiting for pre sync flush requests timed out after [{}]. pending ops [{}]", shardId, preSyncTimeout, countDownLatch.getCount()); + } + } catch (InterruptedException e) { + logger.debug("{} interrupted while waiting for presync requests", shardId); + } + + return commitIds; + } + + private PreSyncedFlushResponse performPreSyncedFlush(PreSyncedFlushRequest request) { + IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).shardSafe(request.shardId().id()); + FlushRequest flushRequest = new FlushRequest().force(false).waitIfOngoing(true); + logger.trace("{} performing pre sync flush", request.shardId()); + byte[] id = indexShard.flush(flushRequest); + logger.trace("{} pre sync flush done. commit id {}", request.shardId(), id); + return new PreSyncedFlushResponse(id); + } + + private SyncedFlushResponse performSyncedFlush(SyncedFlushRequest request) { + IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); + IndexShard indexShard = indexService.shardSafe(request.shardId().id()); + logger.trace("{} performing sync flush. sync id [{}], expected commit id {}", request.shardId(), request.syncId(), request.expectedCommitId()); + Engine.SyncedFlushResult result = indexShard.syncFlushIfNoPendingChanges(request.syncId(), request.expectedCommitId()); + logger.trace("{} sync flush done. sync id [{}], result [{}]", request.shardId(), request.syncId(), result); + switch (result) { + case SUCCESS: + return new SyncedFlushResponse(); + case FAILED_COMMIT_MISMATCH: + return new SyncedFlushResponse("commit has changed"); + case FAILED_PENDING_OPERATIONS: + return new SyncedFlushResponse("pending operations"); + default: + throw new ElasticsearchException("unknown synced flush result [" + result + "]"); + } + } + + private InFlightOpsResponse performInFlightOps(InFlightOpsRequest request) { + IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); + IndexShard indexShard = indexService.shardSafe(request.shardId().id()); + if (indexShard.routingEntry().primary() == false) { + throw new IndexShardException(request.shardId(), "expected a primary shard"); + } + // nocommit - implement :) + int opCount = 0; + logger.trace("{} in flight operations sampled at [{}]", request.shardId(), opCount); + return new InFlightOpsResponse(opCount); + } + + public static class SyncedFlushResult { + private final String failureReason; + private final Map shardResponses; + private final String syncId; + + /** failure constructor */ + SyncedFlushResult(String failureReason) { + this.syncId = null; + this.failureReason = failureReason; + this.shardResponses = new HashMap<>(); + } + + /** success constructor */ + SyncedFlushResult(String syncId, Map shardResponses) { + this.failureReason = null; + this.shardResponses = shardResponses; + this.syncId = syncId; + } + + public boolean success() { + return syncId != null; + } + + public String failureReason() { + return failureReason; + } + + public String syncId() { + return syncId; + } + + /** total number of shards for which a sync attempt was made */ + public int totalShards() { + return shardResponses.size(); + } + + public int successfulShards() { + int i = 0; + for (SyncedFlushResponse result : shardResponses.values()) { + if (result.success()) { + i++; + } + } + return i; + } + + public Map shardResponses() { + return shardResponses; + } + + } + + final static class PreSyncedFlushRequest extends TransportRequest { + private ShardId shardId; + + + PreSyncedFlushRequest() { + } + + public PreSyncedFlushRequest(ShardId shardId) { + this.shardId = shardId; + } + + @Override + public String toString() { + return "PreSyncedFlushRequest{" + + "shardId=" + shardId + + '}'; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + shardId.writeTo(out); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + this.shardId = ShardId.readShardId(in); + } + + public ShardId shardId() { + return shardId; + } + } + + final static class PreSyncedFlushResponse extends TransportResponse { + + private byte[] commitId; + + PreSyncedFlushResponse() { + } + + PreSyncedFlushResponse(byte[] commitId) { + this.commitId = commitId; + } + + public byte[] commitId() { + return commitId; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + commitId = in.readByteArray(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeByteArray(commitId); + } + } + + static final class SyncedFlushRequest extends TransportRequest { + + private String syncId; + private byte[] expectedCommitId; + private ShardId shardId; + + public SyncedFlushRequest() { + } + + public SyncedFlushRequest(ShardId shardId, String syncId, byte[] expectedCommitId) { + this.expectedCommitId = expectedCommitId; + this.shardId = shardId; + this.syncId = syncId; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + shardId = ShardId.readShardId(in); + expectedCommitId = in.readByteArray(); + syncId = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + shardId.writeTo(out); + out.writeByteArray(expectedCommitId); + out.writeString(syncId); + } + + public ShardId shardId() { + return shardId; + } + + public String syncId() { + return syncId; + } + + public byte[] expectedCommitId() { + return expectedCommitId; + } + + @Override + public String toString() { + return "SyncedFlushRequest{" + + "shardId=" + shardId + + ",syncId='" + syncId + '\'' + + '}'; + } + } + + static final class SyncedFlushResponse extends TransportResponse { + + /** a non null value indicates a failure to sync flush. null means success */ + String failureReason; + + + public SyncedFlushResponse() { + failureReason = null; + } + + public SyncedFlushResponse(String failureReason) { + this.failureReason = failureReason; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + failureReason = in.readOptionalString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeOptionalString(failureReason); + } + + public boolean success() { + return failureReason == null; + } + + public String failureReason() { + return failureReason; + } + + @Override + public String toString() { + return "SyncedFlushResponse{" + + "success=" + success() + + ", failureReason='" + failureReason + '\'' + + '}'; + } + } + + + static final class InFlightOpsRequest extends TransportRequest { + + private ShardId shardId; + + public InFlightOpsRequest() { + } + + public InFlightOpsRequest(ShardId shardId) { + this.shardId = shardId; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + shardId = ShardId.readShardId(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + shardId.writeTo(out); + } + + public ShardId shardId() { + return shardId; + } + + @Override + public String toString() { + return "InFlightOpsRequest{" + + "shardId=" + shardId + + '}'; + } + } + + static final class InFlightOpsResponse extends TransportResponse { + + int opCount; + + + public InFlightOpsResponse() { + } + + public InFlightOpsResponse(int opCount) { + this.opCount = opCount; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + opCount = in.readVInt(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVInt(opCount); + } + + public int opCount() { + return opCount; + } + + @Override + public String toString() { + return "InFlightOpsResponse{" + + "opCount=" + opCount + + '}'; + } + } + + + private class PreSyncedFlushTransportHandler extends BaseTransportRequestHandler { + + @Override + public PreSyncedFlushRequest newInstance() { + return new PreSyncedFlushRequest(); + } + + @Override + public void messageReceived(PreSyncedFlushRequest request, TransportChannel channel) throws Exception { + channel.sendResponse(performPreSyncedFlush(request)); + } + + @Override + public String executor() { + return ThreadPool.Names.FLUSH; + } + } + + + private class SyncedFlushTransportHandler extends BaseTransportRequestHandler { + + @Override + public SyncedFlushRequest newInstance() { + return new SyncedFlushRequest(); + } + + @Override + public void messageReceived(SyncedFlushRequest request, TransportChannel channel) throws Exception { + channel.sendResponse(performSyncedFlush(request)); + } + + @Override + public String executor() { + return ThreadPool.Names.FLUSH; + } + } + + private class InFlightOpCountTransportHandler extends BaseTransportRequestHandler { + + @Override + public InFlightOpsRequest newInstance() { + return new InFlightOpsRequest(); + } + + @Override + public void messageReceived(InFlightOpsRequest request, TransportChannel channel) throws Exception { + channel.sendResponse(performInFlightOps(request)); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + } + +} diff --git a/src/main/java/org/elasticsearch/indices/syncedflush/PreSyncedFlushRequest.java b/src/main/java/org/elasticsearch/indices/syncedflush/PreSyncedFlushRequest.java deleted file mode 100644 index 97401bf1cfb..00000000000 --- a/src/main/java/org/elasticsearch/indices/syncedflush/PreSyncedFlushRequest.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.indices.syncedflush; - -import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.index.shard.ShardId; - -import java.io.IOException; -import java.util.Arrays; - -/** - */ -public class PreSyncedFlushRequest extends BroadcastOperationRequest { - private ShardId shardId; - - - PreSyncedFlushRequest() { - } - - public PreSyncedFlushRequest(ShardId shardId) { - super(Arrays.asList(shardId.getIndex()).toArray(new String[0])); - this.shardId = shardId; - } - - @Override - public String toString() { - return "PreSyncedFlushRequest{" + - "shardId=" + shardId + - '}'; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - shardId.writeTo(out); - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - this.shardId = ShardId.readShardId(in); - } - - public ShardId shardId() { - return shardId; - } - - public void shardId(ShardId shardId) { - this.shardId = shardId; - } -} diff --git a/src/main/java/org/elasticsearch/indices/syncedflush/PreSyncedFlushResponse.java b/src/main/java/org/elasticsearch/indices/syncedflush/PreSyncedFlushResponse.java deleted file mode 100644 index c757f2f669a..00000000000 --- a/src/main/java/org/elasticsearch/indices/syncedflush/PreSyncedFlushResponse.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.indices.syncedflush; - -import org.elasticsearch.action.ShardOperationFailedException; -import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; - -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicReferenceArray; - -/** - * A response to pre synced flush action. - */ -public class PreSyncedFlushResponse extends BroadcastOperationResponse { - - Map commitIds = new HashMap<>(); - - PreSyncedFlushResponse() { - } - - public PreSyncedFlushResponse(int totalShards, int successfulShards, int failedShards, List shardFailures, AtomicReferenceArray shardsResponses) { - super(totalShards, successfulShards, failedShards, shardFailures); - for (int i = 0; i < shardsResponses.length(); i++) { - PreSyncedShardFlushResponse preSyncedShardFlushResponse = (PreSyncedShardFlushResponse) shardsResponses.get(i); - commitIds.put(preSyncedShardFlushResponse.shardRouting().currentNodeId(), preSyncedShardFlushResponse.id()); - } - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - int numCommitIds = in.readVInt(); - for (int i = 0; i < numCommitIds; i++) { - commitIds.put(in.readString(), in.readByteArray()); - } - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeVInt(commitIds.size()); - for (Map.Entry entry : commitIds.entrySet()) { - out.writeString(entry.getKey()); - out.writeByteArray(entry.getValue()); - } - } - - public Map commitIds() { - return commitIds; - } -} diff --git a/src/main/java/org/elasticsearch/indices/syncedflush/PreSyncedShardFlushRequest.java b/src/main/java/org/elasticsearch/indices/syncedflush/PreSyncedShardFlushRequest.java deleted file mode 100644 index 4fa3cbd3c33..00000000000 --- a/src/main/java/org/elasticsearch/indices/syncedflush/PreSyncedShardFlushRequest.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.indices.syncedflush; - -import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest; -import org.elasticsearch.cluster.routing.ImmutableShardRouting; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; - -import java.io.IOException; - -/** - * - */ -class PreSyncedShardFlushRequest extends BroadcastShardOperationRequest { - - private ShardRouting shardRouting; - // we need our own request because it has to include the shard routing - private PreSyncedFlushRequest request = new PreSyncedFlushRequest(); - - PreSyncedShardFlushRequest() { - } - - PreSyncedShardFlushRequest(ShardRouting shardRouting, PreSyncedFlushRequest request) { - super(shardRouting.shardId(), request); - this.request = request; - this.shardRouting = shardRouting; - } - - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - request.readFrom(in); - shardRouting = ImmutableShardRouting.readShardRoutingEntry(in); - - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - request.writeTo(out); - shardRouting.writeTo(out); - - } - - PreSyncedFlushRequest getRequest() { - return request; - } - - public ShardRouting shardRouting() { - return shardRouting; - } -} diff --git a/src/main/java/org/elasticsearch/indices/syncedflush/PreSyncedShardFlushResponse.java b/src/main/java/org/elasticsearch/indices/syncedflush/PreSyncedShardFlushResponse.java deleted file mode 100644 index 55f166f3a18..00000000000 --- a/src/main/java/org/elasticsearch/indices/syncedflush/PreSyncedShardFlushResponse.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.indices.syncedflush; - -import org.elasticsearch.action.support.broadcast.BroadcastShardOperationResponse; -import org.elasticsearch.cluster.routing.ImmutableShardRouting; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; - -import java.io.IOException; - -/** - * - */ -class PreSyncedShardFlushResponse extends BroadcastShardOperationResponse { - byte[] id; - private ShardRouting shardRouting; - - PreSyncedShardFlushResponse() { - } - - PreSyncedShardFlushResponse(byte[] id, ShardRouting shardRouting) { - super(shardRouting.shardId()); - this.id = id; - this.shardRouting = shardRouting; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - id = in.readByteArray(); - shardRouting = ImmutableShardRouting.readShardRoutingEntry(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeByteArray(id); - shardRouting.writeTo(out); - } - - byte[] id() { - return id; - } - - public ShardRouting shardRouting() { - return shardRouting; - } -} diff --git a/src/main/java/org/elasticsearch/indices/syncedflush/SyncedFlushReplicaResponse.java b/src/main/java/org/elasticsearch/indices/syncedflush/SyncedFlushReplicaResponse.java deleted file mode 100644 index a08edb635dc..00000000000 --- a/src/main/java/org/elasticsearch/indices/syncedflush/SyncedFlushReplicaResponse.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -package org.elasticsearch.indices.syncedflush; - -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.transport.TransportResponse; - -import java.io.IOException; - -public class SyncedFlushReplicaResponse extends TransportResponse { - boolean succeeded = true; - private String index; - private int shardId; - private String nodeId; - private String reason; - - void setResult(boolean succeeded, String index, int shardId, String nodeId, String reason) { - this.succeeded = succeeded; - this.index = index; - this.shardId = shardId; - this.nodeId = nodeId; - this.reason = reason; - } - - public String getIndex() { - return index; - } - - public int getShardId() { - return shardId; - } - - public String getNodeId() { - return nodeId; - } - - public String getReason() { - return reason; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - succeeded = in.readBoolean(); - this.index = in.readString(); - this.shardId = in.readInt(); - this.nodeId = in.readString(); - this.reason = in.readString(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeBoolean(succeeded); - out.writeString(index); - out.writeInt(shardId); - out.writeString(nodeId); - out.writeString(reason); - } - -} diff --git a/src/main/java/org/elasticsearch/indices/syncedflush/SyncedFlushRequest.java b/src/main/java/org/elasticsearch/indices/syncedflush/SyncedFlushRequest.java deleted file mode 100644 index 10ba56d0f65..00000000000 --- a/src/main/java/org/elasticsearch/indices/syncedflush/SyncedFlushRequest.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.indices.syncedflush; - -import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.index.shard.ShardId; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -/** - */ -public class SyncedFlushRequest extends ShardReplicationOperationRequest { - - private String syncId; - private Map commitIds; - private ShardId shardId; - - public SyncedFlushRequest() { - } - - public SyncedFlushRequest(ShardId shardId, String syncId, Map commitIds) { - this.commitIds = commitIds; - this.shardId = shardId; - this.syncId = syncId; - this.index(shardId.index().getName()); - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - shardId = ShardId.readShardId(in); - commitIds = new HashMap<>(); - int numCommitIds = in.readVInt(); - for (int i = 0; i < numCommitIds; i++) { - commitIds.put(in.readString(), in.readByteArray()); - } - syncId = in.readString(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - shardId.writeTo(out); - out.writeVInt(commitIds.size()); - for (Map.Entry entry : commitIds.entrySet()) { - out.writeString(entry.getKey()); - out.writeByteArray(entry.getValue()); - } - out.writeString(syncId); - } - - @Override - public String toString() { - return "write sync commit {" + shardId + "}"; - } - - public ShardId shardId() { - return shardId; - } - - public String syncId() { - return syncId; - } - - public Map commitIds() { - return commitIds; - } -} diff --git a/src/main/java/org/elasticsearch/indices/syncedflush/SyncedFlushResponse.java b/src/main/java/org/elasticsearch/indices/syncedflush/SyncedFlushResponse.java deleted file mode 100644 index 199c7d14978..00000000000 --- a/src/main/java/org/elasticsearch/indices/syncedflush/SyncedFlushResponse.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.indices.syncedflush; - -import org.elasticsearch.action.ActionWriteResponse; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; - -import java.io.IOException; - -/** - */ -public class SyncedFlushResponse extends ActionWriteResponse { - - - private boolean succes; - - String syncCommitId; - - public SyncedFlushResponse() { - - } - - public SyncedFlushResponse(boolean success, String syncCommitId) { - this.succes = success; - this.syncCommitId = syncCommitId; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - this.succes = in.readBoolean(); - syncCommitId = in.readOptionalString(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeBoolean(succes); - out.writeOptionalString(syncCommitId); - } - - public boolean success() { - return succes; - } - public String getSyncId() { - return syncCommitId; - } -} diff --git a/src/main/java/org/elasticsearch/indices/syncedflush/SyncedFlushService.java b/src/main/java/org/elasticsearch/indices/syncedflush/SyncedFlushService.java deleted file mode 100644 index 5920ef7cf95..00000000000 --- a/src/main/java/org/elasticsearch/indices/syncedflush/SyncedFlushService.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.indices.syncedflush; - -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.shard.ShardId; - -import java.util.concurrent.ExecutionException; - -public class SyncedFlushService extends AbstractComponent { - - private final TransportPreSyncedFlushAction transportPreSyncedFlushAction; - private final TransportSyncedFlushAction transportSyncedFlushAction; - - @Inject - public SyncedFlushService(Settings settings, TransportPreSyncedFlushAction transportPreSyncedFlushAction, TransportSyncedFlushAction transportSyncedFlushAction) { - super(settings); - this.transportPreSyncedFlushAction = transportPreSyncedFlushAction; - this.transportSyncedFlushAction = transportSyncedFlushAction; - } - - public SyncedFlushResponse attemptSyncedFlush(ShardId shardId) throws ExecutionException, InterruptedException { - PreSyncedFlushResponse preSyncedFlushResponse = transportPreSyncedFlushAction.execute(new PreSyncedFlushRequest(shardId)).get(); - // exit if this did not work - String syncId = Strings.base64UUID(); - SyncedFlushResponse syncedFlushResponse = transportSyncedFlushAction.execute(new SyncedFlushRequest(shardId, syncId, preSyncedFlushResponse.commitIds())).get(); - return syncedFlushResponse; - } -} diff --git a/src/main/java/org/elasticsearch/indices/syncedflush/TransportPreSyncedFlushAction.java b/src/main/java/org/elasticsearch/indices/syncedflush/TransportPreSyncedFlushAction.java deleted file mode 100644 index 7a76136fb95..00000000000 --- a/src/main/java/org/elasticsearch/indices/syncedflush/TransportPreSyncedFlushAction.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -package org.elasticsearch.indices.syncedflush; - -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.ShardOperationFailedException; -import org.elasticsearch.action.admin.indices.flush.FlushRequest; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.DefaultShardOperationFailedException; -import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException; -import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.routing.GroupShardsIterator; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; - -import java.util.List; -import java.util.concurrent.atomic.AtomicReferenceArray; - -import static com.google.common.collect.Lists.newArrayList; - - -/** - * Sync Commit Action. - */ -public class TransportPreSyncedFlushAction extends TransportBroadcastOperationAction { - - private final IndicesService indicesService; - - public static final String NAME = "indices:admin/presyncedflush"; - - @Inject - public TransportPreSyncedFlushAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, IndicesService indicesService, ActionFilters actionFilters) { - super(settings, NAME, threadPool, clusterService, transportService, actionFilters); - this.indicesService = indicesService; - } - - @Override - protected String executor() { - return ThreadPool.Names.FLUSH; - } - - @Override - protected PreSyncedFlushRequest newRequestInstance() { - return new PreSyncedFlushRequest(); - } - - @Override - protected PreSyncedFlushResponse newResponse(PreSyncedFlushRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) { - int successfulShards = 0; - int failedShards = 0; - List shardFailures = null; - for (int i = 0; i < shardsResponses.length(); i++) { - Object shardResponse = shardsResponses.get(i); - if (shardResponse == null) { - // a non active shard, ignore - } else if (shardResponse instanceof BroadcastShardOperationFailedException) { - failedShards++; - if (shardFailures == null) { - shardFailures = newArrayList(); - } - shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse)); - } else { - successfulShards++; - } - } - return new PreSyncedFlushResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures, shardsResponses); - } - - @Override - protected PreSyncedShardFlushRequest newShardRequest() { - return new PreSyncedShardFlushRequest(); - } - - @Override - protected PreSyncedShardFlushRequest newShardRequest(int numShards, ShardRouting shard, PreSyncedFlushRequest request) { - return new PreSyncedShardFlushRequest(shard, request); - } - - @Override - protected PreSyncedShardFlushResponse newShardResponse() { - return new PreSyncedShardFlushResponse(); - } - - @Override - protected PreSyncedShardFlushResponse shardOperation(PreSyncedShardFlushRequest request) throws ElasticsearchException { - IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).shardSafe(request.shardId().id()); - FlushRequest flushRequest = new FlushRequest().force(false).waitIfOngoing(true); - byte[] id = indexShard.flush(flushRequest); - return new PreSyncedShardFlushResponse(id, request.shardRouting()); - } - - /** - * The sync commit request works against one primary and all of its copies. - */ - @Override - protected GroupShardsIterator shards(ClusterState clusterState, PreSyncedFlushRequest request, String[] concreteIndices) { - return clusterState.routingTable().allActiveShardCopiesGrouped(request.shardId()); - } - - @Override - protected ClusterBlockException checkGlobalBlock(ClusterState state, PreSyncedFlushRequest request) { - return null; - } - - @Override - protected ClusterBlockException checkRequestBlock(ClusterState state, PreSyncedFlushRequest countRequest, String[] concreteIndices) { - return null; - } -} diff --git a/src/main/java/org/elasticsearch/indices/syncedflush/TransportSyncedFlushAction.java b/src/main/java/org/elasticsearch/indices/syncedflush/TransportSyncedFlushAction.java deleted file mode 100644 index b8213a13226..00000000000 --- a/src/main/java/org/elasticsearch/indices/syncedflush/TransportSyncedFlushAction.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.indices.syncedflush; - -import org.elasticsearch.ElasticsearchIllegalStateException; -import org.elasticsearch.action.ActionWriteResponse; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.action.shard.ShardStateAction; -import org.elasticsearch.cluster.routing.ShardIterator; -import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.IndexService; -import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; - -/** - */ -public class TransportSyncedFlushAction extends TransportShardReplicationOperationAction { - - public static final String NAME = "indices:admin/syncedflush"; - - @Inject - public TransportSyncedFlushAction(Settings settings, TransportService transportService, ClusterService clusterService, - IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, - ActionFilters actionFilters) { - super(settings, NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters); - } - - @Override - protected boolean checkWriteConsistency() { - return true; - } - - @Override - protected boolean resolveIndex() { - return false; - } - - @Override - protected SyncedFlushRequest newRequestInstance() { - return new SyncedFlushRequest(); - } - - @Override - protected SyncedFlushRequest newReplicaRequestInstance() { - return new SyncedFlushRequest(); - } - - @Override - protected SyncedFlushResponse newResponseInstance() { - return new SyncedFlushResponse(); - } - - @Override - protected SyncedFlushReplicaResponse newReplicaResponseInstance() { - return new SyncedFlushReplicaResponse(); - } - - @Override - protected String executor() { - return ThreadPool.Names.FLUSH; - } - - @Override - protected ShardIterator shards(ClusterState clusterState, InternalRequest request) { - // get all shards for id - return clusterService.state().routingTable().index(request.concreteIndex()).shard(request.request().shardId().id()).shardsIt(); - } - - @Override - protected Tuple shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable { - byte[] commitId = shardRequest.request.commitIds().get(clusterService.localNode().getId()); - IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()); - IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id()); - SyncedFlushResponse syncedFlushResponse = new SyncedFlushResponse(indexShard.syncFlushIfNoPendingChanges(shardRequest.request.syncId(), commitId), shardRequest.request.syncId()); - if (syncedFlushResponse.success() == false) { - throw new ElasticsearchIllegalStateException("could not sync commit on primary"); - } - return new Tuple<>(syncedFlushResponse, shardRequest.request); - } - - @Override - protected SyncedFlushReplicaResponse shardOperationOnReplica(ReplicaOperationRequest shardRequest) { - byte[] commitId = shardRequest.request.commitIds().get(clusterService.localNode().getId()); - IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()); - IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id()); - SyncedFlushReplicaResponse syncedFlushReplicaResponse = new SyncedFlushReplicaResponse(); - boolean success = indexShard.syncFlushIfNoPendingChanges(shardRequest.request.syncId(), commitId); - String message = success ? "synced flush succeeded" : "synced flush failed"; - syncedFlushReplicaResponse.setResult(success, shardRequest.request.index(), shardRequest.shardId.id(), shardRequest.getNodeId(), message); - return syncedFlushReplicaResponse; - } - - - protected SyncedFlushResponse onAllReplicasResponded(SyncedFlushResponse finalResponse, CopyOnWriteArrayList replicaResponses) { - List additionalFailures = new ArrayList<>(); - for (SyncedFlushReplicaResponse replicaResponse : replicaResponses) { - if (replicaResponse.succeeded == false) { - additionalFailures.add(new ActionWriteResponse.ShardInfo.Failure(replicaResponse.getIndex(), replicaResponse.getShardId(), replicaResponse.getNodeId(), replicaResponse.getReason(), RestStatus.CONFLICT, false)); - } - } - additionalFailures.addAll(Arrays.asList(finalResponse.getShardInfo().getFailures())); - finalResponse.setShardInfo(new ActionWriteResponse.ShardInfo(finalResponse.getShardInfo().getTotal(), finalResponse.getShardInfo().getTotal() - additionalFailures.size(), additionalFailures.toArray(new ActionWriteResponse.ShardInfo.Failure[additionalFailures.size()]))); - return finalResponse; - } -} diff --git a/src/test/java/org/elasticsearch/cluster/routing/RoutingTableTest.java b/src/test/java/org/elasticsearch/cluster/routing/RoutingTableTest.java index 250335d4aaa..f6f6c6cc38e 100644 --- a/src/test/java/org/elasticsearch/cluster/routing/RoutingTableTest.java +++ b/src/test/java/org/elasticsearch/cluster/routing/RoutingTableTest.java @@ -19,7 +19,6 @@ package org.elasticsearch.cluster.routing; -import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -30,7 +29,6 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.test.ElasticsearchAllocationTestCase; import org.junit.Before; @@ -246,20 +244,4 @@ public class RoutingTableTest extends ElasticsearchAllocationTestCase { fail("Calling with non-existing index should be ignored at the moment"); } } - - @Test - public void testAllActiveShardCopiesGrouped() { - assertThat(this.testRoutingTable.allActiveShardCopiesGrouped(new ShardId(TEST_INDEX_1, randomInt(numberOfShards - 1))).size(), is(0)); - try { - this.testRoutingTable.allActiveShardCopiesGrouped(new ShardId(TEST_INDEX_1, numberOfShards)).size(); - fail(); - } catch (ElasticsearchIllegalStateException e) { - } - initPrimaries(); - assertThat(this.testRoutingTable.allActiveShardCopiesGrouped(new ShardId(TEST_INDEX_1, randomInt(numberOfShards - 1))).size(), is(0)); - startInitializingShards(TEST_INDEX_1); - assertThat(this.testRoutingTable.allActiveShardCopiesGrouped(new ShardId(TEST_INDEX_1, randomInt(numberOfShards - 1))).size(), is(1)); - startInitializingShards(TEST_INDEX_1); - assertThat(this.testRoutingTable.allActiveShardCopiesGrouped(new ShardId(TEST_INDEX_1, randomInt(numberOfShards - 1))).size(), is(numberOfReplicas + 1)); - } } diff --git a/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index c05467ea5a3..ccc84c2caf7 100644 --- a/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -695,11 +695,14 @@ public class InternalEngineTests extends ElasticsearchTestCase { assertThat(commitID, equalTo(store.readLastCommittedSegmentsInfo().getId())); byte[] fakeId = commitID.clone(); fakeId[0] = (byte) ~fakeId[0]; - assertFalse("should fail to sync flush with wrong id (but no docs)", engine.syncFlushIfNoPendingChanges(syncId + "1", fakeId)); + assertThat("should fail to sync flush with wrong id (but no docs)", engine.syncFlushIfNoPendingChanges(syncId + "1", fakeId), + equalTo(Engine.SyncedFlushResult.FAILED_COMMIT_MISMATCH)); engine.create(new Engine.Create(null, newUid("2"), doc)); - assertFalse("should fail to sync flush with right id but pending doc", engine.syncFlushIfNoPendingChanges(syncId + "2", commitID)); + assertThat("should fail to sync flush with right id but pending doc", engine.syncFlushIfNoPendingChanges(syncId + "2", commitID), + equalTo(Engine.SyncedFlushResult.FAILED_PENDING_OPERATIONS)); commitID = engine.flush(); - assertTrue("should succeed to flush commit with right id and no pending doc", engine.syncFlushIfNoPendingChanges(syncId, commitID)); + assertThat("should succeed to flush commit with right id and no pending doc", engine.syncFlushIfNoPendingChanges(syncId, commitID), + equalTo(Engine.SyncedFlushResult.SUCCESS)); assertThat(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID), equalTo(syncId)); assertThat(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), equalTo(syncId)); } diff --git a/src/test/java/org/elasticsearch/indices/FlushTest.java b/src/test/java/org/elasticsearch/indices/FlushTest.java index f04e30f2d48..a4a9b4d2171 100644 --- a/src/test/java/org/elasticsearch/indices/FlushTest.java +++ b/src/test/java/org/elasticsearch/indices/FlushTest.java @@ -19,16 +19,14 @@ package org.elasticsearch.indices; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.flush.FlushResponse; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.syncedflush.SyncedFlushResponse; -import org.elasticsearch.indices.syncedflush.SyncedFlushService; import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.junit.Test; import java.io.IOException; @@ -79,6 +77,7 @@ public class FlushTest extends ElasticsearchIntegrationTest { } } + @TestLogging("indices:TRACE") public void testSyncedFlush() throws ExecutionException, InterruptedException, IOException { internalCluster().ensureAtLeastNumDataNodes(2); prepareCreate("test").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).get(); @@ -89,15 +88,16 @@ public class FlushTest extends ElasticsearchIntegrationTest { assertNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID)); } - ClusterStateResponse state = client().admin().cluster().prepareState().get(); - String nodeId = state.getState().getRoutingTable().index("test").shard(0).getShards().get(0).currentNodeId(); - SyncedFlushResponse syncedFlushResponse = internalCluster().getInstance(SyncedFlushService.class).attemptSyncedFlush(new ShardId("test", 0)); - assertTrue(syncedFlushResponse.success()); + SyncedFlushService.SyncedFlushResult result = internalCluster().getInstance(SyncedFlushService.class).attemptSyncedFlush(new ShardId("test", 0)); + assertTrue(result.success()); + assertThat(result.totalShards(), equalTo(indexStats.getShards().length)); + assertThat(result.successfulShards(), equalTo(indexStats.getShards().length)); indexStats = client().admin().indices().prepareStats("test").get().getIndex("test"); - assertThat(indexStats.getShards().length, equalTo(client().admin().indices().prepareGetIndex().get().getSettings().get("test").getAsInt(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, -1) + 1)); + String syncId = result.syncId(); for (ShardStats shardStats : indexStats.getShards()) { - assertThat(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID), equalTo(syncedFlushResponse.getSyncId())); + final String shardSyncId = shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID); + assertThat(shardSyncId, equalTo(syncId)); } } diff --git a/src/test/java/org/elasticsearch/indices/syncedflush/SynceFlushStreamablesTests.java b/src/test/java/org/elasticsearch/indices/syncedflush/SynceFlushStreamablesTests.java deleted file mode 100644 index 29c21e76d92..00000000000 --- a/src/test/java/org/elasticsearch/indices/syncedflush/SynceFlushStreamablesTests.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.indices.syncedflush; - -import org.elasticsearch.action.ShardOperationFailedException; -import org.elasticsearch.cluster.routing.ImmutableShardRouting; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.common.io.stream.BytesStreamInput; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.test.ElasticsearchTestCase; -import org.junit.Test; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicReferenceArray; - -public class SynceFlushStreamablesTests extends ElasticsearchTestCase { - - @Test - public void streamWriteSyncResponse() throws InterruptedException, IOException { - ShardId shardId = new ShardId("test", 0); - Map commitIds = new HashMap<>(); - final String nodeId = "node_id"; - commitIds.put(nodeId, generateRandomId(randomInt(100))); - SyncedFlushRequest syncedFlushRequest = new SyncedFlushRequest(shardId, randomAsciiOfLength(5), commitIds); - BytesStreamOutput out = new BytesStreamOutput(); - syncedFlushRequest.writeTo(out); - out.close(); - StreamInput in = new BytesStreamInput(out.bytes()); - SyncedFlushRequest request = new SyncedFlushRequest(); - request.readFrom(in); - assertArrayEquals(request.commitIds().get(nodeId), syncedFlushRequest.commitIds().get(nodeId)); - } - - @Test - public void streamSyncResponse() throws InterruptedException, IOException { - ShardRouting shardRouting = new ImmutableShardRouting("test", 0, "test_node", - "other_test_node", randomBoolean(), ShardRoutingState.STARTED, randomInt()); - AtomicReferenceArray atomicReferenceArray = new AtomicReferenceArray(1); - atomicReferenceArray.set(0, new PreSyncedShardFlushResponse(generateRandomId(randomInt(100)), shardRouting)); - PreSyncedFlushResponse preSyncedFlushResponse = new PreSyncedFlushResponse(randomInt(), randomInt(), randomInt(), new ArrayList(), atomicReferenceArray); - BytesStreamOutput out = new BytesStreamOutput(); - preSyncedFlushResponse.writeTo(out); - out.close(); - StreamInput in = new BytesStreamInput(out.bytes()); - PreSyncedFlushResponse request = new PreSyncedFlushResponse(); - request.readFrom(in); - assertArrayEquals(request.commitIds().get(shardRouting), preSyncedFlushResponse.commitIds().get(shardRouting)); - } - - @Test - public void streamShardSyncResponse() throws InterruptedException, IOException { - ShardRouting shardRouting = new ImmutableShardRouting("test", 0, "test_node", - "other_test_node", randomBoolean(), ShardRoutingState.STARTED, randomInt()); - PreSyncedShardFlushResponse preSyncedShardFlushResponse = new PreSyncedShardFlushResponse(generateRandomId(randomInt(100)), shardRouting); - BytesStreamOutput out = new BytesStreamOutput(); - preSyncedShardFlushResponse.writeTo(out); - out.close(); - StreamInput in = new BytesStreamInput(out.bytes()); - PreSyncedShardFlushResponse request = new PreSyncedShardFlushResponse(); - request.readFrom(in); - assertArrayEquals(request.id(), preSyncedShardFlushResponse.id()); - } - - byte[] generateRandomId(int length) { - byte[] id = new byte[length]; - for (int i = 0; i < length; i++) { - id[i] = randomByte(); - } - return id; - } -}