SOLR-9824: Some bulk update paths could be very slow due to CUSC polling.

This commit is contained in:
markrmiller 2017-02-22 13:00:42 -05:00
parent 2f82409e5b
commit d6337ac3e5
8 changed files with 253 additions and 99 deletions

View File

@ -178,6 +178,8 @@ Bug Fixes
* SOLR-10168: ShardSplit can fail with NPE in OverseerCollectionMessageHandler#waitForCoreAdminAsyncCallToComplete. (Mark Miller)
* SOLR-9824: Some bulk update paths could be very slow due to CUSC polling. (David Smiley, Mark Miller)
Optimizations
----------------------

View File

@ -29,8 +29,6 @@ import org.apache.solr.update.processor.UpdateRequestProcessor;
*/
public abstract class ContentStreamLoader {
protected static final int pollQueueTime = Integer.getInteger("solr.cloud.replication.poll-queue-time-ms", 25);
/**
* This should be called once for each RequestHandler
*/

View File

@ -116,9 +116,6 @@ public class JavabinLoader extends ContentStreamLoader {
private AddUpdateCommand getAddCommand(SolrQueryRequest req, SolrParams params) {
AddUpdateCommand addCmd = new AddUpdateCommand(req);
// since we can give a hint to the leader that the end of a batch is being processed, it's OK to have a larger
// pollQueueTime than the default 0 since we can optimize around not waiting unnecessarily
addCmd.pollQueueTime = pollQueueTime;
addCmd.overwrite = params.getBool(UpdateParams.OVERWRITE, true);
addCmd.commitWithin = params.getInt(UpdateParams.COMMIT_WITHIN, -1);
return addCmd;

View File

@ -60,8 +60,6 @@ public class AddUpdateCommand extends UpdateCommand implements Iterable<Document
public int commitWithin = -1;
public boolean isLastDocInBatch = false;
public int pollQueueTime = 0;
public AddUpdateCommand(SolrQueryRequest req) {
super(req);

View File

@ -36,6 +36,7 @@ import org.apache.solr.update.processor.DistributedUpdateProcessor.RequestReplic
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
@ -51,7 +52,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
public class SolrCmdDistributor {
public class SolrCmdDistributor implements Closeable {
private static final int MAX_RETRIES_ON_FORWARD = 25;
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -96,6 +97,10 @@ public class SolrCmdDistributor {
clients.shutdown();
}
}
public void close() {
clients.shutdown();
}
private void doRetriesIfNeeded() {
// NOTE: retries will be forwards to a single url
@ -210,7 +215,7 @@ public class SolrCmdDistributor {
if (cmd.isInPlaceUpdate()) {
params.set(DistributedUpdateProcessor.DISTRIB_INPLACE_PREVVERSION, String.valueOf(cmd.prevVersion));
}
submit(new Req(cmd, node, uReq, synchronous, rrt, cmd.pollQueueTime), false);
submit(new Req(cmd, node, uReq, synchronous, rrt), false);
}
}
@ -314,19 +319,17 @@ public class SolrCmdDistributor {
public boolean synchronous;
public UpdateCommand cmd;
public RequestReplicationTracker rfTracker;
public int pollQueueTime;
public Req(UpdateCommand cmd, Node node, UpdateRequest uReq, boolean synchronous) {
this(cmd, node, uReq, synchronous, null, 0);
this(cmd, node, uReq, synchronous, null);
}
public Req(UpdateCommand cmd, Node node, UpdateRequest uReq, boolean synchronous, RequestReplicationTracker rfTracker, int pollQueueTime) {
public Req(UpdateCommand cmd, Node node, UpdateRequest uReq, boolean synchronous, RequestReplicationTracker rfTracker) {
this.node = node;
this.uReq = uReq;
this.synchronous = synchronous;
this.cmd = cmd;
this.rfTracker = rfTracker;
this.pollQueueTime = pollQueueTime;
}
public String toString() {

View File

@ -73,9 +73,9 @@ public class StreamingSolrClients {
// on a greater scale since the current behavior is to only increase the number of connections/Runners when
// the queue is more than half full.
client = new ErrorReportingConcurrentUpdateSolrClient(url, httpClient, 100, runnerCount, updateExecutor, true, req);
client.setPollQueueTime(Integer.MAX_VALUE); // minimize connections created
client.setParser(new BinaryResponseParser());
client.setRequestWriter(new BinaryRequestWriter());
client.setPollQueueTime(req.pollQueueTime);
Set<String> queryParams = new HashSet<>(2);
queryParams.add(DistributedUpdateProcessor.DISTRIB_FROM);
queryParams.add(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM);

View File

@ -828,6 +828,13 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// Given that, it may also make sense to move the version reporting out of this
// processor too.
}
@Override
protected void doClose() {
if (cmdDistrib != null) {
cmdDistrib.close();
}
}
// TODO: optionally fail if n replicas are not reached...
private void doFinish() {

View File

@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
@ -87,7 +88,13 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
private boolean internalHttpClient;
private volatile Integer connectionTimeout;
private volatile Integer soTimeout;
private volatile boolean closed;
AtomicInteger pollInterrupts;
AtomicInteger pollExits;
AtomicInteger blockLoops;
AtomicInteger emptyQueueLoops;
/**
* Uses an internally managed HttpClient instance.
*
@ -156,6 +163,13 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
scheduler = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("concurrentUpdateScheduler"));
shutdownExecutor = true;
}
if (log.isDebugEnabled()) {
pollInterrupts = new AtomicInteger();
pollExits = new AtomicInteger();
blockLoops = new AtomicInteger();
emptyQueueLoops = new AtomicInteger();
}
}
public Set<String> getQueryParams() {
@ -174,13 +188,19 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
* Opens a connection and sends everything...
*/
class Runner implements Runnable {
volatile Thread thread = null;
volatile boolean inPoll = false;
public Thread getThread() {
return thread;
}
@Override
public void run() {
this.thread = Thread.currentThread();
log.debug("starting runner: {}", this);
// This loop is so we can continue if an element was added to the queue after the last runner exited.
for (;;) {
try {
sendUpdateStream();
@ -191,7 +211,6 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
}
handleError(e);
} finally {
synchronized (runners) {
// check to see if anything else was added to the queue
if (runners.size() == 1 && !queue.isEmpty() && !scheduler.isShutdown()) {
@ -205,26 +224,42 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
break;
}
}
}
}
log.debug("finished: {}", this);
}
public void interruptPoll() {
Thread lthread = thread;
if (inPoll && lthread != null) {
lthread.interrupt();
}
}
//
// Pull from the queue multiple times and streams over a single connection.
// Exits on exception, interruption, or an empty queue to pull from.
//
void sendUpdateStream() throws Exception {
while (!queue.isEmpty()) {
HttpPost method = null;
HttpResponse response = null;
InputStream rspBody = null;
try {
final Update update =
queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
Update update;
notifyQueueAndRunnersIfEmptyQueue();
try {
inPoll = true;
update = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (log.isDebugEnabled()) pollInterrupts.incrementAndGet();
continue;
} finally {
inPoll = false;
}
if (update == null)
break;
@ -234,61 +269,73 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
final ModifiableSolrParams origParams = new ModifiableSolrParams(update.getRequest().getParams());
EntityTemplate template = new EntityTemplate(new ContentProducer() {
@Override
public void writeTo(OutputStream out) throws IOException {
try {
if (isXml) {
out.write("<stream>".getBytes(StandardCharsets.UTF_8)); // can be anything
}
Update upd = update;
while (upd != null) {
UpdateRequest req = upd.getRequest();
SolrParams currentParams = new ModifiableSolrParams(req.getParams());
if (!origParams.toNamedList().equals(currentParams.toNamedList())) {
queue.add(upd); // params are different, push back to queue
break;
}
client.requestWriter.write(req, out);
if (isXml) {
// check for commit or optimize
SolrParams params = req.getParams();
if (params != null) {
String fmt = null;
if (params.getBool(UpdateParams.OPTIMIZE, false)) {
fmt = "<optimize waitSearcher=\"%s\" />";
} else if (params.getBool(UpdateParams.COMMIT, false)) {
fmt = "<commit waitSearcher=\"%s\" />";
}
if (fmt != null) {
byte[] content = String.format(Locale.ROOT,
fmt,
params.getBool(UpdateParams.WAIT_SEARCHER, false)
+ "").getBytes(StandardCharsets.UTF_8);
out.write(content);
}
if (isXml) {
out.write("<stream>".getBytes(StandardCharsets.UTF_8)); // can be anything
}
Update upd = update;
while (upd != null) {
UpdateRequest req = upd.getRequest();
SolrParams currentParams = new ModifiableSolrParams(req.getParams());
if (!origParams.toNamedList().equals(currentParams.toNamedList())) {
queue.add(upd); // params are different, push back to queue
break;
}
client.requestWriter.write(req, out);
if (isXml) {
// check for commit or optimize
SolrParams params = req.getParams();
if (params != null) {
String fmt = null;
if (params.getBool(UpdateParams.OPTIMIZE, false)) {
fmt = "<optimize waitSearcher=\"%s\" />";
} else if (params.getBool(UpdateParams.COMMIT, false)) {
fmt = "<commit waitSearcher=\"%s\" />";
}
if (fmt != null) {
byte[] content = String.format(Locale.ROOT,
fmt, params.getBool(UpdateParams.WAIT_SEARCHER, false)
+ "")
.getBytes(StandardCharsets.UTF_8);
out.write(content);
}
}
out.flush();
}
out.flush();
if (pollQueueTime > 0 && threadCount == 1 && req.isLastDocInBatch()) {
// no need to wait to see another doc in the queue if we've hit the last doc in a batch
upd = queue.poll(0, TimeUnit.MILLISECONDS);
} else {
upd = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
notifyQueueAndRunnersIfEmptyQueue();
inPoll = true;
try {
while (true) {
try {
upd = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
break;
} catch (InterruptedException e) {
if (log.isDebugEnabled()) pollInterrupts.incrementAndGet();
if (!queue.isEmpty()) {
continue;
}
if (log.isDebugEnabled()) pollExits.incrementAndGet();
upd = null;
break;
} finally {
inPoll = false;
}
}
}finally {
inPoll = false;
}
if (isXml) {
out.write("</stream>".getBytes(StandardCharsets.UTF_8));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("", e);
}
if (isXml) {
out.write("</stream>".getBytes(StandardCharsets.UTF_8));
}
}
});
@ -318,10 +365,13 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
method.setEntity(template);
method.addHeader("User-Agent", HttpSolrClient.AGENT);
method.addHeader("Content-Type", contentType);
response = client.getHttpClient()
.execute(method, HttpClientUtil.createNewHttpClientRequestContext());
rspBody = response.getEntity().getContent();
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
StringBuilder msg = new StringBuilder();
@ -364,6 +414,7 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
} else {
onSuccess(response);
}
} finally {
try {
if (response != null) {
@ -372,10 +423,25 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
} catch (Exception e) {
log.error("Error consuming and closing http response stream.", e);
}
notifyQueueAndRunnersIfEmptyQueue();
}
}
}
}
private void notifyQueueAndRunnersIfEmptyQueue() {
if (queue.size() == 0) {
synchronized (queue) {
// queue may be empty
queue.notifyAll();
}
synchronized (runners) {
// we notify runners too - if there is a high queue poll time and this is the update
// that emptied the queue, we make an attempt to avoid the 250ms timeout in blockUntilFinished
runners.notifyAll();
}
}
}
// *must* be called with runners monitor held, e.g. synchronized(runners){ addRunner() }
private void addRunner() {
@ -383,7 +449,9 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
try {
Runner r = new Runner();
runners.add(r);
scheduler.execute(r); // this can throw an exception if the scheduler has been shutdown, but that should be fine.
} finally {
MDC.remove("ConcurrentUpdateSolrClient.url");
}
@ -517,29 +585,52 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
public synchronized void blockUntilFinished() {
lock = new CountDownLatch(1);
try {
waitForEmptyQueue();
interruptRunnerThreadsPolling();
synchronized (runners) {
// NOTE: if the executor is shut down, runners may never become empty (a scheduled task may never be run,
// which means it would never remove itself from the runners list. This is why we don't wait forever
// which means it would never remove itself from the runners list. This is why we don't wait forever
// and periodically check if the scheduler is shutting down.
int loopCount = 0;
while (!runners.isEmpty()) {
try {
runners.wait(250);
} catch (InterruptedException e) {
Thread.interrupted();
}
if (log.isDebugEnabled()) blockLoops.incrementAndGet();
if (scheduler.isShutdown())
break;
loopCount++;
// Need to check if the queue is empty before really considering this is finished (SOLR-4260)
int queueSize = queue.size();
if (queueSize > 0 && runners.isEmpty()) {
// TODO: can this still happen?
log.warn("No more runners, but queue still has "+
queueSize+" adding more runners to process remaining requests on queue");
log.warn("No more runners, but queue still has " +
queueSize + " adding more runners to process remaining requests on queue");
addRunner();
}
interruptRunnerThreadsPolling();
// try to avoid the worst case wait timeout
// without bad spin
int timeout;
if (loopCount < 3) {
timeout = 10;
} else if (loopCount < 10) {
timeout = 25;
} else {
timeout = 250;
}
try {
runners.wait(timeout);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
} finally {
@ -548,6 +639,29 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
}
}
private void waitForEmptyQueue() {
while (!queue.isEmpty()) {
if (log.isDebugEnabled()) emptyQueueLoops.incrementAndGet();
synchronized (runners) {
int queueSize = queue.size();
if (queueSize > 0 && runners.isEmpty()) {
log.warn("No more runners, but queue still has " +
queueSize + " adding more runners to process remaining requests on queue");
addRunner();
}
}
synchronized (queue) {
try {
queue.wait(250);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
public void handleError(Throwable ex) {
log.error("error", ex);
}
@ -560,19 +674,42 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
}
@Override
public void close() {
if (internalHttpClient) IOUtils.closeQuietly(client);
if (shutdownExecutor) {
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
public synchronized void close() {
if (closed) {
interruptRunnerThreadsPolling();
return;
}
closed = true;
try {
if (shutdownExecutor) {
scheduler.shutdown();
interruptRunnerThreadsPolling();
try {
if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) log
.error("ExecutorService did not terminate");
}
} catch (InterruptedException ie) {
scheduler.shutdownNow();
if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) log
.error("ExecutorService did not terminate");
Thread.currentThread().interrupt();
}
} catch (InterruptedException ie) {
scheduler.shutdownNow();
Thread.currentThread().interrupt();
} else {
interruptRunnerThreadsPolling();
}
} finally {
if (internalHttpClient) IOUtils.closeQuietly(client);
if (log.isDebugEnabled()) {
log.debug("STATS pollInteruppts={} pollExists={} blockLoops={} emptyQueueLoops={}", pollInterrupts.get(), pollExits.get(), blockLoops.get(), emptyQueueLoops.get());
}
}
}
private void interruptRunnerThreadsPolling() {
synchronized (runners) {
for (Runner runner : runners) {
runner.interruptPoll();
}
}
}
@ -590,17 +727,29 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
}
public void shutdownNow() {
if (internalHttpClient) IOUtils.closeQuietly(client);
if (shutdownExecutor) {
scheduler.shutdownNow(); // Cancel currently executing tasks
try {
if (!scheduler.awaitTermination(30, TimeUnit.SECONDS))
log.error("ExecutorService did not terminate");
} catch (InterruptedException ie) {
scheduler.shutdownNow();
Thread.currentThread().interrupt();
if (closed) {
return;
}
closed = true;
try {
if (shutdownExecutor) {
scheduler.shutdown();
interruptRunnerThreadsPolling();
scheduler.shutdownNow(); // Cancel currently executing tasks
try {
if (!scheduler.awaitTermination(30, TimeUnit.SECONDS))
log.error("ExecutorService did not terminate");
} catch (InterruptedException ie) {
scheduler.shutdownNow();
Thread.currentThread().interrupt();
}
} else {
interruptRunnerThreadsPolling();
}
}
} finally {
if (internalHttpClient) IOUtils.closeQuietly(client);
}
}
public void setParser(ResponseParser responseParser) {