From 144b4e8f128366e709ba293f9b24944231a03084 Mon Sep 17 00:00:00 2001 From: Timothy Potter Date: Wed, 20 May 2015 01:03:23 +0000 Subject: [PATCH] SOLR-7333: Make the poll queue time configurable and use knowledge that a batch is being processed to poll efficiently git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1680436 13f79535-47bb-0310-9956-ffa450edef68 --- solr/CHANGES.txt | 4 + .../handler/loader/ContentStreamLoader.java | 2 + .../solr/handler/loader/JavabinLoader.java | 13 ++- .../apache/solr/update/AddUpdateCommand.java | 5 + .../solr/update/SolrCmdDistributor.java | 12 ++- .../solr/update/StreamingSolrClients.java | 2 +- .../handler/loader/JavabinLoaderTest.java | 92 +++++++++++++++++++ .../impl/ConcurrentUpdateSolrClient.java | 11 ++- .../request/JavaBinUpdateRequestCodec.java | 21 ++++- .../client/solrj/request/UpdateRequest.java | 11 ++- .../apache/solr/common/util/JavaBinCodec.java | 2 +- 11 files changed, 159 insertions(+), 16 deletions(-) create mode 100644 solr/core/src/test/org/apache/solr/handler/loader/JavabinLoaderTest.java diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 5c7e479fbc3..111f2518a1d 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -303,6 +303,10 @@ Optimizations * SOLR-7547: Short circuit SolrDisptachFilter for static content request. Right now it creates a new HttpSolrCall object and tries to process it. (Anshum Gupta) +* SOLR-7333: Make the poll queue time a leader uses when distributing updates to replicas + configurable and use knowledge that a batch is being processed to poll efficiently. + (Timothy Potter) + Other Changes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java b/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java index a0feb2c7902..6ab054de2d7 100644 --- a/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java +++ b/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java @@ -31,6 +31,8 @@ import org.apache.solr.update.processor.UpdateRequestProcessor; */ public abstract class ContentStreamLoader { + protected static final int pollQueueTime = Integer.getInteger("solr.cloud.replication.poll-queue-time-ms", 25); + /** * This should be called once for each RequestHandler */ diff --git a/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java b/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java index aca84532803..e05fff73e5e 100644 --- a/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java +++ b/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java @@ -49,7 +49,7 @@ import java.util.Set; */ public class JavabinLoader extends ContentStreamLoader { public static Logger log = LoggerFactory.getLogger(JavabinLoader.class); - + @Override public void load(SolrQueryRequest req, SolrQueryResponse rsp, ContentStream stream, UpdateRequestProcessor processor) throws Exception { InputStream is = null; @@ -91,7 +91,12 @@ public class JavabinLoader extends ContentStreamLoader { if (overwrite != null) { addCmd.overwrite = overwrite; } - + + if (updateRequest.isLastDocInBatch()) { + // this is a hint to downstream code that indicates we've sent the last doc in a batch + addCmd.isLastDocInBatch = true; + } + try { processor.processAdd(addCmd); addCmd.clear(); @@ -115,7 +120,9 @@ public class JavabinLoader extends ContentStreamLoader { private AddUpdateCommand getAddCommand(SolrQueryRequest req, SolrParams params) { AddUpdateCommand addCmd = new AddUpdateCommand(req); - + // since we can give a hint to the leader that the end of a batch is being processed, it's OK to have a larger + // pollQueueTime than the default 0 since we can optimize around not waiting unnecessarily + addCmd.pollQueueTime = pollQueueTime; addCmd.overwrite = params.getBool(UpdateParams.OVERWRITE, true); addCmd.commitWithin = params.getInt(UpdateParams.COMMIT_WITHIN, -1); return addCmd; diff --git a/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java b/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java index a00cccb600c..db5ad26bdba 100644 --- a/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java +++ b/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java @@ -50,6 +50,10 @@ public class AddUpdateCommand extends UpdateCommand implements Iterable nodes, ModifiableSolrParams params, boolean synchronous, RequestReplicationTracker rrt) throws IOException { - + String cmdStr = cmd.toString(); for (Node node : nodes) { UpdateRequest uReq = new UpdateRequest(); + if (cmd.isLastDocInBatch) + uReq.lastDocInBatch(); uReq.setParams(params); uReq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite); - submit(new Req(cmd.toString(), node, uReq, synchronous, rrt), false); + submit(new Req(cmdStr, node, uReq, synchronous, rrt, cmd.pollQueueTime), false); } } @@ -310,17 +312,19 @@ public class SolrCmdDistributor { public boolean synchronous; public String cmdString; public RequestReplicationTracker rfTracker; + public int pollQueueTime; public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous) { - this(cmdString, node, uReq, synchronous, null); + this(cmdString, node, uReq, synchronous, null, 0); } - public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous, RequestReplicationTracker rfTracker) { + public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous, RequestReplicationTracker rfTracker, int pollQueueTime) { this.node = node; this.uReq = uReq; this.synchronous = synchronous; this.cmdString = cmdString; this.rfTracker = rfTracker; + this.pollQueueTime = pollQueueTime; } public String toString() { diff --git a/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java b/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java index 935de07c7a9..2fcea1e2a61 100644 --- a/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java +++ b/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java @@ -87,7 +87,7 @@ public class StreamingSolrClients { }; client.setParser(new BinaryResponseParser()); client.setRequestWriter(new BinaryRequestWriter()); - client.setPollQueueTime(0); + client.setPollQueueTime(req.pollQueueTime); Set queryParams = new HashSet<>(2); queryParams.add(DistributedUpdateProcessor.DISTRIB_FROM); queryParams.add(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM); diff --git a/solr/core/src/test/org/apache/solr/handler/loader/JavabinLoaderTest.java b/solr/core/src/test/org/apache/solr/handler/loader/JavabinLoaderTest.java new file mode 100644 index 00000000000..b6377799e56 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/handler/loader/JavabinLoaderTest.java @@ -0,0 +1,92 @@ +package org.apache.solr.handler.loader; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.util.ContentStreamBase; +import org.apache.solr.request.SolrQueryRequest; +import org.apache.solr.response.SolrQueryResponse; +import org.apache.solr.update.AddUpdateCommand; +import org.apache.solr.update.processor.BufferingRequestProcessor; +import org.junit.BeforeClass; + +public class JavabinLoaderTest extends SolrTestCaseJ4 { + @BeforeClass + public static void beforeTests() throws Exception { + initCore("solrconfig.xml","schema.xml"); + } + + /** + * Verifies the isLastDocInBatch flag gets set correctly for a batch of docs and for a request with a single doc. + */ + public void testLastDocInBatchFlag() throws Exception { + doTestLastDocInBatchFlag(1); // single doc + doTestLastDocInBatchFlag(2); // multiple docs + } + + protected void doTestLastDocInBatchFlag(int numDocsInBatch) throws Exception { + List batch = new ArrayList<>(numDocsInBatch); + for (int d=0; d < numDocsInBatch; d++) { + SolrInputDocument doc = new SolrInputDocument(); + doc.setField("id", String.valueOf(d)); + batch.add(doc); + } + + UpdateRequest updateRequest = new UpdateRequest(); + if (batch.size() > 1) { + updateRequest.add(batch); + } else { + updateRequest.add(batch.get(0)); + } + + // client-side SolrJ would do this ... + ByteArrayOutputStream os = new ByteArrayOutputStream(); + (new JavaBinUpdateRequestCodec()).marshal(updateRequest, os); + + // need to override the processAdd method b/c JavabinLoader calls + // clear on the addCmd after it is passed on to the handler ... a simple clone will suffice for this test + BufferingRequestProcessor mockUpdateProcessor = new BufferingRequestProcessor(null) { + @Override + public void processAdd(AddUpdateCommand cmd) throws IOException { + addCommands.add((AddUpdateCommand)cmd.clone()); + } + }; + + SolrQueryRequest req = req(); + (new JavabinLoader()).load(req, + new SolrQueryResponse(), + new ContentStreamBase.ByteArrayStream(os.toByteArray(), "test"), + mockUpdateProcessor); + req.close(); + + assertTrue(mockUpdateProcessor.addCommands.size() == numDocsInBatch); + for (int i=0; i < numDocsInBatch-1; i++) + assertFalse(mockUpdateProcessor.addCommands.get(i).isLastDocInBatch); // not last doc in batch + + // last doc should have the flag set + assertTrue(mockUpdateProcessor.addCommands.get(batch.size()-1).isLastDocInBatch); + } +} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java index 667016b48c3..66306121fa1 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java @@ -156,7 +156,7 @@ public class ConcurrentUpdateSolrClient extends SolrClient { log.debug("starting runner: {}", this); HttpPost method = null; - HttpResponse response = null; + HttpResponse response = null; try { while (!queue.isEmpty()) { try { @@ -207,7 +207,14 @@ public class ConcurrentUpdateSolrClient extends SolrClient { } } out.flush(); - req = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS); + + if (pollQueueTime > 0 && threadCount == 1 && req.isLastDocInBatch()) { + // no need to wait to see another doc in the queue if we've hit the last doc in a batch + req = queue.poll(0, TimeUnit.MILLISECONDS); + } else { + req = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS); + } + } if (isXml) { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java index 7bb64b428f2..47521debcf5 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java @@ -126,7 +126,6 @@ public class JavaBinUpdateRequestCodec { @Override public List readIterator(DataInputInputStream fis) throws IOException { - // default behavior for reading any regular Iterator in the stream if (seenOuterMostDocIterator) return super.readIterator(fis); @@ -142,9 +141,16 @@ public class JavaBinUpdateRequestCodec { if (handler == null) return super.readIterator(fis); Integer commitWithin = null; Boolean overwrite = null; + Object o = null; while (true) { - Object o = readVal(fis); - if (o == END_OBJ) break; + if (o == null) { + o = readVal(fis); + } + + if (o == END_OBJ) { + break; + } + SolrInputDocument sdoc = null; if (o instanceof List) { sdoc = listToSolrInputDocument((List) o); @@ -160,9 +166,16 @@ public class JavaBinUpdateRequestCodec { overwrite = (Boolean) p.get(UpdateRequest.OVERWRITE); } } else { - sdoc = (SolrInputDocument) o; } + + // peek at the next object to see if we're at the end + o = readVal(fis); + if (o == END_OBJ) { + // indicate that we've hit the last doc in the batch, used to enable optimizations when doing replication + updateRequest.lastDocInBatch(); + } + handler.update(sdoc, updateRequest, commitWithin, overwrite); } return Collections.EMPTY_LIST; diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java index 4fcde62cb81..3472347bdd3 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java @@ -57,7 +57,9 @@ public class UpdateRequest extends AbstractUpdateRequest { private Iterator docIterator = null; private Map> deleteById = null; private List deleteQuery = null; - + + private boolean isLastDocInBatch = false; + public UpdateRequest() { super(METHOD.POST, "/update"); } @@ -460,4 +462,11 @@ public class UpdateRequest extends AbstractUpdateRequest { return deleteQuery; } + public boolean isLastDocInBatch() { + return isLastDocInBatch; + } + + public void lastDocInBatch() { + isLastDocInBatch = true; + } } diff --git a/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java b/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java index 3674f4dde29..b40afde6974 100644 --- a/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java +++ b/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java @@ -555,7 +555,7 @@ public class JavaBinCodec { @Override public String toString() { - return "MapEntry[" + key.toString() + ":" + value.toString() + "]"; + return "MapEntry[" + key + ":" + value + "]"; } @Override