Add debug logging to help diagnose intermittent upgrade test failures.

Also fixed indentation in ElasticsearchMergePolicy.
This commit is contained in:
Ryan Ernst 2014-10-15 13:41:28 -07:00
parent 516b87e129
commit 5ed193086d
2 changed files with 46 additions and 31 deletions

View File

@ -31,6 +31,8 @@ import org.apache.lucene.util.packed.GrowableWriter;
import org.apache.lucene.util.packed.PackedInts; import org.apache.lucene.util.packed.PackedInts;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.common.Numbers; import org.elasticsearch.common.Numbers;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.mapper.internal.UidFieldMapper; import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.mapper.internal.VersionFieldMapper; import org.elasticsearch.index.mapper.internal.VersionFieldMapper;
@ -53,6 +55,8 @@ import java.util.Map;
* be stored as payloads to numeric doc values. * be stored as payloads to numeric doc values.
*/ */
public final class ElasticsearchMergePolicy extends MergePolicy { public final class ElasticsearchMergePolicy extends MergePolicy {
private static ESLogger logger = Loggers.getLogger(ElasticsearchMergePolicy.class);
private final MergePolicy delegate; private final MergePolicy delegate;
private volatile boolean upgradeInProgress; private volatile boolean upgradeInProgress;
@ -167,7 +171,7 @@ public final class ElasticsearchMergePolicy extends MergePolicy {
@Override @Override
public void add(OneMerge merge) { public void add(OneMerge merge) {
super.add(new IndexUpgraderOneMerge(merge.segments)); super.add(new IndexUpgraderOneMerge(merge.segments));
} }
@Override @Override
@ -183,7 +187,7 @@ public final class ElasticsearchMergePolicy extends MergePolicy {
} }
MergeSpecification upgradedSpec = new IndexUpgraderMergeSpecification(); MergeSpecification upgradedSpec = new IndexUpgraderMergeSpecification();
for (OneMerge merge : spec.merges) { for (OneMerge merge : spec.merges) {
upgradedSpec.add(merge); upgradedSpec.add(merge);
} }
return upgradedSpec; return upgradedSpec;
} }
@ -191,7 +195,7 @@ public final class ElasticsearchMergePolicy extends MergePolicy {
@Override @Override
public MergeSpecification findMerges(MergeTrigger mergeTrigger, public MergeSpecification findMerges(MergeTrigger mergeTrigger,
SegmentInfos segmentInfos, IndexWriter writer) throws IOException { SegmentInfos segmentInfos, IndexWriter writer) throws IOException {
return upgradedMergeSpecification(delegate.findMerges(mergeTrigger, segmentInfos, writer)); return upgradedMergeSpecification(delegate.findMerges(mergeTrigger, segmentInfos, writer));
} }
@Override @Override
@ -199,43 +203,46 @@ public final class ElasticsearchMergePolicy extends MergePolicy {
int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, IndexWriter writer) int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, IndexWriter writer)
throws IOException { throws IOException {
if (upgradeInProgress) { if (upgradeInProgress) {
MergeSpecification spec = new IndexUpgraderMergeSpecification(); MergeSpecification spec = new IndexUpgraderMergeSpecification();
for (SegmentCommitInfo info : segmentInfos) { for (SegmentCommitInfo info : segmentInfos) {
org.apache.lucene.util.Version old = info.info.getVersion(); org.apache.lucene.util.Version old = info.info.getVersion();
org.apache.lucene.util.Version cur = Version.CURRENT.luceneVersion; org.apache.lucene.util.Version cur = Version.CURRENT.luceneVersion;
if (cur.major > old.major || if (cur.major > old.major ||
cur.major == old.major && cur.minor > old.minor) { cur.major == old.major && cur.minor > old.minor) {
// TODO: Use IndexUpgradeMergePolicy instead. We should be comparing codecs, // TODO: Use IndexUpgradeMergePolicy instead. We should be comparing codecs,
// for now we just assume every minor upgrade has a new format. // for now we just assume every minor upgrade has a new format.
spec.add(new OneMerge(Lists.newArrayList(info))); logger.debug("Adding segment " + info.info.name + " to be upgraded");
} spec.add(new OneMerge(Lists.newArrayList(info)));
if (spec.merges.size() == MAX_CONCURRENT_UPGRADE_MERGES) { }
// hit our max upgrades, so return the spec. we will get a cascaded call to continue. if (spec.merges.size() == MAX_CONCURRENT_UPGRADE_MERGES) {
return spec; // hit our max upgrades, so return the spec. we will get a cascaded call to continue.
} logger.debug("Returning " + spec.merges.size() + " merges for upgrade");
} return spec;
// We must have less than our max upgrade merges, so the next return will be our last in upgrading mode. }
upgradeInProgress = false; }
if (spec.merges.isEmpty() == false) { // We must have less than our max upgrade merges, so the next return will be our last in upgrading mode.
return spec; upgradeInProgress = false;
} if (spec.merges.isEmpty() == false) {
// fall through, so when we don't have any segments to upgrade, the delegate policy logger.debug("Return " + spec.merges.size() + " merges for end of upgrade");
// has a chance to decide what to do (e.g. collapse the segments to satisfy maxSegmentCount) return spec;
} }
// fall through, so when we don't have any segments to upgrade, the delegate policy
// has a chance to decide what to do (e.g. collapse the segments to satisfy maxSegmentCount)
}
return upgradedMergeSpecification(delegate.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, writer)); return upgradedMergeSpecification(delegate.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, writer));
} }
@Override @Override
public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, IndexWriter writer) public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, IndexWriter writer)
throws IOException { throws IOException {
return upgradedMergeSpecification(delegate.findForcedDeletesMerges(segmentInfos, writer)); return upgradedMergeSpecification(delegate.findForcedDeletesMerges(segmentInfos, writer));
} }
@Override @Override
public boolean useCompoundFile(SegmentInfos segments, SegmentCommitInfo newSegment, IndexWriter writer) throws IOException { public boolean useCompoundFile(SegmentInfos segments, SegmentCommitInfo newSegment, IndexWriter writer) throws IOException {
return delegate.useCompoundFile(segments, newSegment, writer); return delegate.useCompoundFile(segments, newSegment, writer);
} }
/** /**
@ -250,7 +257,7 @@ public final class ElasticsearchMergePolicy extends MergePolicy {
@Override @Override
public String toString() { public String toString() {
return getClass().getSimpleName() + "(" + delegate + ")"; return getClass().getSimpleName() + "(" + delegate + ")";
} }
} }

View File

@ -121,6 +121,8 @@ public class UpgradeTest extends ElasticsearchBackwardsCompatIntegrationTest {
assertNotUpgraded(httpClient, null); assertNotUpgraded(httpClient, null);
final String indexToUpgrade = "test" + randomInt(numIndexes - 1); final String indexToUpgrade = "test" + randomInt(numIndexes - 1);
logger.debug("Running upgrade on index " + indexToUpgrade);
logClusterState();
runUpgrade(httpClient, indexToUpgrade); runUpgrade(httpClient, indexToUpgrade);
awaitBusy(new Predicate<Object>() { awaitBusy(new Predicate<Object>() {
@Override @Override
@ -132,8 +134,14 @@ public class UpgradeTest extends ElasticsearchBackwardsCompatIntegrationTest {
} }
} }
}); });
logger.debug("Single index upgrade complete");
logClusterState();
logger.debug("Running upgrade on the rest of the indexes");
logClusterState();
runUpgrade(httpClient, null, "wait_for_completion", "true"); runUpgrade(httpClient, null, "wait_for_completion", "true");
logger.debug("Full upgrade complete");
logClusterState();
assertUpgraded(httpClient, null); assertUpgraded(httpClient, null);
} }