mirror of https://github.com/apache/lucene.git
SOLR-7625: Ensure that the max value for seeding version buckets is updated after recovery even if the UpdateLog is not replayed.
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1683174 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ddeb05ae98
commit
078ac707ae
|
@ -386,6 +386,9 @@ Bug Fixes
|
||||||
during a commit cannot be upgraded to a write-lock needed to block updates; solution is to move the
|
during a commit cannot be upgraded to a write-lock needed to block updates; solution is to move the
|
||||||
call out of the firstSearcher event path and into the SolrCore constructor. (Timothy Potter)
|
call out of the firstSearcher event path and into the SolrCore constructor. (Timothy Potter)
|
||||||
|
|
||||||
|
* SOLR-7625: Ensure that the max value for seeding version buckets is updated after recovery even if
|
||||||
|
the UpdateLog is not replayed. (Timothy Potter)
|
||||||
|
|
||||||
Optimizations
|
Optimizations
|
||||||
----------------------
|
----------------------
|
||||||
|
|
||||||
|
|
|
@ -313,6 +313,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Future<RecoveryInfo> replayFuture = null;
|
||||||
while (!successfulRecovery && !isInterrupted() && !isClosed()) { // don't use interruption or it will close channels though
|
while (!successfulRecovery && !isInterrupted() && !isClosed()) { // don't use interruption or it will close channels though
|
||||||
try {
|
try {
|
||||||
CloudDescriptor cloudDesc = core.getCoreDescriptor()
|
CloudDescriptor cloudDesc = core.getCoreDescriptor()
|
||||||
|
@ -430,7 +431,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
|
||||||
log.info("Begin buffering updates.");
|
log.info("Begin buffering updates.");
|
||||||
ulog.bufferUpdates();
|
ulog.bufferUpdates();
|
||||||
replayed = false;
|
replayed = false;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
||||||
replicate(zkController.getNodeName(), core, leaderprops);
|
replicate(zkController.getNodeName(), core, leaderprops);
|
||||||
|
@ -439,8 +440,8 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
|
||||||
log.info("Recovery was cancelled");
|
log.info("Recovery was cancelled");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
replay(core);
|
replayFuture = replay(core);
|
||||||
replayed = true;
|
replayed = true;
|
||||||
|
|
||||||
if (isClosed()) {
|
if (isClosed()) {
|
||||||
|
@ -517,6 +518,14 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// if replay was skipped (possibly to due pulling a full index from the leader),
|
||||||
|
// then we still need to update version bucket seeds after recovery
|
||||||
|
if (successfulRecovery && replayFuture == null) {
|
||||||
|
log.info("Updating version bucket highest from index after successful recovery.");
|
||||||
|
core.seedVersionBuckets();
|
||||||
|
}
|
||||||
|
|
||||||
log.info("Finished recovery process.");
|
log.info("Finished recovery process.");
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -545,8 +545,6 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
|
||||||
getLatestSchema(), solrConfig.indexConfig, solrDelPolicy, codec);
|
getLatestSchema(), solrConfig.indexConfig, solrDelPolicy, codec);
|
||||||
writer.close();
|
writer.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -842,7 +840,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
// seed version buckets with max from index during core initialization ... requires a searcher!
|
// seed version buckets with max from index during core initialization ... requires a searcher!
|
||||||
seedVersionBucketsWithMaxFromIndex();
|
seedVersionBuckets();
|
||||||
|
|
||||||
bufferUpdatesIfConstructing(coreDescriptor);
|
bufferUpdatesIfConstructing(coreDescriptor);
|
||||||
|
|
||||||
|
@ -854,13 +852,13 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
|
||||||
registerConfListener();
|
registerConfListener();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void seedVersionBucketsWithMaxFromIndex() {
|
public void seedVersionBuckets() {
|
||||||
UpdateHandler uh = getUpdateHandler();
|
UpdateHandler uh = getUpdateHandler();
|
||||||
if (uh != null && uh.getUpdateLog() != null) {
|
if (uh != null && uh.getUpdateLog() != null) {
|
||||||
RefCounted<SolrIndexSearcher> newestSearcher = getRealtimeSearcher();
|
RefCounted<SolrIndexSearcher> newestSearcher = getRealtimeSearcher();
|
||||||
if (newestSearcher != null) {
|
if (newestSearcher != null) {
|
||||||
try {
|
try {
|
||||||
uh.getUpdateLog().onFirstSearcher(newestSearcher.get());
|
uh.getUpdateLog().seedBucketsWithHighestVersion(newestSearcher.get());
|
||||||
} finally {
|
} finally {
|
||||||
newestSearcher.decref();
|
newestSearcher.decref();
|
||||||
}
|
}
|
||||||
|
@ -2630,8 +2628,6 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1545,6 +1545,10 @@ public class UpdateLog implements PluginInfoInitialized {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Long getCurrentMaxVersion() {
|
||||||
|
return maxVersionFromIndex;
|
||||||
|
}
|
||||||
|
|
||||||
// this method is primarily used for unit testing and is not part of the public API for this class
|
// this method is primarily used for unit testing and is not part of the public API for this class
|
||||||
Long getMaxVersionFromIndex() {
|
Long getMaxVersionFromIndex() {
|
||||||
if (maxVersionFromIndex == null && versionInfo != null) {
|
if (maxVersionFromIndex == null && versionInfo != null) {
|
||||||
|
@ -1599,8 +1603,8 @@ public class UpdateLog implements PluginInfoInitialized {
|
||||||
return highestVersion;
|
return highestVersion;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void onFirstSearcher(SolrIndexSearcher newSearcher) {
|
public void seedBucketsWithHighestVersion(SolrIndexSearcher newSearcher) {
|
||||||
log.info("On first searcher opened, looking up max value of version field");
|
log.info("Looking up max value of version field to seed version buckets");
|
||||||
versionInfo.blockUpdates();
|
versionInfo.blockUpdates();
|
||||||
try {
|
try {
|
||||||
maxVersionFromIndex = seedBucketsWithHighestVersion(newSearcher, versionInfo);
|
maxVersionFromIndex = seedBucketsWithHighestVersion(newSearcher, versionInfo);
|
||||||
|
|
|
@ -234,7 +234,7 @@ public class DistributedVersionInfoTest extends AbstractFullDistribZkTestBase {
|
||||||
|
|
||||||
cloudClient.commit();
|
cloudClient.commit();
|
||||||
|
|
||||||
log.info("\n\n\n Total of "+deletedDocs.size()+" docs deleted \n\n\n");
|
log.info("Total of "+deletedDocs.size()+" docs deleted");
|
||||||
|
|
||||||
maxOnLeader = getMaxVersionFromIndex(leader);
|
maxOnLeader = getMaxVersionFromIndex(leader);
|
||||||
maxOnReplica = getMaxVersionFromIndex(replica);
|
maxOnReplica = getMaxVersionFromIndex(replica);
|
||||||
|
|
|
@ -36,7 +36,10 @@ import org.apache.solr.common.cloud.ZkCoreNodeProps;
|
||||||
import org.apache.solr.common.cloud.ZkStateReader;
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
import org.apache.solr.common.util.NamedList;
|
import org.apache.solr.common.util.NamedList;
|
||||||
import org.apache.solr.core.CoreContainer;
|
import org.apache.solr.core.CoreContainer;
|
||||||
|
import org.apache.solr.core.SolrCore;
|
||||||
import org.apache.solr.servlet.SolrDispatchFilter;
|
import org.apache.solr.servlet.SolrDispatchFilter;
|
||||||
|
import org.apache.solr.update.UpdateHandler;
|
||||||
|
import org.apache.solr.update.UpdateLog;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -201,7 +204,22 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
|
||||||
|
|
||||||
// sent 3 docs in so far, verify they are on the leader and replica
|
// sent 3 docs in so far, verify they are on the leader and replica
|
||||||
assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 3);
|
assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 3);
|
||||||
|
|
||||||
|
// Get the max version from the replica core to make sure it gets updated after recovery (see SOLR-7625)
|
||||||
|
JettySolrRunner replicaJetty = getJettyOnPort(getReplicaPort(notLeader));
|
||||||
|
SolrDispatchFilter filter = (SolrDispatchFilter)replicaJetty.getDispatchFilter().getFilter();
|
||||||
|
CoreContainer coreContainer = filter.getCores();
|
||||||
|
ZkCoreNodeProps replicaCoreNodeProps = new ZkCoreNodeProps(notLeader);
|
||||||
|
String coreName = replicaCoreNodeProps.getCoreName();
|
||||||
|
Long maxVersionBefore = null;
|
||||||
|
try (SolrCore core = coreContainer.getCore(coreName)) {
|
||||||
|
assertNotNull("Core '"+coreName+"' not found for replica: "+notLeader.getName(), core);
|
||||||
|
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
|
||||||
|
maxVersionBefore = ulog.getCurrentMaxVersion();
|
||||||
|
}
|
||||||
|
assertNotNull("max version bucket seed not set for core " + coreName, maxVersionBefore);
|
||||||
|
log.info("Looked up max version bucket seed "+maxVersionBefore+" for core "+coreName);
|
||||||
|
|
||||||
// now up the stakes and do more docs
|
// now up the stakes and do more docs
|
||||||
int numDocs = 1000;
|
int numDocs = 1000;
|
||||||
boolean hasPartition = false;
|
boolean hasPartition = false;
|
||||||
|
@ -228,7 +246,15 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive);
|
notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive);
|
||||||
|
|
||||||
|
try (SolrCore core = coreContainer.getCore(coreName)) {
|
||||||
|
assertNotNull("Core '" + coreName + "' not found for replica: " + notLeader.getName(), core);
|
||||||
|
Long currentMaxVersion = core.getUpdateHandler().getUpdateLog().getCurrentMaxVersion();
|
||||||
|
log.info("After recovery, looked up NEW max version bucket seed " + currentMaxVersion +
|
||||||
|
" for core " + coreName + ", was: " + maxVersionBefore);
|
||||||
|
assertTrue("max version bucket seed not updated after recovery!", currentMaxVersion > maxVersionBefore);
|
||||||
|
}
|
||||||
|
|
||||||
// verify all docs received
|
// verify all docs received
|
||||||
assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, numDocs + 3);
|
assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, numDocs + 3);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue