mirror of
synced 2025-03-09 14:34:43 +00:00
Improve trace logging in TransportReplicationAction and error reporting at RecoveryWhileUnderLoadIT
Things that helped me track down an issue. Closes #14931
This commit is contained in:
@ -275,7 +275,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
public void onFailure(Throwable t) {
if (t instanceof RetryOnReplicaException) {
logger.trace("Retrying operation on replica", t);
logger.trace("Retrying operation on replica, action [{}], request [{}]", t, actionName, request);
observer.waitForNextChange(new ClusterStateObserver.Listener() {
public void onNewClusterState(ClusterState state) {
@ -294,7 +294,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
} else {
try {
failReplicaIfNeeded(request.internalShardId.getIndex(), request.internalShardId.id(), t);
failReplicaIfNeeded(request.internalShardId.getIndex(), request.internalShardId.id(), t, request);
} catch (Throwable unexpected) {
logger.error("{} unexpected error while failing replica", request.internalShardId.id(), unexpected);
} finally {
@ -378,7 +378,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
if (primary.active() == false) {
logger.trace("primary shard [{}] is not yet active, scheduling a retry.", primary.shardId());
logger.trace("primary shard [{}] is not yet active, scheduling a retry. action [{}], request [{}]", primary.shardId(), actionName, internalRequest.request);
retryBecauseUnavailable(shardIt.shardId(), "Primary shard is not active or isn't assigned to a known node.");
@ -540,7 +540,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
void finishAsFailed(Throwable failure) {
if (finished.compareAndSet(false, true)) {
logger.trace("operation failed", failure);
logger.trace("operation failed. action [{}], request [{}]", failure, actionName, internalRequest.request);
} else {
assert false : "finishAsFailed called but operation is already finished";
@ -548,7 +548,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
void finishWithUnexpectedFailure(Throwable failure) {
logger.warn("unexpected error during the primary phase for action [{}]", failure, actionName);
logger.warn("unexpected error during the primary phase for action [{}], request [{}]", failure, actionName, internalRequest.request);
if (finished.compareAndSet(false, true)) {
@ -559,7 +559,9 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
void finishOnRemoteSuccess(Response response) {
if (finished.compareAndSet(false, true)) {
logger.trace("operation succeeded");
if (logger.isTraceEnabled()) {
logger.trace("operation succeeded. action [{}],request [{}]", actionName, internalRequest.request);
} else {
assert false : "finishOnRemoteSuccess called but operation is already finished";
@ -580,12 +582,14 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
indexShardReference = getIndexShardOperationsCounter(primary.shardId());
PrimaryOperationRequest por = new PrimaryOperationRequest(primary.id(), internalRequest.concreteIndex(), internalRequest.request());
Tuple<Response, ReplicaRequest> primaryResponse = shardOperationOnPrimary(observer.observedState(), por);
logger.trace("operation completed on primary [{}]", primary);
if (logger.isTraceEnabled()) {
logger.trace("operation completed on primary [{}], action [{}], request [{}], cluster state version [{}]", primary, actionName, por.request, observer.observedState().version());
replicationPhase = new ReplicationPhase(shardsIt, primaryResponse.v2(), primaryResponse.v1(), observer, primary, internalRequest, listener, indexShardReference, shardFailedTimeout);
} catch (Throwable e) {
// shard has not been allocated yet, retry it here
if (retryPrimaryException(e)) {
logger.trace("had an error while performing operation on primary ({}), scheduling a retry.", e.getMessage());
logger.trace("had an error while performing operation on primary ({}, action [{}], request [{}]), scheduling a retry.", e, primary, actionName, internalRequest.request);
// We have to close here because when we retry we will get a new reference on the index shard again and we do not want to
// increment twice.
@ -650,8 +654,8 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
if (sizeActive < requiredNumber) {
logger.trace("not enough active copies of shard [{}] to meet write consistency of [{}] (have {}, needed {}), scheduling a retry.",
shard.shardId(), consistencyLevel, sizeActive, requiredNumber);
logger.trace("not enough active copies of shard [{}] to meet write consistency of [{}] (have {}, needed {}), scheduling a retry. action [{}], request [{}]",
shard.shardId(), consistencyLevel, sizeActive, requiredNumber, actionName, internalRequest.request);
return "Not enough active copies to meet write consistency of [" + consistencyLevel + "] (have " + sizeActive + ", needed " + requiredNumber + ").";
} else {
return null;
@ -670,8 +674,8 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
return new IndexShardReference(indexShard);
private void failReplicaIfNeeded(String index, int shardId, Throwable t) {
logger.trace("failure on replica [{}][{}]", t, index, shardId);
private void failReplicaIfNeeded(String index, int shardId, Throwable t, ReplicaRequest request) {
logger.trace("failure on replica [{}][{}], action [{}], request [{}]", t, index, shardId, actionName, request);
if (ignoreReplicaException(t) == false) {
IndexService indexService = indicesService.indexService(index);
if (indexService == null) {
@ -833,6 +837,10 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
protected void doRun() {
if (logger.isTraceEnabled()) {
logger.trace("replication phase started. pending [{}], action [{}], request [{}], cluster state version used [{}]", pending.get(),
actionName, replicaRequest, observer.observedState().version());
if (pending.get() == 0) {
@ -889,7 +897,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
public void handleException(TransportException exp) {
logger.trace("[{}] transport failure during replica request [{}] ", exp, node, replicaRequest);
logger.trace("[{}] transport failure during replica request [{}], action [{}]", exp, node, replicaRequest, actionName);
if (ignoreReplicaException(exp)) {
onReplicaFailure(nodeId, exp);
} else {
@ -908,7 +916,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
} catch (Throwable e) {
onReplicaFailure(nodeId, e);
failReplicaIfNeeded(shard.index(), shard.id(), e);
failReplicaIfNeeded(shard.index(), shard.id(), e, replicaRequest);
@ -924,7 +932,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
} catch (Throwable e) {
failReplicaIfNeeded(shard.index(), shard.id(), e);
failReplicaIfNeeded(shard.index(), shard.id(), e, replicaRequest);
onReplicaFailure(nodeId, e);
@ -23,13 +23,17 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.math.MathUtils;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ESIntegTestCase;
@ -40,10 +44,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllSuccessful;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
private final ESLogger logger = Loggers.getLogger(RecoveryWhileUnderLoadIT.class);
@ -228,7 +229,7 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
int allowNodes = 2;
assertAcked(prepareCreate("test", 3, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numShards).put(SETTING_NUMBER_OF_REPLICAS, numReplicas).put(TranslogConfig.INDEX_TRANSLOG_DURABILITY, Translog.Durabilty.ASYNC)));
final int numDocs = scaledRandomIntBetween(200, 20000);
final int numDocs = scaledRandomIntBetween(200, 9999);
try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client(), numDocs)) {
@ -262,12 +263,14 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
private void iterateAssertCount(final int numberOfShards, final long numberOfDocs, final int iterations) throws Exception {
SearchResponse[] iterationResults = new SearchResponse[iterations];
boolean error = false;
SearchResponse lastErroneousResponse = null;
for (int i = 0; i < iterations; i++) {
SearchResponse searchResponse = client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get();
SearchResponse searchResponse = client().prepareSearch().setSize((int) numberOfDocs).setQuery(matchAllQuery()).addSort("id", SortOrder.ASC).get();
logSearchResponse(numberOfShards, numberOfDocs, i, searchResponse);
iterationResults[i] = searchResponse;
if (searchResponse.getHits().totalHits() != numberOfDocs) {
error = true;
lastErroneousResponse = searchResponse;
@ -279,6 +282,15 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
logger.info("shard [{}] - count {}, primary {}", shardStats.getShardRouting().id(), docsStats.getCount(), shardStats.getShardRouting().primary());
for (int doc = 1, hit = 0; hit < lastErroneousResponse.getHits().getHits().length; hit++, doc++) {
SearchHit searchHit = lastErroneousResponse.getHits().getAt(hit);
while (doc < Integer.parseInt(searchHit.id())) {
logger.info("missing doc [{}], indexed to shard [{}]", doc, MathUtils.mod(Murmur3HashFunction.hash(Integer.toString(doc)), numberOfShards));
//if there was an error we try to wait and see if at some point it'll get fixed
logger.info("--> trying to wait");
assertTrue(awaitBusy(() -> {
@ -195,6 +195,7 @@ public class BackgroundIndexer implements AutoCloseable {
XContentBuilder builder = XContentFactory.smileBuilder();
builder.startObject().field("test", "value" + id)
.field("text", text.toString())
.field("id", id)
return builder;
Reference in New Issue
Block a user