mirror of https://github.com/apache/lucene.git
SOLR-9835: TestInjection.waitForInSyncWithLeader() should rely on commit point of searcher
This commit is contained in:
parent
013601f053
commit
d156aafd1d
|
@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
import org.apache.lucene.search.IndexSearcher;
|
||||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||||
import org.apache.solr.cloud.ZkController;
|
import org.apache.solr.cloud.ZkController;
|
||||||
|
@ -43,6 +44,7 @@ import org.apache.solr.common.util.SuppressForbidden;
|
||||||
import org.apache.solr.core.CoreContainer;
|
import org.apache.solr.core.CoreContainer;
|
||||||
import org.apache.solr.core.SolrCore;
|
import org.apache.solr.core.SolrCore;
|
||||||
import org.apache.solr.handler.ReplicationHandler;
|
import org.apache.solr.handler.ReplicationHandler;
|
||||||
|
import org.apache.solr.search.SolrIndexSearcher;
|
||||||
import org.apache.solr.update.SolrIndexWriter;
|
import org.apache.solr.update.SolrIndexWriter;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -363,7 +365,8 @@ public class TestInjection {
|
||||||
@SuppressForbidden(reason = "Need currentTimeMillis, because COMMIT_TIME_MSEC_KEY use currentTimeMillis as value")
|
@SuppressForbidden(reason = "Need currentTimeMillis, because COMMIT_TIME_MSEC_KEY use currentTimeMillis as value")
|
||||||
public static boolean waitForInSyncWithLeader(SolrCore core, ZkController zkController, String collection, String shardId) throws InterruptedException {
|
public static boolean waitForInSyncWithLeader(SolrCore core, ZkController zkController, String collection, String shardId) throws InterruptedException {
|
||||||
if (waitForReplicasInSync == null) return true;
|
if (waitForReplicasInSync == null) return true;
|
||||||
|
log.info("Start waiting for replica in sync with leader");
|
||||||
|
long currentTime = System.currentTimeMillis();
|
||||||
Pair<Boolean,Integer> pair = parseValue(waitForReplicasInSync);
|
Pair<Boolean,Integer> pair = parseValue(waitForReplicasInSync);
|
||||||
boolean enabled = pair.first();
|
boolean enabled = pair.first();
|
||||||
if (!enabled) return true;
|
if (!enabled) return true;
|
||||||
|
@ -380,14 +383,20 @@ public class TestInjection {
|
||||||
|
|
||||||
NamedList<Object> response = leaderClient.request(new QueryRequest(params));
|
NamedList<Object> response = leaderClient.request(new QueryRequest(params));
|
||||||
long leaderVersion = (long) ((NamedList)response.get("details")).get("indexVersion");
|
long leaderVersion = (long) ((NamedList)response.get("details")).get("indexVersion");
|
||||||
|
RefCounted<SolrIndexSearcher> searcher = core.getSearcher();
|
||||||
String localVersion = core.getDeletionPolicy().getLatestCommit().getUserData().get(SolrIndexWriter.COMMIT_TIME_MSEC_KEY);
|
try {
|
||||||
if (localVersion == null && leaderVersion == 0 && !core.getUpdateHandler().getUpdateLog().hasUncommittedChanges()) return true;
|
String localVersion = searcher.get().getIndexReader().getIndexCommit().getUserData().get(SolrIndexWriter.COMMIT_TIME_MSEC_KEY);
|
||||||
if (localVersion != null && Long.parseLong(localVersion) == leaderVersion && (leaderVersion >= t || i >= 6)) {
|
if (localVersion == null && leaderVersion == 0 && !core.getUpdateHandler().getUpdateLog().hasUncommittedChanges()) return true;
|
||||||
return true;
|
if (localVersion != null && Long.parseLong(localVersion) == leaderVersion && (leaderVersion >= t || i >= 6)) {
|
||||||
} else {
|
log.info("Waiting time for replica in sync with leader: {}", System.currentTimeMillis()-currentTime);
|
||||||
Thread.sleep(500);
|
return true;
|
||||||
|
} else {
|
||||||
|
Thread.sleep(500);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
searcher.decref();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue