Refactor all actions into inner actions in SyncedFlushService. Add an inflight counter action. Add SyncedFlushResult counter

This commit is contained in:
Boaz Leskes 2015-04-24 12:38:42 +02:00
parent 3633b83b1f
commit afdab84f2d
21 changed files with 679 additions and 1000 deletions

View File

@ -21,7 +21,6 @@ package org.elasticsearch.cluster.routing;
import com.carrotsearch.hppc.IntSet;
import com.google.common.collect.*;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
@ -255,35 +254,6 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
return new GroupShardsIterator(set);
}
/**
* All the shard copies for the provided shard id grouped. Each group is a single element, consisting
* either of the primary shard of one replica.
*
* @param shardId the shard id for the copies we want
* @return All the shard copies (primary and replicas) for the shardId
* @throws IndexMissingException If an index passed does not exists
* @see IndexRoutingTable#groupByAllIt()
*/
public GroupShardsIterator allActiveShardCopiesGrouped(ShardId shardId) throws IndexMissingException {
// use list here since we need to maintain identity across shards
ArrayList<ShardIterator> set = new ArrayList<>();
IndexRoutingTable indexRoutingTable = index(shardId.index().name());
if (indexRoutingTable == null) {
throw new IndexMissingException(new Index(shardId.index().name()));
}
IndexShardRoutingTable copiesRoutingTable = indexRoutingTable.shard(shardId.id());
if (copiesRoutingTable != null) {
for (ShardRouting shardRouting : copiesRoutingTable) {
if (shardRouting.active()) {
set.add(shardRouting.shardsIt());
}
}
} else {
throw new ElasticsearchIllegalStateException(shardId + " does not exist");
}
return new GroupShardsIterator(set);
}
public static Builder builder() {
return new Builder();
}

View File

