From 7101ecea94757463a66b38d1cbb006a1beff37f1 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sun, 14 Feb 2016 09:07:02 -0500 Subject: [PATCH] Inline ReplicationPhase# Relates #16725 --- .../TransportReplicationAction.java | 95 ++++++++++--------- 1 file changed, 49 insertions(+), 46 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 894b19fedb7..7fc18266816 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -83,6 +83,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import java.util.function.Supplier; /** @@ -874,23 +875,48 @@ public abstract class TransportReplicationAction shards = shards(shardRoutingTable); + boolean executeOnReplica = (indexMetaData == null) || shouldExecuteReplication(indexMetaData.getSettings()); + DiscoveryNodes nodes = state.getNodes(); if (shards.isEmpty()) { logger.debug("replication phase for request [{}] on [{}] is skipped due to index deletion after primary operation", replicaRequest, shardId); } // we calculate number of target nodes to send replication operations, including nodes with relocating shards + AtomicInteger numberOfPendingShardInstances = new AtomicInteger(); + this.totalShards = countTotalAndPending(shards, executeOnReplica, nodes, numberOfPendingShardInstances); + this.pending = numberOfPendingShardInstances; + this.shards = shards; + this.executeOnReplica = executeOnReplica; + this.nodes = nodes; + if (logger.isTraceEnabled()) { + logger.trace("replication phase started. pending [{}], action [{}], request [{}], cluster state version used [{}]", pending.get(), + transportReplicaAction, replicaRequest, state.version()); + } + } + + private int countTotalAndPending(List shards, boolean executeOnReplica, DiscoveryNodes nodes, AtomicInteger pending) { + assert pending.get() == 0; + int numberOfIgnoredShardInstances = performOnShards(shards, executeOnReplica, nodes, shard -> pending.incrementAndGet(), shard -> pending.incrementAndGet()); + // one for the local primary copy + return 1 + numberOfIgnoredShardInstances + pending.get(); + } + + private int performOnShards(List shards, boolean executeOnReplica, DiscoveryNodes nodes, Consumer onLocalShard, Consumer onRelocatingShard) { int numberOfIgnoredShardInstances = 0; - int numberOfPendingShardInstances = 0; for (ShardRouting shard : shards) { - // the following logic to select the shards to replicate to is mirrored and explained in the doRun method below if (shard.primary() == false && executeOnReplica == false) { + // If the replicas use shadow replicas, there is no reason to + // perform the action on the replica, so skip it and + // immediately return + + // this delays mapping updates on replicas because they have + // to wait until they get the new mapping through the cluster + // state, which is why we recommend pre-defined mappings for + // indices using shadow replicas numberOfIgnoredShardInstances++; continue; } @@ -898,20 +924,26 @@ public abstract class TransportReplicationAction shards(IndexShardRoutingTable shardRoutingTable) { + return (shardRoutingTable != null) ? shardRoutingTable.shards() : Collections.emptyList(); } /** @@ -951,36 +983,7 @@ public abstract class TransportReplicationAction performOnReplica(shard), shard -> performOnReplica(shard.buildTargetRelocatingShard())); } /**