diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java index 570307a717d..7911b4e1e36 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java @@ -54,18 +54,16 @@ public class TransportShardFlushAction extends TransportReplicationAction onPrimaryShard(BulkShardRequest request, IndexShard indexShard) throws Exception { - ShardId shardId = request.shardId(); - final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); - final IndexMetaData metaData = indexService.getIndexSettings().getIndexMetaData(); + protected WriteResult onPrimaryShard(BulkShardRequest request, IndexShard primary) throws Exception { + final IndexMetaData metaData = primary.indexSettings().getIndexMetaData(); long[] preVersions = new long[request.items().length]; VersionType[] preVersionTypes = new VersionType[request.items().length]; Translog.Location location = null; for (int requestIndex = 0; requestIndex < request.items().length; requestIndex++) { BulkItemRequest item = request.items()[requestIndex]; - location = handleItem(metaData, request, indexShard, preVersions, preVersionTypes, location, requestIndex, item); + location = handleItem(metaData, request, primary, preVersions, preVersionTypes, location, requestIndex, item); } BulkItemResponse[] responses = new BulkItemResponse[request.items().length]; diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 9587b4e6b2c..740a003ffa8 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -93,16 +93,16 @@ public abstract class TransportReplicationAction< Response extends ReplicationResponse > extends TransportAction { - protected final TransportService transportService; + private final TransportService transportService; protected final ClusterService clusterService; - protected final IndicesService indicesService; + private final IndicesService indicesService; private final ShardStateAction shardStateAction; private final TransportRequestOptions transportOptions; private final String executor; // package private for testing - final String transportReplicaAction; - final String transportPrimaryAction; + private final String transportReplicaAction; + private final String transportPrimaryAction; private final ReplicasProxy replicasProxy; protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService, @@ -167,14 +167,18 @@ public abstract class TransportReplicationAction< * Primary operation on node with primary copy. * * @param shardRequest the request to the primary shard + * @param primary the primary shard to perform the operation on */ - protected abstract PrimaryResult shardOperationOnPrimary(Request shardRequest) throws Exception; + protected abstract PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard primary) throws Exception; /** * Synchronous replica operation on nodes with replica copies. This is done under the lock form - * {@link #acquireReplicaOperationLock(ShardId, long, String, ActionListener)}. + * {@link IndexShard#acquireReplicaOperationLock(long, ActionListener, String)} + * + * @param shardRequest the request to the replica shard + * @param replica the replica shard to perform the operation on */ - protected abstract ReplicaResult shardOperationOnReplica(ReplicaRequest shardRequest); + protected abstract ReplicaResult shardOperationOnReplica(ReplicaRequest shardRequest, IndexShard replica); /** * Cluster level block to check before request execution @@ -436,6 +440,7 @@ public abstract class TransportReplicationAction< // allocation id of the replica this request is meant for private final String targetAllocationID; private final TransportChannel channel; + private final IndexShard replica; /** * The task on the node with the replica shard. */ @@ -449,12 +454,15 @@ public abstract class TransportReplicationAction< this.channel = channel; this.task = task; this.targetAllocationID = targetAllocationID; + final ShardId shardId = request.shardId(); + assert shardId != null : "request shardId must be set"; + this.replica = getIndexShard(shardId); } @Override public void onResponse(Releasable releasable) { try { - ReplicaResult replicaResult = shardOperationOnReplica(request); + ReplicaResult replicaResult = shardOperationOnReplica(request, replica); releasable.close(); // release shard operation lock before responding to caller replicaResult.respond(new ResponseListener()); } catch (Exception e) { @@ -521,8 +529,12 @@ public abstract class TransportReplicationAction< @Override protected void doRun() throws Exception { setPhase(task, "replica"); - assert request.shardId() != null : "request shardId must be set"; - acquireReplicaOperationLock(request.shardId(), request.primaryTerm(), targetAllocationID, this); + final String actualAllocationId = this.replica.routingEntry().allocationId().getId(); + if (actualAllocationId.equals(targetAllocationID) == false) { + throw new ShardNotFoundException(this.replica.shardId(), "expected aID [{}] but found [{}]", targetAllocationID, + actualAllocationId); + } + replica.acquireReplicaOperationLock(request.primaryTerm, this, executor); } /** @@ -550,6 +562,11 @@ public abstract class TransportReplicationAction< } } + private IndexShard getIndexShard(ShardId shardId) { + IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + return indexService.getShard(shardId.id()); + } + /** * Responsible for routing and retrying failed operations on the primary. * The actual primary operation is done in {@link ReplicationOperation} on the @@ -816,10 +833,9 @@ public abstract class TransportReplicationAction< * tries to acquire reference to {@link IndexShard} to perform a primary operation. Released after performing primary operation locally * and replication of the operation to all replica shards is completed / failed (see {@link ReplicationOperation}). */ - protected void acquirePrimaryShardReference(ShardId shardId, String allocationId, - ActionListener onReferenceAcquired) { - IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); - IndexShard indexShard = indexService.getShard(shardId.id()); + private void acquirePrimaryShardReference(ShardId shardId, String allocationId, + ActionListener onReferenceAcquired) { + IndexShard indexShard = getIndexShard(shardId); // we may end up here if the cluster state used to route the primary is so stale that the underlying // index shard was replaced with a replica. For example - in a two node cluster, if the primary fails // the replica will take over and a replica will be assigned to the first node. @@ -847,20 +863,6 @@ public abstract class TransportReplicationAction< indexShard.acquirePrimaryOperationLock(onAcquired, executor); } - /** - * tries to acquire an operation on replicas. The lock is closed as soon as replication is completed on the node. - */ - protected void acquireReplicaOperationLock(ShardId shardId, long primaryTerm, final String allocationId, - ActionListener onLockAcquired) { - IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); - IndexShard indexShard = indexService.getShard(shardId.id()); - final String actualAllocationId = indexShard.routingEntry().allocationId().getId(); - if (actualAllocationId.equals(allocationId) == false) { - throw new ShardNotFoundException(shardId, "expected aID [{}] but found [{}]", allocationId, actualAllocationId); - } - indexShard.acquireReplicaOperationLock(primaryTerm, onLockAcquired, executor); - } - /** * Indicated whether this operation should be replicated to shadow replicas or not. If this method returns true the replication phase * will be skipped. For example writes such as index and delete don't need to be replicated on shadow replicas but refresh and flush do. @@ -899,7 +901,7 @@ public abstract class TransportReplicationAction< @Override public PrimaryResult perform(Request request) throws Exception { - PrimaryResult result = shardOperationOnPrimary(request); + PrimaryResult result = shardOperationOnPrimary(request, indexShard); result.replicaRequest().primaryTerm(indexShard.getPrimaryTerm()); return result; } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index bf2b3235b11..05695b246ef 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -29,9 +29,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog.Location; import org.elasticsearch.indices.IndicesService; @@ -73,26 +71,15 @@ public abstract class TransportWriteAction< protected abstract Translog.Location onReplicaShard(Request request, IndexShard indexShard); @Override - protected final WritePrimaryResult shardOperationOnPrimary(Request request) throws Exception { - IndexShard indexShard = indexShard(request); - WriteResult result = onPrimaryShard(request, indexShard); - return new WritePrimaryResult(request, result.getResponse(), result.getLocation(), indexShard); + protected final WritePrimaryResult shardOperationOnPrimary(Request request, IndexShard primary) throws Exception { + WriteResult result = onPrimaryShard(request, primary); + return new WritePrimaryResult(request, result.getResponse(), result.getLocation(), primary); } @Override - protected final WriteReplicaResult shardOperationOnReplica(Request request) { - IndexShard indexShard = indexShard(request); - Translog.Location location = onReplicaShard(request, indexShard); - return new WriteReplicaResult(indexShard, request, location); - } - - /** - * Fetch the IndexShard for the request. Protected so it can be mocked in tests. - */ - protected IndexShard indexShard(Request request) { - final ShardId shardId = request.shardId(); - IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); - return indexService.getShard(shardId.id()); + protected final WriteReplicaResult shardOperationOnReplica(Request request, IndexShard replica) { + Translog.Location location = onReplicaShard(request, replica); + return new WriteReplicaResult(replica, request, location); } /** diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/TopHitsAggregatorFactory.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/TopHitsAggregatorFactory.java index 8e1c0c51095..783c3baa7a4 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/TopHitsAggregatorFactory.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/TopHitsAggregatorFactory.java @@ -94,11 +94,9 @@ public class TopHitsAggregatorFactory extends AggregatorFactory void noRefreshCall(ThrowingBiFunction action, + private void noRefreshCall(ThrowingTriFunction action, BiConsumer> responder) throws Exception { TestRequest request = new TestRequest(); request.setRefreshPolicy(RefreshPolicy.NONE); // The default, but we'll set it anyway just to be explicit - Result result = action.apply(new TestAction(), request); + Result result = action.apply(new TestAction(), request, indexShard); CapturingActionListener listener = new CapturingActionListener<>(); responder.accept(result, listener); assertNotNull(listener.response); @@ -83,12 +83,12 @@ public class TransportWriteActionTests extends ESTestCase { immediateRefresh(TestAction::shardOperationOnReplica, TestAction.WriteReplicaResult::respond, r -> {}); } - private void immediateRefresh(ThrowingBiFunction action, + private void immediateRefresh(ThrowingTriFunction action, BiConsumer> responder, Consumer responseChecker) throws Exception { TestRequest request = new TestRequest(); request.setRefreshPolicy(RefreshPolicy.IMMEDIATE); - Result result = action.apply(new TestAction(), request); + Result result = action.apply(new TestAction(), request, indexShard); CapturingActionListener listener = new CapturingActionListener<>(); responder.accept(result, listener); assertNotNull(listener.response); @@ -106,12 +106,12 @@ public class TransportWriteActionTests extends ESTestCase { waitForRefresh(TestAction::shardOperationOnReplica, TestAction.WriteReplicaResult::respond, (r, forcedRefresh) -> {}); } - private void waitForRefresh(ThrowingBiFunction action, + private void waitForRefresh(ThrowingTriFunction action, BiConsumer> responder, BiConsumer resultChecker) throws Exception { TestRequest request = new TestRequest(); request.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL); - Result result = action.apply(new TestAction(), request); + Result result = action.apply(new TestAction(), request, indexShard); CapturingActionListener listener = new CapturingActionListener<>(); responder.accept(result, listener); assertNull(listener.response); // Haven't reallresponded yet @@ -135,11 +135,6 @@ public class TransportWriteActionTests extends ESTestCase { new IndexNameExpressionResolver(Settings.EMPTY), TestRequest::new, ThreadPool.Names.SAME); } - @Override - protected IndexShard indexShard(TestRequest request) { - return indexShard; - } - @Override protected WriteResult onPrimaryShard(TestRequest request, IndexShard indexShard) throws Exception { return new WriteResult<>(new TestResponse(), location); @@ -185,7 +180,7 @@ public class TransportWriteActionTests extends ESTestCase { } } - private interface ThrowingBiFunction { - R apply(A a, B b) throws Exception; + private interface ThrowingTriFunction { + R apply(A a, B b, C c) throws Exception; } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/LongGCDisruption.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/LongGCDisruption.java index 944ddb9b05f..6985d2dcf17 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/LongGCDisruption.java +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/LongGCDisruption.java @@ -39,7 +39,9 @@ public class LongGCDisruption extends SingleNodeDisruption { private static final Pattern[] unsafeClasses = new Pattern[]{ // logging has shared JVM locks - we may suspend a thread and block other nodes from doing their thing - Pattern.compile("logging\\.log4j") + Pattern.compile("logging\\.log4j"), + // security manager is shared across all nodes AND it uses synced hashmaps interanlly + Pattern.compile("java\\.lang\\.SecurityManager") }; protected final String disruptedNode;