From 25fae037ac5cb7515ebdffe7a1a35651bc6c7c9e Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sat, 13 Feb 2016 14:42:53 +0100 Subject: [PATCH] better open reference reporting --- .../TransportReplicationAction.java | 56 ++++++++++++++++--- .../TransportReplicationActionTests.java | 6 +- .../DiscoveryWithServiceDisruptionsIT.java | 4 +- 3 files changed, 54 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 115c5636c06..e75be6ebce2 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -80,6 +80,7 @@ import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -402,7 +403,7 @@ public abstract class TransportReplicationAction primaryResponse = shardOperationOnPrimary(state.metaData(), request); @@ -780,24 +781,64 @@ public abstract class TransportReplicationAction openShardReferences; + + static boolean setupShardReferenceAssertions() { + openShardReferences = new ConcurrentHashMap<>(); + return true; + } + + static boolean addShardReference(IndexShardReference ref, String desc) { + String prev = openShardReferences.put(ref, desc); + if (prev != null) { + throw new AssertionError("shard ref " + ref + " is added twice. current [" + desc + "] prev [" + prev + "]"); + } + return true; + } + + static boolean removeShardReference(IndexShardReference ref) { + assert openShardReferences.remove(ref) != null : "failed to find ref [" + ref + "]"; + return true; + } + + static { + assert setupShardReferenceAssertions(); + } + + static public void assertAllShardReferencesAreCleaned() { + if (openShardReferences == null || openShardReferences.isEmpty()) { + return; + } + StringBuilder sb = new StringBuilder(); + for (String desc : openShardReferences.values()) { + sb.append(desc).append("\n"); + } + assert sb.length() == 0 : "Found unclosed shard references:\n" + sb; + } + /** * returns a new reference to {@link IndexShard} to perform a primary operation. Released after performing primary operation locally * and replication of the operation to all replica shards is completed / failed (see {@link ReplicationPhase}). */ - protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) { + protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId, Request request) { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); IndexShard indexShard = indexService.getShard(shardId.id()); - return new IndexShardReferenceImpl(indexShard, true); + IndexShardReference ref = new IndexShardReferenceImpl(indexShard, true); + assert addShardReference(ref, "primary: " + request.toString()); + return ref; } /** * returns a new reference to {@link IndexShard} on a node that the request is replicated to. The reference is closed as soon as * replication is completed on the node. */ - protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId) { + protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId, ReplicaRequest request) { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); IndexShard indexShard = indexService.getShard(shardId.id()); - return new IndexShardReferenceImpl(indexShard, false); + IndexShardReference ref = new IndexShardReferenceImpl(indexShard, false); + assert addShardReference(ref, "replica: " + request.toString()); + return ref; } /** @@ -995,7 +1036,7 @@ public abstract class TransportReplicationAction TransportReplicationAction.assertAllShardReferencesAreCleaned()); assertBusy(() -> super.beforeIndexDeletion()); } catch (Exception e) { throw new AssertionError(e);