SOLR-12955: Refactored DistributedUpdateProcessor to put SolrCloud specifics into a subclass

Closes #528
This commit is contained in:
Bar Rotstein 2019-03-19 13:26:31 -04:00 committed by David Smiley
parent 6f2b7bf5c0
commit 5b7866b085
10 changed files with 1376 additions and 1208 deletions

View File

@ -128,6 +128,7 @@ Bug Fixes
Improvements Improvements
---------------------- ----------------------
* SOLR-12999: Index replication could delete segments before downloading segments from master if there is not enough * SOLR-12999: Index replication could delete segments before downloading segments from master if there is not enough
disk space (noble) disk space (noble)
@ -166,6 +167,9 @@ Other Changes
* SOLR-8033: Remove debug if branch in HdfsTransactionLog (Kevin Risden) * SOLR-8033: Remove debug if branch in HdfsTransactionLog (Kevin Risden)
* SOLR-12955: Refactored DistributedUpdateProcessor to put SolrCloud functionality into a subclass.
(Bar Rotstein, David Smiley)
================== 8.0.0 ================== ================== 8.0.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release. Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.

View File

@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
* by the target cluster. * by the target cluster.
* </p> * </p>
*/ */
public class CdcrUpdateProcessor extends DistributedUpdateProcessor { public class CdcrUpdateProcessor extends DistributedZkUpdateProcessor {
public static final String CDCR_UPDATE = "cdcr.update"; public static final String CDCR_UPDATE = "cdcr.update";

View File

@ -49,9 +49,16 @@ public class DistributedUpdateProcessorFactory
@Override @Override
public UpdateRequestProcessor getInstance(SolrQueryRequest req, public UpdateRequestProcessor getInstance(SolrQueryRequest req,
SolrQueryResponse rsp, UpdateRequestProcessor next) { SolrQueryResponse rsp, UpdateRequestProcessor next) {
final boolean isZkAware = req.getCore().getCoreContainer().isZooKeeperAware();
DistributedUpdateProcessor distribUpdateProcessor =
isZkAware ?
new DistributedZkUpdateProcessor(req, rsp, next) :
new DistributedUpdateProcessor(req, rsp, next);
// note: will sometimes return DURP (no overhead) instead of wrapping // note: will sometimes return DURP (no overhead) instead of wrapping
return RoutedAliasUpdateProcessor.wrap(req, return RoutedAliasUpdateProcessor.wrap(req,
new DistributedUpdateProcessor(req, rsp, next)); distribUpdateProcessor);
} }
} }

View File

@ -368,7 +368,8 @@ public class DocBasedVersionConstraintsProcessor extends UpdateRequestProcessor
return true; return true;
} }
// if phase==TOLEADER, we can't just assume we are the leader... let the normal logic check. // if phase==TOLEADER, we can't just assume we are the leader... let the normal logic check.
return !distribProc.isLeader(cmd); distribProc.setupRequest(cmd);
return !distribProc.isLeader();
} }
@Override @Override

View File

@ -216,7 +216,8 @@ public class SkipExistingDocumentsProcessorFactory extends UpdateRequestProcesso
if (phase == DistributedUpdateProcessor.DistribPhase.FROMLEADER) { if (phase == DistributedUpdateProcessor.DistribPhase.FROMLEADER) {
return false; return false;
} }
return distribProc.isLeader(cmd); distribProc.setupRequest(cmd);
return distribProc.isLeader();
} }
@Override @Override

View File

@ -232,7 +232,7 @@ public class AtomicUpdateProcessorFactoryTest extends SolrTestCaseJ4 {
try { try {
factory.getInstance(cmd.getReq(), new SolrQueryResponse(), factory.getInstance(cmd.getReq(), new SolrQueryResponse(),
new DistributedUpdateProcessor(cmd.getReq(), new SolrQueryResponse(), createDistributedUpdateProcessor(cmd.getReq(), new SolrQueryResponse(),
new RunUpdateProcessor(cmd.getReq(), null))).processAdd(cmd); new RunUpdateProcessor(cmd.getReq(), null))).processAdd(cmd);
} catch (IOException e) { } catch (IOException e) {
} }

View File

@ -34,11 +34,10 @@ public class DistributedUpdateProcessorTest extends SolrTestCaseJ4 {
} }
@Test @Test
public void testShouldBufferUpdate() { public void testShouldBufferUpdateZk() {
SolrQueryRequest req = new LocalSolrQueryRequest(h.getCore(), new ModifiableSolrParams()); SolrQueryRequest req = new LocalSolrQueryRequest(h.getCore(), new ModifiableSolrParams());
DistributedUpdateProcessor processor = new DistributedUpdateProcessor( DistributedUpdateProcessor processor = new DistributedUpdateProcessor(
req, null, null, null); req, null, null, null);
AddUpdateCommand cmd = new AddUpdateCommand(req); AddUpdateCommand cmd = new AddUpdateCommand(req);
// applying buffer updates, isReplayOrPeerSync flag doesn't matter // applying buffer updates, isReplayOrPeerSync flag doesn't matter
assertFalse(processor.shouldBufferUpdate(cmd, false, UpdateLog.State.APPLYING_BUFFERED)); assertFalse(processor.shouldBufferUpdate(cmd, false, UpdateLog.State.APPLYING_BUFFERED));
@ -50,5 +49,4 @@ public class DistributedUpdateProcessorTest extends SolrTestCaseJ4 {
assertTrue(processor.shouldBufferUpdate(cmd, false, UpdateLog.State.APPLYING_BUFFERED)); assertTrue(processor.shouldBufferUpdate(cmd, false, UpdateLog.State.APPLYING_BUFFERED));
} }
} }

View File

@ -131,6 +131,9 @@ import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField; import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.SolrIndexSearcher; import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.servlet.DirectSolrConnection; import org.apache.solr.servlet.DirectSolrConnection;
import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.apache.solr.update.processor.DistributedZkUpdateProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.util.LogLevel; import org.apache.solr.util.LogLevel;
import org.apache.solr.util.RandomizeSSL; import org.apache.solr.util.RandomizeSSL;
import org.apache.solr.util.RandomizeSSL.SSLRandomizer; import org.apache.solr.util.RandomizeSSL.SSLRandomizer;
@ -2891,6 +2894,14 @@ public abstract class SolrTestCaseJ4 extends SolrTestCase {
} }
} }
public static DistributedUpdateProcessor createDistributedUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp,
UpdateRequestProcessor next) {
if(h.getCoreContainer().isZooKeeperAware()) {
return new DistributedZkUpdateProcessor(req, rsp, next);
}
return new DistributedUpdateProcessor(req, rsp, next);
}
/** /**
* Cleans up the randomized sysproperties and variables set by {@link #randomizeNumericTypesProperties} * Cleans up the randomized sysproperties and variables set by {@link #randomizeNumericTypesProperties}