@ -215,7 +215,13 @@ public abstract class Engine implements Closeable {
* @param expectedCommitId the expected value of
* @return true if the sync commit was made, false o.w.
*/
public abstract boolean syncFlushIfNoPendingChanges(String syncId, byte[] expectedCommitId) throws EngineException;
public abstract SyncedFlushResult syncFlushIfNoPendingChanges(String syncId, byte[] expectedCommitId) throws EngineException;
public enum SyncedFlushResult {
SUCCESS,
FAILED_COMMIT_MISMATCH,
FAILED_PENDING_OPERATIONS
}
final protected GetResult getFromSearcher(Get get) throws EngineException {
final Searcher searcher = acquireSearcher("get");

View File

@ -598,26 +598,26 @@ public class InternalEngine extends Engine {
}
@Override
public boolean syncFlushIfNoPendingChanges(String syncId, byte[] expectedCommitId) throws EngineException {
public SyncedFlushResult syncFlushIfNoPendingChanges(String syncId, byte[] expectedCommitId) throws EngineException {
// best effort attempt before we acquire locks
ensureOpen();
if (indexWriter.hasUncommittedChanges()) {
logger.trace("can't sync commit [{}]. have pending changes", syncId);
return false;
return SyncedFlushResult.FAILED_PENDING_OPERATIONS;
}
if (Arrays.equals(expectedCommitId, lastCommittedSegmentInfos.getId()) == false) {
logger.trace("can't sync commit [{}]. current commit id is not equal to expected.", syncId);
return false;
return SyncedFlushResult.FAILED_COMMIT_MISMATCH;
}
try (ReleasableLock lock = writeLock.acquire()) {
ensureOpen();
if (indexWriter.hasUncommittedChanges()) {
logger.trace("can't sync commit [{}]. have pending changes", syncId);
return false;
return SyncedFlushResult.FAILED_PENDING_OPERATIONS;
}
if (Arrays.equals(expectedCommitId, lastCommittedSegmentInfos.getId()) == false) {
logger.trace("can't sync commit [{}]. current commit id is not equal to expected.", syncId);
return false;
return SyncedFlushResult.FAILED_COMMIT_MISMATCH;
}
logger.trace("starting sync commit [{}]", syncId);
long translogId = translog.currentId();
@ -628,7 +628,7 @@ public class InternalEngine extends Engine {
commitIndexWriter(indexWriter);
logger.debug("successfully sync committed. sync id [{}].", syncId);
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
return true;
return SyncedFlushResult.SUCCESS;
} catch (IOException ex) {
maybeFailEngine("sync commit", ex);
throw new EngineException(shardId, "failed to sync commit", ex);

View File

@ -108,7 +108,7 @@ public class ShadowEngine extends Engine {
}
@Override
public boolean syncFlushIfNoPendingChanges(String syncId, byte[] expectedCommitId) {
public SyncedFlushResult syncFlushIfNoPendingChanges(String syncId, byte[] expectedCommitId) {
throw new UnsupportedOperationException(shardId + " sync commit operation not allowed on shadow engine");
}

View File

@ -116,7 +116,6 @@ import java.io.IOException;
import java.io.PrintStream;
import java.nio.channels.ClosedByInterruptException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@ -681,7 +680,7 @@ public class IndexShard extends AbstractIndexShardComponent {
return completionStats;
}
public boolean syncFlushIfNoPendingChanges(String syncId, byte[] expectedCommitId) {
public Engine.SyncedFlushResult syncFlushIfNoPendingChanges(String syncId, byte[] expectedCommitId) {
verifyStartedOrRecovering();
logger.trace("trying to sync flush. sync id [{}]. expected commit id [{}]]", syncId, expectedCommitId);
return engine().syncFlushIfNoPendingChanges(syncId, expectedCommitId);

View File

@ -38,7 +38,6 @@ import org.elasticsearch.indices.recovery.RecoverySource;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import org.elasticsearch.indices.syncedflush.SyncedFlushService;
import org.elasticsearch.indices.ttl.IndicesTTLService;
/**

View File

@ -0,0 +1,649 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class SyncedFlushService extends AbstractComponent {
// nocommmit: check these are ok
public static final String PRE_SYNCED_FLUSH_ACTION_NAME = "internal:indices/flush/synced/pre";
public static final String SYNCED_FLUSH_ACTION_NAME = "internal:indices/flush/synced/sync";
public static final String IN_FLIGHT_OPS_ACTION_NAME = "internal:indices/flush/synced/in_flight";
public static final String SETTING_PRE_SYNC_TIMEOUT = "indices.flush.synced.presync_timeout";
public static final String SETTING_SYNC_TIMEOUT = "indices.flush.synced.sync_timeout";
public static final String SETTING_IN_FLIGHT_OPS_TIMEOUT = "indices.flush.synced.in_flight_ops_timeout";
private final IndicesService indicesService;
private final ClusterService clusterService;
private final TransportService transportService;
private final TimeValue preSyncTimeout;
private final TimeValue syncTimeout;
private final TimeValue inflightOpsTimeout;
@Inject
public SyncedFlushService(Settings settings, IndicesService indicesService, ClusterService clusterService, TransportService transportService) {
super(settings);
this.indicesService = indicesService;
this.clusterService = clusterService;
this.transportService = transportService;
transportService.registerHandler(PRE_SYNCED_FLUSH_ACTION_NAME, new PreSyncedFlushTransportHandler());
transportService.registerHandler(SYNCED_FLUSH_ACTION_NAME, new SyncedFlushTransportHandler());
transportService.registerHandler(IN_FLIGHT_OPS_ACTION_NAME, new InFlightOpCountTransportHandler());
preSyncTimeout = settings.getAsTime(SETTING_PRE_SYNC_TIMEOUT, TimeValue.timeValueMinutes(5));
syncTimeout = settings.getAsTime(SETTING_SYNC_TIMEOUT, TimeValue.timeValueMinutes(5));
inflightOpsTimeout = settings.getAsTime(SETTING_IN_FLIGHT_OPS_TIMEOUT, TimeValue.timeValueMinutes(5));
}
public SyncedFlushResult attemptSyncedFlush(ShardId shardId) throws ExecutionException, InterruptedException {
final ClusterState state = clusterService.state();
final IndexRoutingTable indexRoutingTable = state.routingTable().index(shardId.index().name());
if (indexRoutingTable == null) {
throw new IndexMissingException(shardId.index());
}
final IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shardId.id());
if (shardRoutingTable == null) {
throw new IndexShardMissingException(shardId);
}
final List<ShardRouting> activeShards = shardRoutingTable.activeShards();
Map<String, byte[]> commitIds = sendPreSyncRequests(activeShards, state, shardId);
if (commitIds.isEmpty()) {
return new SyncedFlushResult("all shards failed to commit on pre-sync");
}
int inflight = getInflightOpsCount(shardId, state, shardRoutingTable);
if (inflight < 0 || inflight > 0) {
return new SyncedFlushResult("operation counter on primary is non zero [" + inflight + "]");
}
String syncId = Strings.base64UUID();
Map<ShardRouting, SyncedFlushResponse> results = sendSyncRequests(syncId, activeShards, state, commitIds, shardId);
return new SyncedFlushResult(syncId, results);
}
/** returns the number of inflight operations on primary. -1 upon error. */
protected int getInflightOpsCount(final ShardId shardId, ClusterState state, IndexShardRoutingTable shardRoutingTable) {
final ShardRouting primaryShard = shardRoutingTable.primaryShard();
final DiscoveryNode primaryNode = state.nodes().get(primaryShard.currentNodeId());
if (primaryNode == null) {
logger.trace("{} failed to resolve node for primary shard {}, skipping sync", shardId, primaryShard);
return -1;
}
final AtomicInteger result = new AtomicInteger(-1);
final CountDownLatch latch = new CountDownLatch(1);
transportService.sendRequest(primaryNode, IN_FLIGHT_OPS_ACTION_NAME, new InFlightOpsRequest(shardId),
new BaseTransportResponseHandler<InFlightOpsResponse>() {
@Override
public InFlightOpsResponse newInstance() {
return new InFlightOpsResponse();
}
@Override
public void handleResponse(InFlightOpsResponse response) {
result.set(response.opCount());
latch.countDown();
}
@Override
public void handleException(TransportException exp) {
logger.debug("{} unexpected error while retrieving inflight op count", shardId);
result.set(-1);
latch.countDown();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
try {
if (latch.await(inflightOpsTimeout.millis(), TimeUnit.MILLISECONDS) == false) {
logger.debug("{} in flight operation check timed out after [{}]", shardId, syncTimeout);
}
} catch (InterruptedException e) {
logger.debug("{} interrupted while waiting for in flight operation check", shardId);
}
final int count = result.get();
logger.trace("{} in flight operation count [{}]", shardId, count);
return count;
}
private Map<ShardRouting, SyncedFlushResponse> sendSyncRequests(final String syncId, List<ShardRouting> shards, ClusterState state, Map<String, byte[]> expectedCommitIds, final ShardId shardId) {
final CountDownLatch countDownLatch = new CountDownLatch(shards.size());
final Map<ShardRouting, SyncedFlushResponse> results = ConcurrentCollections.newConcurrentMap();
for (final ShardRouting shard : shards) {
final DiscoveryNode node = state.nodes().get(shard.currentNodeId());
if (node == null) {
logger.trace("{} is assigned to an unknown node. skipping for sync id [{}]. shard routing {}", shardId, syncId, shard);
results.put(shard, new SyncedFlushResponse("unknown node"));
countDownLatch.countDown();
continue;
}
final byte[] expectedCommitId = expectedCommitIds.get(shard.currentNodeId());
if (expectedCommitId == null) {
logger.trace("{} can't resolve expected commit id for {}, skipping for sync id [{}]. shard routing {}", shardId, syncId, shard);
results.put(shard, new SyncedFlushResponse("no commit id from pre-sync flush"));
countDownLatch.countDown();
continue;
}
logger.trace("{} sending synced flush request to {}. sync id [{}].", shardId, shard, syncId);
transportService.sendRequest(node, SYNCED_FLUSH_ACTION_NAME, new SyncedFlushRequest(shard.shardId(), syncId, expectedCommitId),
new BaseTransportResponseHandler<SyncedFlushResponse>() {
@Override
public SyncedFlushResponse newInstance() {
return new SyncedFlushResponse();
}
@Override
public void handleResponse(SyncedFlushResponse response) {
SyncedFlushResponse existing = results.put(shard, response);
assert existing == null : "got two answers for node [" + node + "]";
// count after the assert so we won't decrement twice in handleException
countDownLatch.countDown();
}
@Override
public void handleException(TransportException exp) {
logger.trace("{} error while performing synced flush on [{}], skipping", exp, shardId, shard);
results.put(shard, new SyncedFlushResponse(exp.getMessage()));
countDownLatch.countDown();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
}
try {
if (countDownLatch.await(syncTimeout.millis(), TimeUnit.MILLISECONDS) == false) {
logger.debug("{} waiting for synced flush with id [{}] timed out after [{}]. pending ops [{}]", shardId, syncId, syncTimeout, countDownLatch.getCount());
}
} catch (InterruptedException e) {
logger.debug("{} interrupted while waiting for sync requests (sync id [{}])", shardId, syncId);
}
return results;
}
/** send presync requests to all started copies of the given shard */
Map<String, byte[]> sendPreSyncRequests(final List<ShardRouting> shards, final ClusterState state, final ShardId shardId) {
final CountDownLatch countDownLatch = new CountDownLatch(shards.size());
final Map<String, byte[]> commitIds = ConcurrentCollections.newConcurrentMap();
for (final ShardRouting shard : shards) {
logger.trace("{} sending pre-synced flush request to {}", shardId, shard);
final DiscoveryNode node = state.nodes().get(shard.currentNodeId());
if (node == null) {
logger.trace("{} shard routing {} refers to an unknown node. skipping.", shardId, shard);
countDownLatch.countDown();
continue;
}
transportService.sendRequest(node, PRE_SYNCED_FLUSH_ACTION_NAME, new PreSyncedFlushRequest(shard.shardId()), new BaseTransportResponseHandler<PreSyncedFlushResponse>() {
@Override
public PreSyncedFlushResponse newInstance() {
return new PreSyncedFlushResponse();
}
@Override
public void handleResponse(PreSyncedFlushResponse response) {
byte[] existing = commitIds.put(node.id(), response.commitId());
assert existing == null : "got two answers for node [" + node + "]";
// count after the assert so we won't decrement twice in handleException
countDownLatch.countDown();
}
@Override
public void handleException(TransportException exp) {
logger.trace("{} error while performing pre synced flush on [{}], skipping", shardId, exp, shard);
countDownLatch.countDown();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
}
try {
if (countDownLatch.await(preSyncTimeout.millis(), TimeUnit.MILLISECONDS) == false) {
logger.debug("{} waiting for pre sync flush requests timed out after [{}]. pending ops [{}]", shardId, preSyncTimeout, countDownLatch.getCount());
}
} catch (InterruptedException e) {
logger.debug("{} interrupted while waiting for presync requests", shardId);
}
return commitIds;
}
private PreSyncedFlushResponse performPreSyncedFlush(PreSyncedFlushRequest request) {
IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).shardSafe(request.shardId().id());
FlushRequest flushRequest = new FlushRequest().force(false).waitIfOngoing(true);
logger.trace("{} performing pre sync flush", request.shardId());
byte[] id = indexShard.flush(flushRequest);
logger.trace("{} pre sync flush done. commit id {}", request.shardId(), id);
return new PreSyncedFlushResponse(id);
}
private SyncedFlushResponse performSyncedFlush(SyncedFlushRequest request) {
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard indexShard = indexService.shardSafe(request.shardId().id());
logger.trace("{} performing sync flush. sync id [{}], expected commit id {}", request.shardId(), request.syncId(), request.expectedCommitId());
Engine.SyncedFlushResult result = indexShard.syncFlushIfNoPendingChanges(request.syncId(), request.expectedCommitId());
logger.trace("{} sync flush done. sync id [{}], result [{}]", request.shardId(), request.syncId(), result);
switch (result) {
case SUCCESS:
return new SyncedFlushResponse();
case FAILED_COMMIT_MISMATCH:
return new SyncedFlushResponse("commit has changed");
case FAILED_PENDING_OPERATIONS:
return new SyncedFlushResponse("pending operations");
default:
throw new ElasticsearchException("unknown synced flush result [" + result + "]");
}
}
private InFlightOpsResponse performInFlightOps(InFlightOpsRequest request) {
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard indexShard = indexService.shardSafe(request.shardId().id());
if (indexShard.routingEntry().primary() == false) {
throw new IndexShardException(request.shardId(), "expected a primary shard");
}
// nocommit - implement :)
int opCount = 0;
logger.trace("{} in flight operations sampled at [{}]", request.shardId(), opCount);
return new InFlightOpsResponse(opCount);
}
public static class SyncedFlushResult {
private final String failureReason;
private final Map<ShardRouting, SyncedFlushResponse> shardResponses;
private final String syncId;
/** failure constructor */
SyncedFlushResult(String failureReason) {
this.syncId = null;
this.failureReason = failureReason;
this.shardResponses = new HashMap<>();
}
/** success constructor */
SyncedFlushResult(String syncId, Map<ShardRouting, SyncedFlushResponse> shardResponses) {
this.failureReason = null;
this.shardResponses = shardResponses;
this.syncId = syncId;
}
public boolean success() {
return syncId != null;
}
public String failureReason() {
return failureReason;
}
public String syncId() {
return syncId;
}
/** total number of shards for which a sync attempt was made */
public int totalShards() {
return shardResponses.size();
}
public int successfulShards() {
int i = 0;
for (SyncedFlushResponse result : shardResponses.values()) {
if (result.success()) {
i++;
}
}
return i;
}
public Map<ShardRouting, SyncedFlushResponse> shardResponses() {
return shardResponses;
}
}
final static class PreSyncedFlushRequest extends TransportRequest {
private ShardId shardId;
PreSyncedFlushRequest() {
}
public PreSyncedFlushRequest(ShardId shardId) {
this.shardId = shardId;
}
@Override
public String toString() {
return "PreSyncedFlushRequest{" +
"shardId=" + shardId +
'}';
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardId.writeTo(out);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.shardId = ShardId.readShardId(in);
}
public ShardId shardId() {
return shardId;
}
}
final static class PreSyncedFlushResponse extends TransportResponse {
private byte[] commitId;
PreSyncedFlushResponse() {
}
PreSyncedFlushResponse(byte[] commitId) {
this.commitId = commitId;
}
public byte[] commitId() {
return commitId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
commitId = in.readByteArray();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeByteArray(commitId);
}
}
static final class SyncedFlushRequest extends TransportRequest {
private String syncId;
private byte[] expectedCommitId;
private ShardId shardId;
public SyncedFlushRequest() {
}
public SyncedFlushRequest(ShardId shardId, String syncId, byte[] expectedCommitId) {
this.expectedCommitId = expectedCommitId;
this.shardId = shardId;
this.syncId = syncId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = ShardId.readShardId(in);
expectedCommitId = in.readByteArray();
syncId = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardId.writeTo(out);
out.writeByteArray(expectedCommitId);
out.writeString(syncId);
}
public ShardId shardId() {
return shardId;
}
public String syncId() {
return syncId;
}
public byte[] expectedCommitId() {
return expectedCommitId;
}
@Override
public String toString() {
return "SyncedFlushRequest{" +
"shardId=" + shardId +
",syncId='" + syncId + '\'' +
'}';
}
}
static final class SyncedFlushResponse extends TransportResponse {
/** a non null value indicates a failure to sync flush. null means success */
String failureReason;
public SyncedFlushResponse() {
failureReason = null;
}
public SyncedFlushResponse(String failureReason) {
this.failureReason = failureReason;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
failureReason = in.readOptionalString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalString(failureReason);
}
public boolean success() {
return failureReason == null;
}
public String failureReason() {
return failureReason;
}
@Override
public String toString() {
return "SyncedFlushResponse{" +
"success=" + success() +
", failureReason='" + failureReason + '\'' +
'}';
}
}
static final class InFlightOpsRequest extends TransportRequest {
private ShardId shardId;
public InFlightOpsRequest() {
}
public InFlightOpsRequest(ShardId shardId) {
this.shardId = shardId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = ShardId.readShardId(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardId.writeTo(out);
}
public ShardId shardId() {
return shardId;
}
@Override
public String toString() {
return "InFlightOpsRequest{" +
"shardId=" + shardId +
'}';
}
}
static final class InFlightOpsResponse extends TransportResponse {
int opCount;
public InFlightOpsResponse() {
}
public InFlightOpsResponse(int opCount) {
this.opCount = opCount;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
opCount = in.readVInt();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(opCount);
}
public int opCount() {
return opCount;
}
@Override
public String toString() {
return "InFlightOpsResponse{" +
"opCount=" + opCount +
'}';
}
}
private class PreSyncedFlushTransportHandler extends BaseTransportRequestHandler<PreSyncedFlushRequest> {
@Override
public PreSyncedFlushRequest newInstance() {
return new PreSyncedFlushRequest();
}
@Override
public void messageReceived(PreSyncedFlushRequest request, TransportChannel channel) throws Exception {
channel.sendResponse(performPreSyncedFlush(request));
}
@Override
public String executor() {
return ThreadPool.Names.FLUSH;
}
}
private class SyncedFlushTransportHandler extends BaseTransportRequestHandler<SyncedFlushRequest> {
@Override
public SyncedFlushRequest newInstance() {
return new SyncedFlushRequest();
}
@Override
public void messageReceived(SyncedFlushRequest request, TransportChannel channel) throws Exception {
channel.sendResponse(performSyncedFlush(request));
}
@Override
public String executor() {
return ThreadPool.Names.FLUSH;
}
}
private class InFlightOpCountTransportHandler extends BaseTransportRequestHandler<InFlightOpsRequest> {
@Override
public InFlightOpsRequest newInstance() {
return new InFlightOpsRequest();
}
@Override
public void messageReceived(InFlightOpsRequest request, TransportChannel channel) throws Exception {
channel.sendResponse(performInFlightOps(request));
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}
}

View File

@ -1,70 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.syncedflush;
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.Arrays;
/**
*/
public class PreSyncedFlushRequest extends BroadcastOperationRequest<PreSyncedFlushRequest> {
private ShardId shardId;
PreSyncedFlushRequest() {
}
public PreSyncedFlushRequest(ShardId shardId) {
super(Arrays.asList(shardId.getIndex()).toArray(new String[0]));
this.shardId = shardId;
}
@Override
public String toString() {
return "PreSyncedFlushRequest{" +
"shardId=" + shardId +
'}';
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardId.writeTo(out);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.shardId = ShardId.readShardId(in);
}
public ShardId shardId() {
return shardId;
}
public void shardId(ShardId shardId) {
this.shardId = shardId;
}
}

View File

@ -1,73 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.syncedflush;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
* A response to pre synced flush action.
*/
public class PreSyncedFlushResponse extends BroadcastOperationResponse {
Map<String, byte[]> commitIds = new HashMap<>();
PreSyncedFlushResponse() {
}
public PreSyncedFlushResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures, AtomicReferenceArray shardsResponses) {
super(totalShards, successfulShards, failedShards, shardFailures);
for (int i = 0; i < shardsResponses.length(); i++) {
PreSyncedShardFlushResponse preSyncedShardFlushResponse = (PreSyncedShardFlushResponse) shardsResponses.get(i);
commitIds.put(preSyncedShardFlushResponse.shardRouting().currentNodeId(), preSyncedShardFlushResponse.id());
}
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
int numCommitIds = in.readVInt();
for (int i = 0; i < numCommitIds; i++) {
commitIds.put(in.readString(), in.readByteArray());
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(commitIds.size());
for (Map.Entry<String, byte[]> entry : commitIds.entrySet()) {
out.writeString(entry.getKey());
out.writeByteArray(entry.getValue());
}
}
public Map<String, byte[]> commitIds() {
return commitIds;
}
}

View File

@ -1,72 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.syncedflush;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest;
import org.elasticsearch.cluster.routing.ImmutableShardRouting;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
*
*/
class PreSyncedShardFlushRequest extends BroadcastShardOperationRequest {
private ShardRouting shardRouting;
// we need our own request because it has to include the shard routing
private PreSyncedFlushRequest request = new PreSyncedFlushRequest();
PreSyncedShardFlushRequest() {
}
PreSyncedShardFlushRequest(ShardRouting shardRouting, PreSyncedFlushRequest request) {
super(shardRouting.shardId(), request);
this.request = request;
this.shardRouting = shardRouting;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
request.readFrom(in);
shardRouting = ImmutableShardRouting.readShardRoutingEntry(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
request.writeTo(out);
shardRouting.writeTo(out);
}
PreSyncedFlushRequest getRequest() {
return request;
}
public ShardRouting shardRouting() {
return shardRouting;
}
}

View File

@ -1,67 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.syncedflush;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationResponse;
import org.elasticsearch.cluster.routing.ImmutableShardRouting;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
*
*/
class PreSyncedShardFlushResponse extends BroadcastShardOperationResponse {
byte[] id;
private ShardRouting shardRouting;
PreSyncedShardFlushResponse() {
}
PreSyncedShardFlushResponse(byte[] id, ShardRouting shardRouting) {
super(shardRouting.shardId());
this.id = id;
this.shardRouting = shardRouting;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
id = in.readByteArray();
shardRouting = ImmutableShardRouting.readShardRoutingEntry(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeByteArray(id);
shardRouting.writeTo(out);
}
byte[] id() {
return id;
}
public ShardRouting shardRouting() {
return shardRouting;
}
}

View File

@ -1,80 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.syncedflush;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportResponse;
import java.io.IOException;
public class SyncedFlushReplicaResponse extends TransportResponse {
boolean succeeded = true;
private String index;
private int shardId;
private String nodeId;
private String reason;
void setResult(boolean succeeded, String index, int shardId, String nodeId, String reason) {
this.succeeded = succeeded;
this.index = index;
this.shardId = shardId;
this.nodeId = nodeId;
this.reason = reason;
}
public String getIndex() {
return index;
}
public int getShardId() {
return shardId;
}
public String getNodeId() {
return nodeId;
}
public String getReason() {
return reason;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
succeeded = in.readBoolean();
this.index = in.readString();
this.shardId = in.readInt();
this.nodeId = in.readString();
this.reason = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(succeeded);
out.writeString(index);
out.writeInt(shardId);
out.writeString(nodeId);
out.writeString(reason);
}
}

View File

@ -1,89 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.syncedflush;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
*/
public class SyncedFlushRequest extends ShardReplicationOperationRequest<SyncedFlushRequest> {
private String syncId;
private Map<String, byte[]> commitIds;
private ShardId shardId;
public SyncedFlushRequest() {
}
public SyncedFlushRequest(ShardId shardId, String syncId, Map<String, byte[]> commitIds) {
this.commitIds = commitIds;
this.shardId = shardId;
this.syncId = syncId;
this.index(shardId.index().getName());
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = ShardId.readShardId(in);
commitIds = new HashMap<>();
int numCommitIds = in.readVInt();
for (int i = 0; i < numCommitIds; i++) {
commitIds.put(in.readString(), in.readByteArray());
}
syncId = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardId.writeTo(out);
out.writeVInt(commitIds.size());
for (Map.Entry<String, byte[]> entry : commitIds.entrySet()) {
out.writeString(entry.getKey());
out.writeByteArray(entry.getValue());
}
out.writeString(syncId);
}
@Override
public String toString() {
return "write sync commit {" + shardId + "}";
}
public ShardId shardId() {
return shardId;
}
public String syncId() {
return syncId;
}
public Map<String, byte[]> commitIds() {
return commitIds;
}
}

View File

@ -1,66 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.syncedflush;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
*/
public class SyncedFlushResponse extends ActionWriteResponse {
private boolean succes;
String syncCommitId;
public SyncedFlushResponse() {
}
public SyncedFlushResponse(boolean success, String syncCommitId) {
this.succes = success;
this.syncCommitId = syncCommitId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.succes = in.readBoolean();
syncCommitId = in.readOptionalString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(succes);
out.writeOptionalString(syncCommitId);
}
public boolean success() {
return succes;
}
public String getSyncId() {
return syncCommitId;
}
}

View File

@ -1,48 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.syncedflush;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import java.util.concurrent.ExecutionException;
public class SyncedFlushService extends AbstractComponent {
private final TransportPreSyncedFlushAction transportPreSyncedFlushAction;
private final TransportSyncedFlushAction transportSyncedFlushAction;
@Inject
public SyncedFlushService(Settings settings, TransportPreSyncedFlushAction transportPreSyncedFlushAction, TransportSyncedFlushAction transportSyncedFlushAction) {
super(settings);
this.transportPreSyncedFlushAction = transportPreSyncedFlushAction;
this.transportSyncedFlushAction = transportSyncedFlushAction;
}
public SyncedFlushResponse attemptSyncedFlush(ShardId shardId) throws ExecutionException, InterruptedException {
PreSyncedFlushResponse preSyncedFlushResponse = transportPreSyncedFlushAction.execute(new PreSyncedFlushRequest(shardId)).get();
// exit if this did not work
String syncId = Strings.base64UUID();
SyncedFlushResponse syncedFlushResponse = transportSyncedFlushAction.execute(new SyncedFlushRequest(shardId, syncId, preSyncedFlushResponse.commitIds())).get();
return syncedFlushResponse;
}
}

View File

@ -1,135 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.syncedflush;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;
import static com.google.common.collect.Lists.newArrayList;
/**
* Sync Commit Action.
*/
public class TransportPreSyncedFlushAction extends TransportBroadcastOperationAction<PreSyncedFlushRequest, PreSyncedFlushResponse, PreSyncedShardFlushRequest, PreSyncedShardFlushResponse> {
private final IndicesService indicesService;
public static final String NAME = "indices:admin/presyncedflush";
@Inject
public TransportPreSyncedFlushAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, IndicesService indicesService, ActionFilters actionFilters) {
super(settings, NAME, threadPool, clusterService, transportService, actionFilters);
this.indicesService = indicesService;
}
@Override
protected String executor() {
return ThreadPool.Names.FLUSH;
}
@Override
protected PreSyncedFlushRequest newRequestInstance() {
return new PreSyncedFlushRequest();
}
@Override
protected PreSyncedFlushResponse newResponse(PreSyncedFlushRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;
int failedShards = 0;
List<ShardOperationFailedException> shardFailures = null;
for (int i = 0; i < shardsResponses.length(); i++) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
// a non active shard, ignore
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
failedShards++;
if (shardFailures == null) {
shardFailures = newArrayList();
}
shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));
} else {
successfulShards++;
}
}
return new PreSyncedFlushResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures, shardsResponses);
}
@Override
protected PreSyncedShardFlushRequest newShardRequest() {
return new PreSyncedShardFlushRequest();
}
@Override
protected PreSyncedShardFlushRequest newShardRequest(int numShards, ShardRouting shard, PreSyncedFlushRequest request) {
return new PreSyncedShardFlushRequest(shard, request);
}
@Override
protected PreSyncedShardFlushResponse newShardResponse() {
return new PreSyncedShardFlushResponse();
}
@Override
protected PreSyncedShardFlushResponse shardOperation(PreSyncedShardFlushRequest request) throws ElasticsearchException {
IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).shardSafe(request.shardId().id());
FlushRequest flushRequest = new FlushRequest().force(false).waitIfOngoing(true);
byte[] id = indexShard.flush(flushRequest);
return new PreSyncedShardFlushResponse(id, request.shardRouting());
}
/**
* The sync commit request works against one primary and all of its copies.
*/
@Override
protected GroupShardsIterator shards(ClusterState clusterState, PreSyncedFlushRequest request, String[] concreteIndices) {
return clusterState.routingTable().allActiveShardCopiesGrouped(request.shardId());
}
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, PreSyncedFlushRequest request) {
return null;
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, PreSyncedFlushRequest countRequest, String[] concreteIndices) {
return null;
}
}

View File

@ -1,136 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.syncedflush;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
/**
*/
public class TransportSyncedFlushAction extends TransportShardReplicationOperationAction<SyncedFlushRequest, SyncedFlushRequest, SyncedFlushResponse, SyncedFlushReplicaResponse> {
public static final String NAME = "indices:admin/syncedflush";
@Inject
public TransportSyncedFlushAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters) {
super(settings, NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters);
}
@Override
protected boolean checkWriteConsistency() {
return true;
}
@Override
protected boolean resolveIndex() {
return false;
}
@Override
protected SyncedFlushRequest newRequestInstance() {
return new SyncedFlushRequest();
}
@Override
protected SyncedFlushRequest newReplicaRequestInstance() {
return new SyncedFlushRequest();
}
@Override
protected SyncedFlushResponse newResponseInstance() {
return new SyncedFlushResponse();
}
@Override
protected SyncedFlushReplicaResponse newReplicaResponseInstance() {
return new SyncedFlushReplicaResponse();
}
@Override
protected String executor() {
return ThreadPool.Names.FLUSH;
}
@Override
protected ShardIterator shards(ClusterState clusterState, InternalRequest request) {
// get all shards for id
return clusterService.state().routingTable().index(request.concreteIndex()).shard(request.request().shardId().id()).shardsIt();
}
@Override
protected Tuple<SyncedFlushResponse, SyncedFlushRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
byte[] commitId = shardRequest.request.commitIds().get(clusterService.localNode().getId());
IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
SyncedFlushResponse syncedFlushResponse = new SyncedFlushResponse(indexShard.syncFlushIfNoPendingChanges(shardRequest.request.syncId(), commitId), shardRequest.request.syncId());
if (syncedFlushResponse.success() == false) {
throw new ElasticsearchIllegalStateException("could not sync commit on primary");
}
return new Tuple<>(syncedFlushResponse, shardRequest.request);
}
@Override
protected SyncedFlushReplicaResponse shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
byte[] commitId = shardRequest.request.commitIds().get(clusterService.localNode().getId());
IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
SyncedFlushReplicaResponse syncedFlushReplicaResponse = new SyncedFlushReplicaResponse();
boolean success = indexShard.syncFlushIfNoPendingChanges(shardRequest.request.syncId(), commitId);
String message = success ? "synced flush succeeded" : "synced flush failed";
syncedFlushReplicaResponse.setResult(success, shardRequest.request.index(), shardRequest.shardId.id(), shardRequest.getNodeId(), message);
return syncedFlushReplicaResponse;
}
protected SyncedFlushResponse onAllReplicasResponded(SyncedFlushResponse finalResponse, CopyOnWriteArrayList<SyncedFlushReplicaResponse> replicaResponses) {
List<ActionWriteResponse.ShardInfo.Failure> additionalFailures = new ArrayList<>();
for (SyncedFlushReplicaResponse replicaResponse : replicaResponses) {
if (replicaResponse.succeeded == false) {
additionalFailures.add(new ActionWriteResponse.ShardInfo.Failure(replicaResponse.getIndex(), replicaResponse.getShardId(), replicaResponse.getNodeId(), replicaResponse.getReason(), RestStatus.CONFLICT, false));
}
}
additionalFailures.addAll(Arrays.asList(finalResponse.getShardInfo().getFailures()));
finalResponse.setShardInfo(new ActionWriteResponse.ShardInfo(finalResponse.getShardInfo().getTotal(), finalResponse.getShardInfo().getTotal() - additionalFailures.size(), additionalFailures.toArray(new ActionWriteResponse.ShardInfo.Failure[additionalFailures.size()])));
return finalResponse;
}
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.cluster.routing;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -30,7 +29,6 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.test.ElasticsearchAllocationTestCase;
import org.junit.Before;
@ -246,20 +244,4 @@ public class RoutingTableTest extends ElasticsearchAllocationTestCase {
fail("Calling with non-existing index should be ignored at the moment");
}
}
@Test
public void testAllActiveShardCopiesGrouped() {
assertThat(this.testRoutingTable.allActiveShardCopiesGrouped(new ShardId(TEST_INDEX_1, randomInt(numberOfShards - 1))).size(), is(0));
try {
this.testRoutingTable.allActiveShardCopiesGrouped(new ShardId(TEST_INDEX_1, numberOfShards)).size();
fail();
} catch (ElasticsearchIllegalStateException e) {
}
initPrimaries();
assertThat(this.testRoutingTable.allActiveShardCopiesGrouped(new ShardId(TEST_INDEX_1, randomInt(numberOfShards - 1))).size(), is(0));
startInitializingShards(TEST_INDEX_1);
assertThat(this.testRoutingTable.allActiveShardCopiesGrouped(new ShardId(TEST_INDEX_1, randomInt(numberOfShards - 1))).size(), is(1));
startInitializingShards(TEST_INDEX_1);
assertThat(this.testRoutingTable.allActiveShardCopiesGrouped(new ShardId(TEST_INDEX_1, randomInt(numberOfShards - 1))).size(), is(numberOfReplicas + 1));
}
}

View File

@ -695,11 +695,14 @@ public class InternalEngineTests extends ElasticsearchTestCase {
assertThat(commitID, equalTo(store.readLastCommittedSegmentsInfo().getId()));
byte[] fakeId = commitID.clone();
fakeId[0] = (byte) ~fakeId[0];
assertFalse("should fail to sync flush with wrong id (but no docs)", engine.syncFlushIfNoPendingChanges(syncId + "1", fakeId));
assertThat("should fail to sync flush with wrong id (but no docs)", engine.syncFlushIfNoPendingChanges(syncId + "1", fakeId),
equalTo(Engine.SyncedFlushResult.FAILED_COMMIT_MISMATCH));
engine.create(new Engine.Create(null, newUid("2"), doc));
assertFalse("should fail to sync flush with right id but pending doc", engine.syncFlushIfNoPendingChanges(syncId + "2", commitID));
assertThat("should fail to sync flush with right id but pending doc", engine.syncFlushIfNoPendingChanges(syncId + "2", commitID),
equalTo(Engine.SyncedFlushResult.FAILED_PENDING_OPERATIONS));
commitID = engine.flush();
assertTrue("should succeed to flush commit with right id and no pending doc", engine.syncFlushIfNoPendingChanges(syncId, commitID));
assertThat("should succeed to flush commit with right id and no pending doc", engine.syncFlushIfNoPendingChanges(syncId, commitID),
equalTo(Engine.SyncedFlushResult.SUCCESS));
assertThat(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID), equalTo(syncId));
assertThat(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), equalTo(syncId));
}

View File

@ -19,16 +19,14 @@
package org.elasticsearch.indices;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.syncedflush.SyncedFlushResponse;
import org.elasticsearch.indices.syncedflush.SyncedFlushService;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.Test;
import java.io.IOException;
@ -79,6 +77,7 @@ public class FlushTest extends ElasticsearchIntegrationTest {
}
}
@TestLogging("indices:TRACE")
public void testSyncedFlush() throws ExecutionException, InterruptedException, IOException {
internalCluster().ensureAtLeastNumDataNodes(2);
prepareCreate("test").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).get();
@ -89,15 +88,16 @@ public class FlushTest extends ElasticsearchIntegrationTest {
assertNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
}
ClusterStateResponse state = client().admin().cluster().prepareState().get();
String nodeId = state.getState().getRoutingTable().index("test").shard(0).getShards().get(0).currentNodeId();
SyncedFlushResponse syncedFlushResponse = internalCluster().getInstance(SyncedFlushService.class).attemptSyncedFlush(new ShardId("test", 0));
assertTrue(syncedFlushResponse.success());
SyncedFlushService.SyncedFlushResult result = internalCluster().getInstance(SyncedFlushService.class).attemptSyncedFlush(new ShardId("test", 0));
assertTrue(result.success());
assertThat(result.totalShards(), equalTo(indexStats.getShards().length));
assertThat(result.successfulShards(), equalTo(indexStats.getShards().length));
indexStats = client().admin().indices().prepareStats("test").get().getIndex("test");
assertThat(indexStats.getShards().length, equalTo(client().admin().indices().prepareGetIndex().get().getSettings().get("test").getAsInt(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, -1) + 1));
String syncId = result.syncId();
for (ShardStats shardStats : indexStats.getShards()) {
assertThat(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID), equalTo(syncedFlushResponse.getSyncId()));
final String shardSyncId = shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID);
assertThat(shardSyncId, equalTo(syncId));
}
}

View File

@ -1,93 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.syncedflush;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.cluster.routing.ImmutableShardRouting;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReferenceArray;
public class SynceFlushStreamablesTests extends ElasticsearchTestCase {
@Test
public void streamWriteSyncResponse() throws InterruptedException, IOException {
ShardId shardId = new ShardId("test", 0);
Map<String, byte[]> commitIds = new HashMap<>();
final String nodeId = "node_id";
commitIds.put(nodeId, generateRandomId(randomInt(100)));
SyncedFlushRequest syncedFlushRequest = new SyncedFlushRequest(shardId, randomAsciiOfLength(5), commitIds);
BytesStreamOutput out = new BytesStreamOutput();
syncedFlushRequest.writeTo(out);
out.close();
StreamInput in = new BytesStreamInput(out.bytes());
SyncedFlushRequest request = new SyncedFlushRequest();
request.readFrom(in);
assertArrayEquals(request.commitIds().get(nodeId), syncedFlushRequest.commitIds().get(nodeId));
}
@Test
public void streamSyncResponse() throws InterruptedException, IOException {
ShardRouting shardRouting = new ImmutableShardRouting("test", 0, "test_node",
"other_test_node", randomBoolean(), ShardRoutingState.STARTED, randomInt());
AtomicReferenceArray atomicReferenceArray = new AtomicReferenceArray(1);
atomicReferenceArray.set(0, new PreSyncedShardFlushResponse(generateRandomId(randomInt(100)), shardRouting));
PreSyncedFlushResponse preSyncedFlushResponse = new PreSyncedFlushResponse(randomInt(), randomInt(), randomInt(), new ArrayList<ShardOperationFailedException>(), atomicReferenceArray);
BytesStreamOutput out = new BytesStreamOutput();
preSyncedFlushResponse.writeTo(out);
out.close();
StreamInput in = new BytesStreamInput(out.bytes());
PreSyncedFlushResponse request = new PreSyncedFlushResponse();
request.readFrom(in);
assertArrayEquals(request.commitIds().get(shardRouting), preSyncedFlushResponse.commitIds().get(shardRouting));
}
@Test
public void streamShardSyncResponse() throws InterruptedException, IOException {
ShardRouting shardRouting = new ImmutableShardRouting("test", 0, "test_node",
"other_test_node", randomBoolean(), ShardRoutingState.STARTED, randomInt());
PreSyncedShardFlushResponse preSyncedShardFlushResponse = new PreSyncedShardFlushResponse(generateRandomId(randomInt(100)), shardRouting);
BytesStreamOutput out = new BytesStreamOutput();
preSyncedShardFlushResponse.writeTo(out);
out.close();
StreamInput in = new BytesStreamInput(out.bytes());
PreSyncedShardFlushResponse request = new PreSyncedShardFlushResponse();
request.readFrom(in);
assertArrayEquals(request.id(), preSyncedShardFlushResponse.id());
}
byte[] generateRandomId(int length) {
byte[] id = new byte[length];
for (int i = 0; i < length; i++) {
id[i] = randomByte();
}
return id;
}
}