SOLR-6136: Fix spin lock in blockUntilFinished (which eats CPU unnecessarily) using wait/notify and add a unit test for ConcurrentUpdateSolrServer

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1610856 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy Potter 2014-07-15 21:29:01 +00:00
parent 620f37628a
commit 14e015dff7
3 changed files with 269 additions and 48 deletions

View File

@ -184,6 +184,8 @@ Bug Fixes
schema swap-out process
(Gregory Chanan via Steve Rowe)
* SOLR-6136: ConcurrentUpdateSolrServer includes a Spin Lock (Brandon Chapman, Timothy Potter)
Optimizations
---------------------

View File

@ -157,8 +157,8 @@ public class ConcurrentUpdateSolrServer extends SolrServer {
try {
while (!queue.isEmpty()) {
try {
final UpdateRequest updateRequest = queue.poll(250,
TimeUnit.MILLISECONDS);
final UpdateRequest updateRequest =
queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
if (updateRequest == null)
break;
@ -206,6 +206,7 @@ public class ConcurrentUpdateSolrServer extends SolrServer {
out.flush();
req = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
}
if (isXml) {
out.write("</stream>".getBytes(StandardCharsets.UTF_8));
}
@ -229,14 +230,12 @@ public class ConcurrentUpdateSolrServer extends SolrServer {
method.addHeader("User-Agent", HttpSolrServer.AGENT);
method.addHeader("Content-Type", contentType);
response = server.getHttpClient().execute(method);
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
StringBuilder msg = new StringBuilder();
msg.append(response.getStatusLine().getReasonPhrase());
msg.append("\n\n");
msg.append("\n\n");
msg.append("\n\n\n\n");
msg.append("request: ").append(method.getURI());
handleError(new SolrException(ErrorCode.getErrorCode(statusCode), msg.toString()));
} else {
@ -264,6 +263,8 @@ public class ConcurrentUpdateSolrServer extends SolrServer {
scheduler.execute(this);
} else {
runners.remove(this);
if (runners.isEmpty())
runners.notifyAll();
}
}
@ -319,17 +320,11 @@ public class ConcurrentUpdateSolrServer extends SolrServer {
for (;;) {
synchronized (runners) {
if (runners.isEmpty() || (queue.remainingCapacity() < queue.size() // queue
// is
// half
// full
// and
// we
// can
// add
// more
// runners
&& runners.size() < threadCount)) {
// see if queue is half full and we can add more runners
// special case: if only using a threadCount of 1 and the queue
// is filling up, allow 1 add'l runner to help process the queue
if (runners.isEmpty() || (queue.remainingCapacity() < queue.size() && runners.size() < threadCount))
{
// We need more runners, so start a new one.
Runner r = new Runner();
runners.add(r);
@ -358,9 +353,7 @@ public class ConcurrentUpdateSolrServer extends SolrServer {
if (!success) {
success = queue.offer(req, 100, TimeUnit.MILLISECONDS);
}
}
} catch (InterruptedException e) {
log.error("interrupted", e);
throw new IOException(e.getLocalizedMessage());
@ -375,27 +368,27 @@ public class ConcurrentUpdateSolrServer extends SolrServer {
public synchronized void blockUntilFinished() {
lock = new CountDownLatch(1);
try {
// Wait until no runners are running
for (;;) {
Runner runner;
synchronized (runners) {
runner = runners.peek();
}
synchronized (runners) {
while (!runners.isEmpty()) {
try {
runners.wait();
} catch (InterruptedException e) {
Thread.interrupted();
}
if ((runner == null && queue.isEmpty()) || scheduler.isTerminated())
break;
if (scheduler.isTerminated())
break;
if (runner != null) {
runner.runnerLock.lock();
runner.runnerLock.unlock();
} else if (!queue.isEmpty()) {
// failsafe - should not be necessary, but a good
// precaution to ensure blockUntilFinished guarantees
// all updates are emptied from the queue regardless of
// any bugs around starting or retaining runners
Runner r = new Runner();
runners.add(r);
scheduler.execute(r);
// if we reach here, then we probably got the notifyAll, but need to check if
// the queue is empty before really considering this is finished (SOLR-4260)
int queueSize = queue.size();
if (queueSize > 0) {
log.warn("No more runners, but queue still has "+
queueSize+" adding more runners to process remaining requests on queue");
Runner r = new Runner();
runners.add(r);
scheduler.execute(r);
}
}
}
} finally {
@ -450,8 +443,8 @@ public class ConcurrentUpdateSolrServer extends SolrServer {
if (shutdownExecutor) {
scheduler.shutdownNow(); // Cancel currently executing tasks
try {
if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) log
.error("ExecutorService did not terminate");
if (!scheduler.awaitTermination(30, TimeUnit.SECONDS))
log.error("ExecutorService did not terminate");
} catch (InterruptedException ie) {
scheduler.shutdownNow();
Thread.currentThread().interrupt();

View File

@ -0,0 +1,226 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.impl;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.http.HttpResponse;
import org.apache.solr.SolrJettyTestBase;
import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.util.ExternalPaths;
import org.junit.BeforeClass;
import org.junit.Test;
public class ConcurrentUpdateSolrServerTest extends SolrJettyTestBase {
/**
* Mock endpoint where the CUSS being tested in this class sends requests.
*/
public static class TestServlet extends HttpServlet
implements JavaBinUpdateRequestCodec.StreamingUpdateHandler
{
private static final long serialVersionUID = 1L;
public static void clear() {
lastMethod = null;
headers = null;
parameters = null;
errorCode = null;
numReqsRcvd.set(0);
numDocsRcvd.set(0);
}
public static Integer errorCode = null;
public static String lastMethod = null;
public static HashMap<String,String> headers = null;
public static Map<String,String[]> parameters = null;
public static AtomicInteger numReqsRcvd = new AtomicInteger(0);
public static AtomicInteger numDocsRcvd = new AtomicInteger(0);
public static void setErrorCode(Integer code) {
errorCode = code;
}
private void setHeaders(HttpServletRequest req) {
Enumeration<String> headerNames = req.getHeaderNames();
headers = new HashMap<>();
while (headerNames.hasMoreElements()) {
final String name = headerNames.nextElement();
headers.put(name, req.getHeader(name));
}
}
private void setParameters(HttpServletRequest req) {
parameters = req.getParameterMap();
}
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
numReqsRcvd.incrementAndGet();
lastMethod = "post";
recordRequest(req, resp);
InputStream reqIn = req.getInputStream();
JavaBinUpdateRequestCodec javabin = new JavaBinUpdateRequestCodec();
for (;;) {
try {
javabin.unmarshal(reqIn, this);
} catch (EOFException e) {
break; // this is expected
}
}
}
private void recordRequest(HttpServletRequest req, HttpServletResponse resp) {
setHeaders(req);
setParameters(req);
if (null != errorCode) {
try {
resp.sendError(errorCode);
} catch (IOException e) {
throw new RuntimeException("sendError IO fail in TestServlet", e);
}
}
}
@Override
public void update(SolrInputDocument document, UpdateRequest req, Integer commitWithin, Boolean override) {
numDocsRcvd.incrementAndGet();
}
} // end TestServlet
@BeforeClass
public static void beforeTest() throws Exception {
createJetty(ExternalPaths.EXAMPLE_HOME, null, null);
jetty.getDispatchFilter().getServletHandler()
.addServletWithMapping(TestServlet.class, "/cuss/*");
}
@Test
public void testConcurrentUpdate() throws Exception {
TestServlet.clear();
String serverUrl = jetty.getBaseUrl().toString() + "/cuss/foo";
int cussThreadCount = 2;
int cussQueueSize = 100;
// for tracking callbacks from CUSS
final AtomicInteger successCounter = new AtomicInteger(0);
final AtomicInteger errorCounter = new AtomicInteger(0);
final StringBuilder errors = new StringBuilder();
@SuppressWarnings("serial")
ConcurrentUpdateSolrServer cuss = new ConcurrentUpdateSolrServer(serverUrl, cussQueueSize, cussThreadCount) {
@Override
public void handleError(Throwable ex) {
errorCounter.incrementAndGet();
errors.append(" "+ex);
}
@Override
public void onSuccess(HttpResponse resp) {
successCounter.incrementAndGet();
}
};
cuss.setParser(new BinaryResponseParser());
cuss.setRequestWriter(new BinaryRequestWriter());
cuss.setPollQueueTime(0);
// ensure it doesn't block where there's nothing to do yet
cuss.blockUntilFinished();
int poolSize = 5;
ExecutorService threadPool = Executors.newFixedThreadPool(poolSize);
int numDocs = 100;
int numRunnables = 5;
for (int r=0; r < numRunnables; r++)
threadPool.execute(new SendDocsRunnable(String.valueOf(r), numDocs, cuss));
// ensure all docs are sent
threadPool.awaitTermination(5, TimeUnit.SECONDS);
threadPool.shutdown();
// wait until all requests are processed by CUSS
cuss.blockUntilFinished();
cuss.shutdownNow();
assertEquals("post", TestServlet.lastMethod);
// expect all requests to be successful
int expectedSuccesses = TestServlet.numReqsRcvd.get();
assertTrue(expectedSuccesses > 0); // at least one request must have been sent
assertTrue("Expected no errors but got "+errorCounter.get()+
", due to: "+errors.toString(), errorCounter.get() == 0);
assertTrue("Expected "+expectedSuccesses+" successes, but got "+successCounter.get(),
successCounter.get() == expectedSuccesses);
int expectedDocs = numDocs * numRunnables;
assertTrue("Expected CUSS to send "+expectedDocs+" but got "+TestServlet.numDocsRcvd.get(),
TestServlet.numDocsRcvd.get() == expectedDocs);
}
class SendDocsRunnable implements Runnable {
private String id;
private int numDocs;
private ConcurrentUpdateSolrServer cuss;
SendDocsRunnable(String id, int numDocs, ConcurrentUpdateSolrServer cuss) {
this.id = id;
this.numDocs = numDocs;
this.cuss = cuss;
}
@Override
public void run() {
for (int d=0; d < numDocs; d++) {
SolrInputDocument doc = new SolrInputDocument();
String docId = id+"_"+d;
doc.setField("id", docId);
UpdateRequest req = new UpdateRequest();
req.add(doc);
try {
cuss.request(req);
} catch (Throwable t) {
t.printStackTrace();
}
}
}
}
}