SOLR-6406: fix race/hang in ConcurrentUpdateSolrClient.blockUntilFinished when executor service is shut down

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1712045 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Yonik Seeley 2015-11-02 15:29:00 +00:00
parent 548ceacaaa
commit e708c90466
2 changed files with 172 additions and 149 deletions

View File

@ -312,6 +312,11 @@ Bug Fixes
be aborted. This can cause big update reorders that can cause replicas to
get out of sync. (Mark Miller, yonik)
* SOLR-6406: ConcurrentUpdateSolrClient hang in blockUntilFinished. If updates are still
flowing and shutdown is called on the executor service used by ConcurrentUpdateSolrClient,
a race condition can cause that client to hang in blockUntilFinished.
(Mark Miller, yonik)
Optimizations
----------------------

View File

@ -148,157 +148,185 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
* Opens a connection and sends everything...
*/
class Runner implements Runnable {
final Lock runnerLock = new ReentrantLock();
@Override
public void run() {
runnerLock.lock();
log.debug("starting runner: {}", this);
HttpPost method = null;
HttpResponse response = null;
try {
while (!queue.isEmpty()) {
try {
final UpdateRequest updateRequest =
queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
if (updateRequest == null)
// This loop is so we can continue if an element was added to the queue after the last runner exited.
for (;;) {
try {
sendUpdateStream();
} catch (Throwable e) {
if (e instanceof OutOfMemoryError) {
throw (OutOfMemoryError) e;
}
handleError(e);
} finally {
synchronized (runners) {
// check to see if anything else was added to the queue
if (runners.size() == 1 && !queue.isEmpty() && !scheduler.isShutdown()) {
// If there is something else to process, keep last runner alive by staying in the loop.
} else {
runners.remove(this);
if (runners.isEmpty()) {
// notify anyone waiting in blockUntilFinished
runners.notifyAll();
}
break;
String contentType = client.requestWriter.getUpdateContentType();
final boolean isXml = ClientUtils.TEXT_XML.equals(contentType);
}
}
final ModifiableSolrParams origParams = new ModifiableSolrParams(updateRequest.getParams());
}
}
EntityTemplate template = new EntityTemplate(new ContentProducer() {
log.debug("finished: {}", this);
}
@Override
public void writeTo(OutputStream out) throws IOException {
try {
//
// Pull from the queue multiple times and streams over a single connection.
// Exits on exception, interruption, or an empty queue to pull from.
//
void sendUpdateStream() throws Exception {
while (!queue.isEmpty()) {
HttpPost method = null;
HttpResponse response = null;
try {
final UpdateRequest updateRequest =
queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
if (updateRequest == null)
break;
String contentType = client.requestWriter.getUpdateContentType();
final boolean isXml = ClientUtils.TEXT_XML.equals(contentType);
final ModifiableSolrParams origParams = new ModifiableSolrParams(updateRequest.getParams());
EntityTemplate template = new EntityTemplate(new ContentProducer() {
@Override
public void writeTo(OutputStream out) throws IOException {
try {
if (isXml) {
out.write("<stream>".getBytes(StandardCharsets.UTF_8)); // can be anything
}
UpdateRequest req = updateRequest;
while (req != null) {
SolrParams currentParams = new ModifiableSolrParams(req.getParams());
if (!origParams.toNamedList().equals(currentParams.toNamedList())) {
queue.add(req); // params are different, push back to queue
break;
}
client.requestWriter.write(req, out);
if (isXml) {
out.write("<stream>".getBytes(StandardCharsets.UTF_8)); // can be anything
}
UpdateRequest req = updateRequest;
while (req != null) {
SolrParams currentParams = new ModifiableSolrParams(req.getParams());
if (!origParams.toNamedList().equals(currentParams.toNamedList())) {
queue.add(req); // params are different, push back to queue
break;
}
client.requestWriter.write(req, out);
if (isXml) {
// check for commit or optimize
SolrParams params = req.getParams();
if (params != null) {
String fmt = null;
if (params.getBool(UpdateParams.OPTIMIZE, false)) {
fmt = "<optimize waitSearcher=\"%s\" />";
} else if (params.getBool(UpdateParams.COMMIT, false)) {
fmt = "<commit waitSearcher=\"%s\" />";
}
if (fmt != null) {
byte[] content = String.format(Locale.ROOT,
fmt,
params.getBool(UpdateParams.WAIT_SEARCHER, false)
+ "").getBytes(StandardCharsets.UTF_8);
out.write(content);
}
// check for commit or optimize
SolrParams params = req.getParams();
if (params != null) {
String fmt = null;
if (params.getBool(UpdateParams.OPTIMIZE, false)) {
fmt = "<optimize waitSearcher=\"%s\" />";
} else if (params.getBool(UpdateParams.COMMIT, false)) {
fmt = "<commit waitSearcher=\"%s\" />";
}
if (fmt != null) {
byte[] content = String.format(Locale.ROOT,
fmt,
params.getBool(UpdateParams.WAIT_SEARCHER, false)
+ "").getBytes(StandardCharsets.UTF_8);
out.write(content);
}
}
out.flush();
if (pollQueueTime > 0 && threadCount == 1 && req.isLastDocInBatch()) {
// no need to wait to see another doc in the queue if we've hit the last doc in a batch
req = queue.poll(0, TimeUnit.MILLISECONDS);
} else {
req = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
}
}
if (isXml) {
out.write("</stream>".getBytes(StandardCharsets.UTF_8));
out.flush();
if (pollQueueTime > 0 && threadCount == 1 && req.isLastDocInBatch()) {
// no need to wait to see another doc in the queue if we've hit the last doc in a batch
req = queue.poll(0, TimeUnit.MILLISECONDS);
} else {
req = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("", e);
}
if (isXml) {
out.write("</stream>".getBytes(StandardCharsets.UTF_8));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("", e);
}
});
// The parser 'wt=' and 'version=' params are used instead of the
// original params
ModifiableSolrParams requestParams = new ModifiableSolrParams(origParams);
requestParams.set(CommonParams.WT, client.parser.getWriterType());
requestParams.set(CommonParams.VERSION, client.parser.getVersion());
method = new HttpPost(client.getBaseURL() + "/update"
+ ClientUtils.toQueryString(requestParams, false));
method.setEntity(template);
method.addHeader("User-Agent", HttpSolrClient.AGENT);
method.addHeader("Content-Type", contentType);
response = client.getHttpClient().execute(method);
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
StringBuilder msg = new StringBuilder();
msg.append(response.getStatusLine().getReasonPhrase());
msg.append("\n\n\n\n");
msg.append("request: ").append(method.getURI());
SolrException solrExc = new SolrException(ErrorCode.getErrorCode(statusCode), msg.toString());
// parse out the metadata from the SolrException
try {
NamedList<Object> resp =
client.parser.processResponse(response.getEntity().getContent(),
response.getEntity().getContentType().getValue());
NamedList<Object> error = (NamedList<Object>) resp.get("error");
if (error != null)
solrExc.setMetadata((NamedList<String>) error.get("metadata"));
} catch (Exception exc) {
// don't want to fail to report error if parsing the response fails
log.warn("Failed to parse error response from "+ client.getBaseURL()+" due to: "+exc);
}
handleError(solrExc);
} else {
onSuccess(response);
}
} finally {
});
// The parser 'wt=' and 'version=' params are used instead of the
// original params
ModifiableSolrParams requestParams = new ModifiableSolrParams(origParams);
requestParams.set(CommonParams.WT, client.parser.getWriterType());
requestParams.set(CommonParams.VERSION, client.parser.getVersion());
method = new HttpPost(client.getBaseURL() + "/update"
+ ClientUtils.toQueryString(requestParams, false));
method.setEntity(template);
method.addHeader("User-Agent", HttpSolrClient.AGENT);
method.addHeader("Content-Type", contentType);
response = client.getHttpClient().execute(method);
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
StringBuilder msg = new StringBuilder();
msg.append(response.getStatusLine().getReasonPhrase());
msg.append("\n\n\n\n");
msg.append("request: ").append(method.getURI());
SolrException solrExc = new SolrException(ErrorCode.getErrorCode(statusCode), msg.toString());
// parse out the metadata from the SolrException
try {
if (response != null) {
response.getEntity().getContent().close();
}
} catch (Exception ex) {
log.warn("", ex);
NamedList<Object> resp =
client.parser.processResponse(response.getEntity().getContent(),
response.getEntity().getContentType().getValue());
NamedList<Object> error = (NamedList<Object>) resp.get("error");
if (error != null)
solrExc.setMetadata((NamedList<String>) error.get("metadata"));
} catch (Exception exc) {
// don't want to fail to report error if parsing the response fails
log.warn("Failed to parse error response from " + client.getBaseURL() + " due to: " + exc);
}
}
}
} catch (Throwable e) {
if (e instanceof OutOfMemoryError) {
throw (OutOfMemoryError) e;
}
handleError(e);
} finally {
synchronized (runners) {
if (runners.size() == 1 && !queue.isEmpty()) {
// keep this runner alive
scheduler.execute(this);
} else {
runners.remove(this);
if (runners.isEmpty())
runners.notifyAll();
}
}
log.debug("finished: {}", this);
runnerLock.unlock();
handleError(solrExc);
} else {
onSuccess(response);
}
} finally {
try {
if (response != null) {
response.getEntity().getContent().close();
}
} catch (Exception ex) {
log.warn("", ex);
}
}
}
}
}
// *must* be called with runners monitor held, e.g. synchronized(runners){ addRunner() }
private void addRunner() {
MDC.put("ConcurrentUpdateSolrClient.url", client.getBaseURL());
try {
Runner r = new Runner();
runners.add(r);
scheduler.execute(r); // this can throw an exception if the scheduler has been shutdown, but that should be fine.
} finally {
MDC.remove("ConcurrentUpdateSolrClient.url");
}
}
@Override
public NamedList<Object> request(final SolrRequest request, String collection)
throws SolrServerException, IOException {
@ -351,14 +379,7 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
if (runners.isEmpty() || (queue.remainingCapacity() < queue.size() && runners.size() < threadCount))
{
// We need more runners, so start a new one.
MDC.put("ConcurrentUpdateSolrClient.url", client.getBaseURL());
try {
Runner r = new Runner();
runners.add(r);
scheduler.execute(r);
} finally {
MDC.remove("ConcurrentUpdateSolrClient.url");
}
addRunner();
} else {
// break out of the retry loop if we added the element to the queue
// successfully, *and*
@ -399,30 +420,27 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
lock = new CountDownLatch(1);
try {
synchronized (runners) {
// NOTE: if the executor is shut down, runners may never become empty (a scheduled task may never be run,
// which means it would never remove itself from the runners list. This is why we don't wait forever
// and periodically check if the scheduler is shutting down.
while (!runners.isEmpty()) {
try {
runners.wait();
runners.wait(250);
} catch (InterruptedException e) {
Thread.interrupted();
}
if (scheduler.isTerminated())
if (scheduler.isShutdown())
break;
// if we reach here, then we probably got the notifyAll, but need to check if
// the queue is empty before really considering this is finished (SOLR-4260)
// Need to check if the queue is empty before really considering this is finished (SOLR-4260)
int queueSize = queue.size();
if (queueSize > 0) {
if (queueSize > 0 && runners.isEmpty()) {
// TODO: can this still happen?
log.warn("No more runners, but queue still has "+
queueSize+" adding more runners to process remaining requests on queue");
MDC.put("ConcurrentUpdateSolrClient.url", client.getBaseURL());
try {
Runner r = new Runner();
runners.add(r);
scheduler.execute(r);
} finally {
MDC.remove("ConcurrentUpdateSolrClient.url");
}
addRunner();
}
}
}