mirror of https://github.com/apache/lucene.git
SOLR-3658: SolrCmdDistributor can briefly create spikes of threads in the thousands - second pass
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1368725 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3c112d15e7
commit
4e099f3571
|
@ -24,7 +24,6 @@ import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ArrayBlockingQueue;
|
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.CompletionService;
|
import java.util.concurrent.CompletionService;
|
||||||
|
@ -33,6 +32,7 @@ import java.util.concurrent.ExecutorCompletionService;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.RejectedExecutionException;
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
import java.util.concurrent.Semaphore;
|
import java.util.concurrent.Semaphore;
|
||||||
|
import java.util.concurrent.SynchronousQueue;
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
@ -48,6 +48,7 @@ import org.apache.solr.common.cloud.ZkCoreNodeProps;
|
||||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||||
import org.apache.solr.common.util.NamedList;
|
import org.apache.solr.common.util.NamedList;
|
||||||
import org.apache.solr.core.SolrCore;
|
import org.apache.solr.core.SolrCore;
|
||||||
|
import org.apache.solr.util.AdjustableSemaphore;
|
||||||
import org.apache.solr.util.DefaultSolrThreadFactory;
|
import org.apache.solr.util.DefaultSolrThreadFactory;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -62,11 +63,12 @@ public class SolrCmdDistributor {
|
||||||
static BoundedExecutor commExecutor;
|
static BoundedExecutor commExecutor;
|
||||||
|
|
||||||
static final HttpClient client;
|
static final HttpClient client;
|
||||||
|
static AdjustableSemaphore semaphore;
|
||||||
|
|
||||||
static {
|
static {
|
||||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||||
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 200);
|
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 500);
|
||||||
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 8);
|
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 16);
|
||||||
client = HttpClientUtil.createClient(params);
|
client = HttpClientUtil.createClient(params);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -92,14 +94,22 @@ public class SolrCmdDistributor {
|
||||||
}
|
}
|
||||||
|
|
||||||
public SolrCmdDistributor(int numHosts) {
|
public SolrCmdDistributor(int numHosts) {
|
||||||
|
int maxPoolSize = Math.max(8, (numHosts-1) * 8);
|
||||||
BoundedExecutor executor = null;
|
BoundedExecutor executor = null;
|
||||||
synchronized (SolrCmdDistributor.class) {
|
synchronized (SolrCmdDistributor.class) {
|
||||||
if (commExecutor == null || commExecutor.getMaximumPoolSize() != numHosts) {
|
if (semaphore == null) {
|
||||||
|
semaphore = new AdjustableSemaphore(maxPoolSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (maxPoolSize != semaphore.getMaxPermits()) {
|
||||||
|
// raise the permits to match maxPoolSize
|
||||||
|
semaphore.setMaxPermits(maxPoolSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (commExecutor == null || commExecutor.getMaximumPoolSize() != maxPoolSize) {
|
||||||
// we don't shutdown the previous because all it's threads will die
|
// we don't shutdown the previous because all it's threads will die
|
||||||
int maxPoolSize = Math.max(8, (numHosts-1) * 8);
|
|
||||||
commExecutor = new BoundedExecutor(0, maxPoolSize, 5,
|
commExecutor = new BoundedExecutor(0, maxPoolSize, 5,
|
||||||
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(maxPoolSize * 2),
|
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
|
||||||
new DefaultSolrThreadFactory("cmdDistribExecutor"));
|
new DefaultSolrThreadFactory("cmdDistribExecutor"));
|
||||||
}
|
}
|
||||||
executor = commExecutor;
|
executor = commExecutor;
|
||||||
|
@ -343,11 +353,17 @@ public class SolrCmdDistributor {
|
||||||
} else {
|
} else {
|
||||||
clonedRequest.rspCode = -1;
|
clonedRequest.rspCode = -1;
|
||||||
}
|
}
|
||||||
|
} finally {
|
||||||
|
semaphore.release();
|
||||||
}
|
}
|
||||||
return clonedRequest;
|
return clonedRequest;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
try {
|
||||||
|
semaphore.acquire();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException();
|
||||||
|
}
|
||||||
pending.add(completionService.submit(task));
|
pending.add(completionService.submit(task));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -517,33 +533,30 @@ public class SolrCmdDistributor {
|
||||||
public class BoundedExecutor extends ThreadPoolExecutor {
|
public class BoundedExecutor extends ThreadPoolExecutor {
|
||||||
private final Semaphore semaphore;
|
private final Semaphore semaphore;
|
||||||
|
|
||||||
public BoundedExecutor(int corePoolSize,
|
public BoundedExecutor(int corePoolSize, int maximumPoolSize,
|
||||||
int maximumPoolSize, long keepAliveTime, TimeUnit unit,
|
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
|
||||||
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
|
ThreadFactory threadFactory) {
|
||||||
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
|
super(corePoolSize, Integer.MAX_VALUE, keepAliveTime, unit, workQueue,
|
||||||
|
threadFactory);
|
||||||
this.semaphore = new Semaphore(maximumPoolSize);
|
this.semaphore = new Semaphore(maximumPoolSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void execute(final Runnable command) {
|
public void execute(final Runnable command) {
|
||||||
|
// try {
|
||||||
|
// System.out.println("semaphore aq:" + semaphore.availablePermits());
|
||||||
|
// semaphore.acquire();
|
||||||
|
// System.out.println("aquired");
|
||||||
|
// } catch (InterruptedException e1) {
|
||||||
|
// throw new RuntimeException();
|
||||||
|
// }
|
||||||
try {
|
try {
|
||||||
semaphore.acquire();
|
super.execute(command);
|
||||||
} catch (InterruptedException e1) {
|
|
||||||
throw new RuntimeException();
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
super.execute(new Runnable() {
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
command.run();
|
|
||||||
} finally {
|
|
||||||
semaphore.release();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} catch (RejectedExecutionException e) {
|
} catch (RejectedExecutionException e) {
|
||||||
semaphore.release();
|
|
||||||
throw e;
|
throw e;
|
||||||
|
} finally {
|
||||||
|
// semaphore.release();
|
||||||
|
// System.out.println("semaphore re:" + semaphore.availablePermits());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -158,10 +158,13 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
|
||||||
CoreDescriptor coreDesc = req.getCore().getCoreDescriptor();
|
CoreDescriptor coreDesc = req.getCore().getCoreDescriptor();
|
||||||
|
|
||||||
this.zkEnabled = coreDesc.getCoreContainer().isZooKeeperAware();
|
this.zkEnabled = coreDesc.getCoreContainer().isZooKeeperAware();
|
||||||
|
zkController = req.getCore().getCoreDescriptor().getCoreContainer().getZkController();
|
||||||
|
if (zkEnabled) {
|
||||||
|
numNodes = zkController.getZkStateReader().getCloudState().getLiveNodes().size();
|
||||||
|
}
|
||||||
//this.rsp = reqInfo != null ? reqInfo.getRsp() : null;
|
//this.rsp = reqInfo != null ? reqInfo.getRsp() : null;
|
||||||
|
|
||||||
|
|
||||||
zkController = req.getCore().getCoreDescriptor().getCoreContainer().getZkController();
|
|
||||||
|
|
||||||
cloudDesc = coreDesc.getCloudDescriptor();
|
cloudDesc = coreDesc.getCloudDescriptor();
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,79 @@
|
||||||
|
package org.apache.solr.util;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import java.util.concurrent.Semaphore;
|
||||||
|
|
||||||
|
final public class AdjustableSemaphore {
|
||||||
|
|
||||||
|
private final ResizeableSemaphore semaphore;
|
||||||
|
|
||||||
|
private int maxPermits = 0;
|
||||||
|
|
||||||
|
public AdjustableSemaphore(int size) {
|
||||||
|
semaphore = new ResizeableSemaphore(size);
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void setMaxPermits(int newMax) {
|
||||||
|
if (newMax < 1) {
|
||||||
|
throw new IllegalArgumentException("Semaphore size must be at least 1,"
|
||||||
|
+ " was " + newMax);
|
||||||
|
}
|
||||||
|
|
||||||
|
int delta = newMax - this.maxPermits;
|
||||||
|
|
||||||
|
if (delta == 0) {
|
||||||
|
return;
|
||||||
|
} else if (delta > 0) {
|
||||||
|
this.semaphore.release(delta);
|
||||||
|
} else {
|
||||||
|
delta *= -1;
|
||||||
|
this.semaphore.reducePermits(delta);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.maxPermits = newMax;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void release() {
|
||||||
|
this.semaphore.release();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void release(int numPermits) {
|
||||||
|
this.semaphore.release(numPermits);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void acquire() throws InterruptedException {
|
||||||
|
this.semaphore.acquire();
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMaxPermits() {
|
||||||
|
return maxPermits;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final class ResizeableSemaphore extends Semaphore {
|
||||||
|
|
||||||
|
ResizeableSemaphore(int size) {
|
||||||
|
super(size);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void reducePermits(int reduction) {
|
||||||
|
super.reducePermits(reduction);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue