Enable trace log in FollowerFailOverIT ()

This suite still fails one per week sometimes with a worrying assertion.
Sadly we are still unable to find the actual source.

Expected: <SeqNoStats{maxSeqNo=229, localCheckpoint=86, globalCheckpoint=86}>
but: was   <SeqNoStats{maxSeqNo=229, localCheckpoint=-1, globalCheckpoint=86}>

This change enables trace log in the suite so we will have a better
picture if this fails again.

Relates 
This commit is contained in:
Nhat Nguyen 2019-02-01 15:44:39 -05:00 committed by GitHub
parent 03a1d21070
commit 3ecdfe1060
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 24 additions and 7 deletions
x-pack/plugin/ccr/src
main/java/org/elasticsearch/xpack/ccr
test/java/org/elasticsearch/xpack

@ -63,6 +63,9 @@ public class TransportBulkShardOperationsAction
@Override
protected WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> shardOperationOnPrimary(
final BulkShardOperationsRequest request, final IndexShard primary) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("index [{}] on the following primary shard {}", request.getOperations(), primary.routingEntry());
}
return shardOperationOnPrimary(request.shardId(), request.getHistoryUUID(), request.getOperations(),
request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger);
}
@ -134,6 +137,10 @@ public class TransportBulkShardOperationsAction
// replicated to replicas but with the existing primary term (not the current primary term) in order
// to guarantee the consistency between the primary and replicas, and between translog and Lucene index.
final AlreadyProcessedFollowingEngineException failure = (AlreadyProcessedFollowingEngineException) result.getFailure();
if (logger.isTraceEnabled()) {
logger.trace("operation [{}] was processed before on following primary shard {} with existing term {}",
targetOp, primary.routingEntry(), failure.getExistingPrimaryTerm());
}
assert failure.getSeqNo() == targetOp.seqNo() : targetOp.seqNo() + " != " + failure.getSeqNo();
if (failure.getExistingPrimaryTerm().isPresent()) {
appliedOperations.add(rewriteOperationWithPrimaryTerm(sourceOp, failure.getExistingPrimaryTerm().getAsLong()));
@ -156,6 +163,9 @@ public class TransportBulkShardOperationsAction
@Override
protected WriteReplicaResult<BulkShardOperationsRequest> shardOperationOnReplica(
final BulkShardOperationsRequest request, final IndexShard replica) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("index [{}] on the following replica shard {}", request.getOperations(), replica.routingEntry());
}
return shardOperationOnReplica(request, replica, logger);
}

@ -72,6 +72,9 @@ public final class FollowingEngine extends InternalEngine {
final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes();
assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
if (hasBeenProcessedBefore(index)) {
if (logger.isTraceEnabled()) {
logger.trace("index operation [id={} seq_no={} origin={}] was processed before", index.id(), index.seqNo(), index.origin());
}
if (index.origin() == Operation.Origin.PRIMARY) {
/*
* The existing operation in this engine was probably assigned the term of the previous primary shard which is different

@ -444,6 +444,13 @@ public abstract class CcrIntegTestCase extends ESTestCase {
* on the follower equal the leader's; then verifies the existing pairs of (docId, seqNo) on the follower also equal the leader.
*/
protected void assertIndexFullyReplicatedToFollower(String leaderIndex, String followerIndex) throws Exception {
logger.info("--> asserting <<docId,seqNo>> between {} and {}", leaderIndex, followerIndex);
assertBusy(() -> {
Map<Integer, List<DocIdSeqNoAndTerm>> docsOnFollower = getDocIdAndSeqNos(clusterGroup.followerCluster, followerIndex);
logger.info("--> docs on the follower {}", docsOnFollower);
assertThat(docsOnFollower, equalTo(getDocIdAndSeqNos(clusterGroup.leaderCluster, leaderIndex)));
}, 120, TimeUnit.SECONDS);
logger.info("--> asserting seq_no_stats between {} and {}", leaderIndex, followerIndex);
assertBusy(() -> {
Map<Integer, SeqNoStats> leaderStats = new HashMap<>();
@ -460,13 +467,8 @@ public abstract class CcrIntegTestCase extends ESTestCase {
}
followerStats.put(shardStat.getShardRouting().shardId().id(), shardStat.getSeqNoStats());
}
assertThat(leaderStats, equalTo(followerStats));
}, 60, TimeUnit.SECONDS);
logger.info("--> asserting <<docId,seqNo>> between {} and {}", leaderIndex, followerIndex);
assertBusy(() -> {
assertThat(getDocIdAndSeqNos(clusterGroup.leaderCluster, leaderIndex),
equalTo(getDocIdAndSeqNos(clusterGroup.followerCluster, followerIndex)));
}, 60, TimeUnit.SECONDS);
assertThat(followerStats, equalTo(leaderStats));
}, 120, TimeUnit.SECONDS);
}
private Map<Integer, List<DocIdSeqNoAndTerm>> getDocIdAndSeqNos(InternalTestCluster cluster, String index) throws IOException {

@ -27,6 +27,7 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.xpack.CcrIntegTestCase;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
@ -44,6 +45,7 @@ import static java.util.Collections.singletonMap;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
@TestLogging("org.elasticsearch.xpack.ccr:TRACE,org.elasticsearch.index.shard:DEBUG")
public class FollowerFailOverIT extends CcrIntegTestCase {
@Override