SOLR-7333: Make the poll queue time configurable and use knowledge that a batch is being processed to poll efficiently

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1680436 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy Potter 2015-05-20 01:03:23 +00:00
parent 8936a16554
commit 144b4e8f12
11 changed files with 159 additions and 16 deletions

View File

@ -303,6 +303,10 @@ Optimizations
* SOLR-7547: Short circuit SolrDisptachFilter for static content request. Right now it creates
a new HttpSolrCall object and tries to process it. (Anshum Gupta)
* SOLR-7333: Make the poll queue time a leader uses when distributing updates to replicas
configurable and use knowledge that a batch is being processed to poll efficiently.
(Timothy Potter)
Other Changes
----------------------

View File

@ -31,6 +31,8 @@ import org.apache.solr.update.processor.UpdateRequestProcessor;
*/
public abstract class ContentStreamLoader {
protected static final int pollQueueTime = Integer.getInteger("solr.cloud.replication.poll-queue-time-ms", 25);
/**
* This should be called once for each RequestHandler
*/

View File

@ -49,7 +49,7 @@ import java.util.Set;
*/
public class JavabinLoader extends ContentStreamLoader {
public static Logger log = LoggerFactory.getLogger(JavabinLoader.class);
@Override
public void load(SolrQueryRequest req, SolrQueryResponse rsp, ContentStream stream, UpdateRequestProcessor processor) throws Exception {
InputStream is = null;
@ -91,7 +91,12 @@ public class JavabinLoader extends ContentStreamLoader {
if (overwrite != null) {
addCmd.overwrite = overwrite;
}
if (updateRequest.isLastDocInBatch()) {
// this is a hint to downstream code that indicates we've sent the last doc in a batch
addCmd.isLastDocInBatch = true;
}
try {
processor.processAdd(addCmd);
addCmd.clear();
@ -115,7 +120,9 @@ public class JavabinLoader extends ContentStreamLoader {
private AddUpdateCommand getAddCommand(SolrQueryRequest req, SolrParams params) {
AddUpdateCommand addCmd = new AddUpdateCommand(req);
// since we can give a hint to the leader that the end of a batch is being processed, it's OK to have a larger
// pollQueueTime than the default 0 since we can optimize around not waiting unnecessarily
addCmd.pollQueueTime = pollQueueTime;
addCmd.overwrite = params.getBool(UpdateParams.OVERWRITE, true);
addCmd.commitWithin = params.getInt(UpdateParams.COMMIT_WITHIN, -1);
return addCmd;

View File

@ -50,6 +50,10 @@ public class AddUpdateCommand extends UpdateCommand implements Iterable<IndexDoc
public Term updateTerm;
public int commitWithin = -1;
public boolean isLastDocInBatch = false;
public int pollQueueTime = 0;
public AddUpdateCommand(SolrQueryRequest req) {
super(req);
@ -65,6 +69,7 @@ public class AddUpdateCommand extends UpdateCommand implements Iterable<IndexDoc
solrDoc = null;
indexedId = null;
updateTerm = null;
isLastDocInBatch = false;
version = 0;
}

View File

@ -196,12 +196,14 @@ public class SolrCmdDistributor {
}
public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params, boolean synchronous, RequestReplicationTracker rrt) throws IOException {
String cmdStr = cmd.toString();
for (Node node : nodes) {
UpdateRequest uReq = new UpdateRequest();
if (cmd.isLastDocInBatch)
uReq.lastDocInBatch();
uReq.setParams(params);
uReq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
submit(new Req(cmd.toString(), node, uReq, synchronous, rrt), false);
submit(new Req(cmdStr, node, uReq, synchronous, rrt, cmd.pollQueueTime), false);
}
}
@ -310,17 +312,19 @@ public class SolrCmdDistributor {
public boolean synchronous;
public String cmdString;
public RequestReplicationTracker rfTracker;
public int pollQueueTime;
public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous) {
this(cmdString, node, uReq, synchronous, null);
this(cmdString, node, uReq, synchronous, null, 0);
}
public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous, RequestReplicationTracker rfTracker) {
public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous, RequestReplicationTracker rfTracker, int pollQueueTime) {
this.node = node;
this.uReq = uReq;
this.synchronous = synchronous;
this.cmdString = cmdString;
this.rfTracker = rfTracker;
this.pollQueueTime = pollQueueTime;
}
public String toString() {

View File

@ -87,7 +87,7 @@ public class StreamingSolrClients {
};
client.setParser(new BinaryResponseParser());
client.setRequestWriter(new BinaryRequestWriter());
client.setPollQueueTime(0);
client.setPollQueueTime(req.pollQueueTime);
Set<String> queryParams = new HashSet<>(2);
queryParams.add(DistributedUpdateProcessor.DISTRIB_FROM);
queryParams.add(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM);

View File

@ -0,0 +1,92 @@
package org.apache.solr.handler.loader;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.ContentStreamBase;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.processor.BufferingRequestProcessor;
import org.junit.BeforeClass;
public class JavabinLoaderTest extends SolrTestCaseJ4 {
@BeforeClass
public static void beforeTests() throws Exception {
initCore("solrconfig.xml","schema.xml");
}
/**
* Verifies the isLastDocInBatch flag gets set correctly for a batch of docs and for a request with a single doc.
*/
public void testLastDocInBatchFlag() throws Exception {
doTestLastDocInBatchFlag(1); // single doc
doTestLastDocInBatchFlag(2); // multiple docs
}
protected void doTestLastDocInBatchFlag(int numDocsInBatch) throws Exception {
List<SolrInputDocument> batch = new ArrayList<>(numDocsInBatch);
for (int d=0; d < numDocsInBatch; d++) {
SolrInputDocument doc = new SolrInputDocument();
doc.setField("id", String.valueOf(d));
batch.add(doc);
}
UpdateRequest updateRequest = new UpdateRequest();
if (batch.size() > 1) {
updateRequest.add(batch);
} else {
updateRequest.add(batch.get(0));
}
// client-side SolrJ would do this ...
ByteArrayOutputStream os = new ByteArrayOutputStream();
(new JavaBinUpdateRequestCodec()).marshal(updateRequest, os);
// need to override the processAdd method b/c JavabinLoader calls
// clear on the addCmd after it is passed on to the handler ... a simple clone will suffice for this test
BufferingRequestProcessor mockUpdateProcessor = new BufferingRequestProcessor(null) {
@Override
public void processAdd(AddUpdateCommand cmd) throws IOException {
addCommands.add((AddUpdateCommand)cmd.clone());
}
};
SolrQueryRequest req = req();
(new JavabinLoader()).load(req,
new SolrQueryResponse(),
new ContentStreamBase.ByteArrayStream(os.toByteArray(), "test"),
mockUpdateProcessor);
req.close();
assertTrue(mockUpdateProcessor.addCommands.size() == numDocsInBatch);
for (int i=0; i < numDocsInBatch-1; i++)
assertFalse(mockUpdateProcessor.addCommands.get(i).isLastDocInBatch); // not last doc in batch
// last doc should have the flag set
assertTrue(mockUpdateProcessor.addCommands.get(batch.size()-1).isLastDocInBatch);
}
}

View File

@ -156,7 +156,7 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
log.debug("starting runner: {}", this);
HttpPost method = null;
HttpResponse response = null;
HttpResponse response = null;
try {
while (!queue.isEmpty()) {
try {
@ -207,7 +207,14 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
}
}
out.flush();
req = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
if (pollQueueTime > 0 && threadCount == 1 && req.isLastDocInBatch()) {
// no need to wait to see another doc in the queue if we've hit the last doc in a batch
req = queue.poll(0, TimeUnit.MILLISECONDS);
} else {
req = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
}
}
if (isXml) {

View File

@ -126,7 +126,6 @@ public class JavaBinUpdateRequestCodec {
@Override
public List readIterator(DataInputInputStream fis) throws IOException {
// default behavior for reading any regular Iterator in the stream
if (seenOuterMostDocIterator) return super.readIterator(fis);
@ -142,9 +141,16 @@ public class JavaBinUpdateRequestCodec {
if (handler == null) return super.readIterator(fis);
Integer commitWithin = null;
Boolean overwrite = null;
Object o = null;
while (true) {
Object o = readVal(fis);
if (o == END_OBJ) break;
if (o == null) {
o = readVal(fis);
}
if (o == END_OBJ) {
break;
}
SolrInputDocument sdoc = null;
if (o instanceof List) {
sdoc = listToSolrInputDocument((List<NamedList>) o);
@ -160,9 +166,16 @@ public class JavaBinUpdateRequestCodec {
overwrite = (Boolean) p.get(UpdateRequest.OVERWRITE);
}
} else {
sdoc = (SolrInputDocument) o;
}
// peek at the next object to see if we're at the end
o = readVal(fis);
if (o == END_OBJ) {
// indicate that we've hit the last doc in the batch, used to enable optimizations when doing replication
updateRequest.lastDocInBatch();
}
handler.update(sdoc, updateRequest, commitWithin, overwrite);
}
return Collections.EMPTY_LIST;

View File

@ -57,7 +57,9 @@ public class UpdateRequest extends AbstractUpdateRequest {
private Iterator<SolrInputDocument> docIterator = null;
private Map<String,Map<String,Object>> deleteById = null;
private List<String> deleteQuery = null;
private boolean isLastDocInBatch = false;
public UpdateRequest() {
super(METHOD.POST, "/update");
}
@ -460,4 +462,11 @@ public class UpdateRequest extends AbstractUpdateRequest {
return deleteQuery;
}
public boolean isLastDocInBatch() {
return isLastDocInBatch;
}
public void lastDocInBatch() {
isLastDocInBatch = true;
}
}

View File

@ -555,7 +555,7 @@ public class JavaBinCodec {
@Override
public String toString() {
return "MapEntry[" + key.toString() + ":" + value.toString() + "]";
return "MapEntry[" + key + ":" + value + "]";
}
@Override