Add trace logging when aquiring and releasing operation locks for replication requests

This commit is contained in:
Areek Zillur 2016-11-10 15:13:42 -05:00
parent 0a06a0c2b3
commit 4e996ca9f5
4 changed files with 36 additions and 7 deletions

View File

@ -112,13 +112,12 @@ public class ReplicationOperation<
totalShards.incrementAndGet();
pendingActions.incrementAndGet();
primaryResult = primary.perform(request);
if (logger.isTraceEnabled()) {
logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request);
}
final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
if (replicaRequest != null) {
assert replicaRequest.primaryTerm() > 0 : "replicaRequest doesn't have a primary term";
if (logger.isTraceEnabled()) {
logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request);
}
// we have to get a new state after successfully indexing into the primary in order to honour recovery semantics.
// we have to make sure that every operation indexed into the primary after recovery start will also be replicated
// to the recovery target. If we use an old cluster state, we may miss a relocation that has started since then.

View File

@ -366,6 +366,14 @@ public abstract class TransportReplicationAction<
executeOnReplicas, replicasProxy, clusterService::state, logger, actionName
);
}
@Override // used for trace logging when acquiring or releasing operation lock
public String toString() {
return "AsyncPrimaryAction" +
"[request=" + request + "]"
+ "[targetAllocationId=" + targetAllocationID + "]"
+ "[taskId=" + replicationTask.getId() + "]";
}
}
protected class PrimaryResult implements ReplicationOperation.PrimaryResult<ReplicaRequest> {
@ -560,6 +568,14 @@ public abstract class TransportReplicationAction<
replica.acquireReplicaOperationLock(request.primaryTerm, this, executor);
}
@Override // used for trace logging when acquiring or releasing operation lock
public String toString() {
return "AsyncReplicaAction"
+ "[request=" + request + "]"
+ "[targetAllocationId=" + targetAllocationID + "]"
+ "[taskId=" + task.getId() + "]";
}
/**
* Listens for the response on the replica and sends the response back to the primary.
*/
@ -881,6 +897,11 @@ public abstract class TransportReplicationAction<
public void onFailure(Exception e) {
onReferenceAcquired.onFailure(e);
}
@Override
public String toString() {
return onReferenceAcquired.toString();
}
};
indexShard.acquirePrimaryOperationLock(onAcquired, executor);

View File

@ -53,6 +53,9 @@ public class IndexShardOperationsLock implements Closeable {
@Override
public void close() {
closed = true;
if (logger.isTraceEnabled()) {
logger.trace("operation lock on [{}] closed", shardId);
}
}
/**
@ -120,7 +123,7 @@ public class IndexShardOperationsLock implements Closeable {
Releasable releasable;
try {
synchronized (this) {
releasable = tryAcquire();
releasable = tryAcquire(onAcquired.toString());
if (releasable == null) {
// blockOperations is executing, this operation will be retried by blockOperations once it finishes
if (delayedOperations == null) {
@ -142,12 +145,18 @@ public class IndexShardOperationsLock implements Closeable {
onAcquired.onResponse(releasable);
}
@Nullable private Releasable tryAcquire() throws InterruptedException {
@Nullable private Releasable tryAcquire(String resource) throws InterruptedException {
if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the untimed tryAcquire methods do not honor the fairness setting
AtomicBoolean closed = new AtomicBoolean();
if (logger.isTraceEnabled()) {
logger.trace("acquired operation lock on [{}] for resource [{}]", shardId, resource);
}
return () -> {
if (closed.compareAndSet(false, true)) {
semaphore.release(1);
if (logger.isTraceEnabled()) {
logger.trace("released operation lock on [{}] for resource [{}]", shardId, resource);
}
}
};
}

View File

@ -485,7 +485,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
* This test is a superset of tests run in the Jepsen test suite, with the exception of versioned updates
*/
@TestLogging("_root:DEBUG,org.elasticsearch.action.index:TRACE,org.elasticsearch.action.get:TRACE,discovery:TRACE,org.elasticsearch.cluster.service:TRACE,"
+ "org.elasticsearch.indices.recovery:TRACE,org.elasticsearch.indices.cluster:TRACE")
+ "org.elasticsearch.indices.recovery:TRACE,org.elasticsearch.indices.cluster:TRACE,org.elasticsearch.index.shard:TRACE")
public void testAckedIndexing() throws Exception {
final int seconds = !(TEST_NIGHTLY && rarely()) ? 1 : 5;