better open reference reporting

This commit is contained in:
Boaz Leskes 2016-02-13 14:42:53 +01:00
parent 63ada9882e
commit 25fae037ac
3 changed files with 54 additions and 12 deletions

View File

@ -80,6 +80,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -402,7 +403,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
protected void doRun() throws Exception {
setPhase(task, "replica");
assert request.shardId() != null : "request shardId must be set";
try (Releasable ignored = getIndexShardReferenceOnReplica(request.shardId())) {
try (Releasable ignored = getIndexShardReferenceOnReplica(request.shardId(), request)) {
shardOperationOnReplica(request);
if (logger.isTraceEnabled()) {
logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction, request.shardId(), request);
@ -675,7 +676,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
return;
}
// closed in finishAsFailed(e) in the case of error
indexShardReference = getIndexShardReferenceOnPrimary(shardId);
indexShardReference = getIndexShardReferenceOnPrimary(shardId, request);
if (indexShardReference.isRelocated() == false) {
// execute locally
Tuple<Response, ReplicaRequest> primaryResponse = shardOperationOnPrimary(state.metaData(), request);
@ -780,24 +781,64 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
}
}
static ConcurrentMap<IndexShardReference, String> openShardReferences;
static boolean setupShardReferenceAssertions() {
openShardReferences = new ConcurrentHashMap<>();
return true;
}
static boolean addShardReference(IndexShardReference ref, String desc) {
String prev = openShardReferences.put(ref, desc);
if (prev != null) {
throw new AssertionError("shard ref " + ref + " is added twice. current [" + desc + "] prev [" + prev + "]");
}
return true;
}
static boolean removeShardReference(IndexShardReference ref) {
assert openShardReferences.remove(ref) != null : "failed to find ref [" + ref + "]";
return true;
}
static {
assert setupShardReferenceAssertions();
}
static public void assertAllShardReferencesAreCleaned() {
if (openShardReferences == null || openShardReferences.isEmpty()) {
return;
}
StringBuilder sb = new StringBuilder();
for (String desc : openShardReferences.values()) {
sb.append(desc).append("\n");
}
assert sb.length() == 0 : "Found unclosed shard references:\n" + sb;
}
/**
* returns a new reference to {@link IndexShard} to perform a primary operation. Released after performing primary operation locally
* and replication of the operation to all replica shards is completed / failed (see {@link ReplicationPhase}).
*/
protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) {
protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId, Request request) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
return new IndexShardReferenceImpl(indexShard, true);
IndexShardReference ref = new IndexShardReferenceImpl(indexShard, true);
assert addShardReference(ref, "primary: " + request.toString());
return ref;
}
/**
* returns a new reference to {@link IndexShard} on a node that the request is replicated to. The reference is closed as soon as
* replication is completed on the node.
*/
protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId) {
protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId, ReplicaRequest request) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
return new IndexShardReferenceImpl(indexShard, false);
IndexShardReference ref = new IndexShardReferenceImpl(indexShard, false);
assert addShardReference(ref, "replica: " + request.toString());
return ref;
}
/**
@ -995,7 +1036,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
indexShardReference.failShard(message, shardFailedError);
forceFinishAsFailed(new RetryOnPrimaryException(shardId, message, shardFailedError));
} else {
assert shardFailedError.getMessage().contains("TransportService is closed ") :
assert shardFailedError.getMessage().contains("TransportService is closed") :
shardFailedError;
onReplicaFailure(nodeId, exp);
}
@ -1105,6 +1146,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
@Override
public void close() {
operationLock.close();
assert removeShardReference(this);
}
@Override

View File

@ -18,8 +18,6 @@
*/
package org.elasticsearch.action.support.replication;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ReplicationResponse;
@ -1098,11 +1096,11 @@ public class TransportReplicationActionTests extends ESTestCase {
}
@Override
protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) {
protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId, Request request) {
return getOrCreateIndexShardOperationsCounter();
}
protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId) {
protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId, Request request) {
return getOrCreateIndexShardOperationsCounter();
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
@ -141,7 +142,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
@Override
protected void beforeIndexDeletion() {
try {
// some test may leave opeations in flight. Wait for them to be finnished
// some test may leave operations in flight. Wait for them to be finished
assertBusy(() -> TransportReplicationAction.assertAllShardReferencesAreCleaned());
assertBusy(() -> super.beforeIndexDeletion());
} catch (Exception e) {
throw new AssertionError(e);