SOLR-8995: Use Lambdas for Thread/Runnable

Noble Paul 2016-07-11 19:29:15 +05:30
parent 42e1caf2bf
commit 74de196565
14 changed files with 119 additions and 174 deletions
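
The whole patch is one mechanical refactoring: every anonymous Thread or Runnable subclass that only wraps a single run() body becomes a lambda or method reference handed to the Thread/executor API. A minimal standalone sketch of the before/after shape (illustrative code, not taken from the patch):

// Standalone sketch of the pattern applied throughout this commit.
public class LambdaRefactorDemo {
  public static void main(String[] args) {
    // Before: an anonymous Thread subclass, five lines of ceremony around one call.
    new Thread() {
      @Override
      public void run() {
        System.out.println("anonymous subclass");
      }
    }.start();

    // After: Thread accepts a Runnable, and a no-arg void lambda is a Runnable.
    new Thread(() -> System.out.println("lambda")).start();
  }
}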


@@ -455,12 +455,7 @@ public class DataImporter {
   }
 
   public void runAsync(final RequestInfo reqParams, final DIHWriter sw) {
-    new Thread() {
-      @Override
-      public void run() {
-        runCmd(reqParams, sw);
-      }
-    }.start();
+    new Thread(() -> runCmd(reqParams, sw)).start();
   }
 
   void runCmd(RequestInfo reqParams, DIHWriter sw) {


@@ -261,13 +261,8 @@ public class Overseer implements Closeable {
       }
     } finally {
       log.info("Overseer Loop exiting : {}", LeaderElector.getNodeName(myId));
-      new Thread("OverseerExitThread"){
-        //do this in a separate thread because any wait is interrupted in this main thread
-        @Override
-        public void run() {
-          checkIfIamStillLeader();
-        }
-      }.start();
+      //do this in a separate thread because any wait is interrupted in this main thread
+      new Thread(this::checkIfIamStillLeader, "OverseerExitThread").start();
     }
   }
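
Note the constructor shift here: the old code passed the thread name to the anonymous subclass constructor, while the new code uses Thread(Runnable target, String name) with a method reference as the target. A minimal sketch of that shape (illustrative names, not code from the patch):

public class NamedThreadDemo {
  private void checkIfIamStillLeader() { // stand-in for Overseer's private method
    System.out.println("leader check on " + Thread.currentThread().getName());
  }

  public static void main(String[] args) {
    NamedThreadDemo demo = new NamedThreadDemo();
    // A no-arg void instance method reference satisfies Runnable;
    // the two-arg constructor keeps the thread name.
    new Thread(demo::checkIfIamStillLeader, "OverseerExitThread").start();
  }
}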


@@ -578,17 +578,15 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
     }
     //if there are too many nodes this command may time out. And most likely dedicated
     // overseers are created when there are too many nodes . So , do this operation in a separate thread
-    new Thread(){
-      @Override
-      public void run() {
-        try {
-          overseerPrioritizer.prioritizeOverseerNodes(myId);
-        } catch (Exception e) {
-          log.error("Error in prioritizing Overseer",e);
-        }
-      }
-    }.start();
+    new Thread(() -> {
+      try {
+        overseerPrioritizer.prioritizeOverseerNodes(myId);
+      } catch (Exception e) {
+        log.error("Error in prioritizing Overseer", e);
+      }
+    }).start();
   }
 
   @SuppressWarnings("unchecked")
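
The try/catch stays inside the lambda for a reason: Runnable.run() declares no checked exceptions, so anything checked must be handled in the body. A small sketch of that constraint (illustrative code, not from the patch):

public class CheckedExceptionDemo {
  static void mayFail() throws Exception { throw new Exception("boom"); }

  public static void main(String[] args) {
    // () -> mayFail() alone would not compile as a Runnable:
    // the checked exception must be caught inside the lambda body.
    new Thread(() -> {
      try {
        mayFail();
      } catch (Exception e) {
        System.err.println("handled inside the lambda: " + e.getMessage());
      }
    }).start();
  }
}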


@@ -2435,20 +2435,18 @@ public final class ZkController {
       final Set<Runnable> listeners = confDirectoryListeners.get(zkDir);
       if (listeners != null && !listeners.isEmpty()) {
         final Set<Runnable> listenersCopy = new HashSet<>(listeners);
-        new Thread() {
-          // run these in a separate thread because this can be long running
-          @Override
-          public void run() {
-            log.info("Running listeners for {}", zkDir);
-            for (final Runnable listener : listenersCopy) {
-              try {
-                listener.run();
-              } catch (Exception e) {
-                log.warn("listener throws error", e);
-              }
-            }
-          }
-        }.start();
+        // run these in a separate thread because this can be long running
+        new Thread(() -> {
+          log.info("Running listeners for {}", zkDir);
+          for (final Runnable listener : listenersCopy) {
+            try {
+              listener.run();
+            } catch (Exception e) {
+              log.warn("listener throws error", e);
+            }
+          }
+        }).start();
       }
     }
     return true;
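
The listenersCopy snapshot matters for the lambda version just as it did for the anonymous class: a lambda may only capture effectively final locals, and the copy also shields the background thread from later edits to the live listener set. A self-contained sketch (illustrative code, not from the patch):

import java.util.HashSet;
import java.util.Set;

public class CaptureDemo {
  public static void main(String[] args) {
    Set<Runnable> listeners = new HashSet<>();
    listeners.add(() -> System.out.println("listener ran"));

    final Set<Runnable> listenersCopy = new HashSet<>(listeners); // effectively final snapshot
    new Thread(() -> {
      for (Runnable listener : listenersCopy) {
        try {
          listener.run();
        } catch (Exception e) {
          System.err.println("listener threw: " + e);
        }
      }
    }).start();

    listeners.clear(); // mutating the original set no longer affects the running thread
  }
}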


@@ -533,24 +533,22 @@ public class CoreContainer {
     } finally {
       if (asyncSolrCoreLoad && futures != null) {
-        Thread shutdownThread = new Thread() {
-          public void run() {
-            try {
-              for (Future<SolrCore> future : futures) {
-                try {
-                  future.get();
-                } catch (InterruptedException e) {
-                  Thread.currentThread().interrupt();
-                } catch (ExecutionException e) {
-                  log.error("Error waiting for SolrCore to be created", e);
-                }
-              }
-            } finally {
-              ExecutorUtil.shutdownAndAwaitTermination(coreLoadExecutor);
-            }
-          }
-        };
-        coreContainerWorkExecutor.submit(shutdownThread);
+        coreContainerWorkExecutor.submit((Runnable) () -> {
+          try {
+            for (Future<SolrCore> future : futures) {
+              try {
+                future.get();
+              } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+              } catch (ExecutionException e) {
+                log.error("Error waiting for SolrCore to be created", e);
+              }
+            }
+          } finally {
+            ExecutorUtil.shutdownAndAwaitTermination(coreLoadExecutor);
+          }
+        });
       } else {
         ExecutorUtil.shutdownAndAwaitTermination(coreLoadExecutor);
       }
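
This hunk also drops a misused Thread object: the old code built a Thread and then submitted it to an executor as a mere Runnable, never starting it. The new code submits the lambda directly, with a (Runnable) cast. ExecutorService.submit is overloaded for Runnable and Callable<T>; a void block lambda already resolves to the Runnable overload, so my reading (not stated in the commit) is that the cast is there to document the intended overload. A sketch:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SubmitOverloadDemo {
  public static void main(String[] args) {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    // The cast pins submit(Runnable) rather than submit(Callable<T>).
    pool.submit((Runnable) () -> {
      System.out.println("runnable branch of submit()");
    });
    pool.shutdown();
  }
}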


@@ -2595,18 +2595,14 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
     final String myIndexDir = getIndexDir();
     final String coreName = getName();
     if (myDirFactory != null && myDataDir != null && myIndexDir != null) {
-      Thread cleanupThread = new Thread() {
-        @Override
-        public void run() {
-          log.info("Looking for old index directories to cleanup for core {} in {}", coreName, myDataDir);
-          try {
-            myDirFactory.cleanupOldIndexDirectories(myDataDir, myIndexDir);
-          } catch (Exception exc) {
-            log.error("Failed to cleanup old index directories for core "+coreName, exc);
-          }
-        }
-      };
-      cleanupThread.setName("OldIndexDirectoryCleanupThreadForCore-"+coreName);
+      Thread cleanupThread = new Thread(() -> {
+        log.info("Looking for old index directories to cleanup for core {} in {}", coreName, myDataDir);
+        try {
+          myDirFactory.cleanupOldIndexDirectories(myDataDir, myIndexDir);
+        } catch (Exception exc) {
+          log.error("Failed to cleanup old index directories for core "+coreName, exc);
+        }
+      }, "OldIndexDirectoryCleanupThreadForCore-"+coreName);
       cleanupThread.setDaemon(true);
       cleanupThread.start();
     }
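
Here the separate setName() call folds into the Thread(Runnable, String) constructor, while setDaemon(true) stays: daemon threads do not keep the JVM alive, which suits best-effort background cleanup. A minimal sketch of the same shape (illustrative names):

public class DaemonDemo {
  public static void main(String[] args) throws InterruptedException {
    Thread cleanup = new Thread(
        () -> System.out.println("cleanup on " + Thread.currentThread().getName()),
        "OldIndexDirectoryCleanupThreadForCore-demo");
    cleanup.setDaemon(true); // JVM may exit even if this thread is still running
    cleanup.start();
    cleanup.join(); // only so the demo prints before the JVM exits
  }
}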


@@ -174,42 +174,38 @@ public class ZkContainer {
   }
 
   public void registerInZk(final SolrCore core, boolean background) {
-    Thread thread = new Thread() {
-      @Override
-      public void run() {
-        MDCLoggingContext.setCore(core);
-        try {
-          try {
-            zkController.register(core.getName(), core.getCoreDescriptor());
-          } catch (InterruptedException e) {
-            // Restore the interrupted status
-            Thread.currentThread().interrupt();
-            SolrException.log(log, "", e);
-          } catch (Exception e) {
-            try {
-              zkController.publish(core.getCoreDescriptor(), Replica.State.DOWN);
-            } catch (InterruptedException e1) {
-              Thread.currentThread().interrupt();
-              log.error("", e1);
-            } catch (Exception e1) {
-              log.error("", e1);
-            }
-            SolrException.log(log, "", e);
-          }
-        } finally {
-          MDCLoggingContext.clear();
-        }
-      }
-    };
+    Runnable r = () -> {
+      MDCLoggingContext.setCore(core);
+      try {
+        try {
+          zkController.register(core.getName(), core.getCoreDescriptor());
+        } catch (InterruptedException e) {
+          // Restore the interrupted status
+          Thread.currentThread().interrupt();
+          SolrException.log(log, "", e);
+        } catch (Exception e) {
+          try {
+            zkController.publish(core.getCoreDescriptor(), Replica.State.DOWN);
+          } catch (InterruptedException e1) {
+            Thread.currentThread().interrupt();
+            log.error("", e1);
+          } catch (Exception e1) {
+            log.error("", e1);
+          }
+          SolrException.log(log, "", e);
+        }
+      } finally {
+        MDCLoggingContext.clear();
+      }
+    };
    if (zkController != null) {
      if (background) {
-        coreZkRegister.execute(thread);
+        coreZkRegister.execute(r);
      } else {
        MDCLoggingContext.setCore(core);
        try {
-          thread.run();
+          r.run();
        } finally {
          MDCLoggingContext.clear();
        }
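
This is the clearest win in the patch: the old code built a Thread but never started it, calling thread.run() inline in the synchronous branch, which is misleading. Binding the body to a Runnable variable states the real intent: the same work either goes to an executor or runs on the calling thread. A sketch of that dual use (illustrative code, not from the patch):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RunnableReuseDemo {
  public static void main(String[] args) {
    Runnable r = () -> System.out.println("register on " + Thread.currentThread().getName());

    boolean background = Boolean.getBoolean("demo.background"); // stand-in for the 'background' flag
    if (background) {
      ExecutorService coreZkRegister = Executors.newSingleThreadExecutor();
      coreZkRegister.execute(r); // asynchronous
      coreZkRegister.shutdown();
    } else {
      r.run(); // synchronous, same thread; no Thread object created for nothing
    }
  }
}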


@@ -768,18 +768,15 @@ public class IndexFetcher {
   private void reloadCore() {
     final CountDownLatch latch = new CountDownLatch(1);
-    new Thread() {
-      @Override
-      public void run() {
-        try {
-          solrCore.getCoreDescriptor().getCoreContainer().reload(solrCore.getName());
-        } catch (Exception e) {
-          LOG.error("Could not reload core ", e);
-        } finally {
-          latch.countDown();
-        }
-      }
-    }.start();
+    new Thread(() -> {
+      try {
+        solrCore.getCoreDescriptor().getCoreContainer().reload(solrCore.getName());
+      } catch (Exception e) {
+        LOG.error("Could not reload core ", e);
+      } finally {
+        latch.countDown();
+      }
+    }).start();
     try {
       latch.await();
     } catch (InterruptedException e) {
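
The latch pattern survives the rewrite unchanged: the worker counts down in a finally block so the caller is always released, success or failure. A self-contained sketch of the reloadCore() shape (illustrative code):

import java.util.concurrent.CountDownLatch;

public class LatchDemo {
  public static void main(String[] args) {
    final CountDownLatch latch = new CountDownLatch(1);
    new Thread(() -> {
      try {
        System.out.println("reloading...");
      } finally {
        latch.countDown(); // always release the waiter, even on failure
      }
    }).start();
    try {
      latch.await(); // block until the worker is done
      System.out.println("reload finished");
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}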


@@ -275,12 +275,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAware
       return;
     }
     final SolrParams paramsCopy = new ModifiableSolrParams(solrParams);
-    Thread fetchThread = new Thread("explicit-fetchindex-cmd") {
-      @Override
-      public void run() {
-        doFetch(paramsCopy, false);
-      }
-    };
+    Thread fetchThread = new Thread(() -> doFetch(paramsCopy, false), "explicit-fetchindex-cmd");
     fetchThread.setDaemon(false);
     fetchThread.start();
     if (solrParams.getBool(WAIT, false)) {


@@ -128,12 +128,7 @@ public class SnapShooter {
   }
 
   protected void deleteSnapAsync(final ReplicationHandler replicationHandler) {
-    new Thread() {
-      @Override
-      public void run() {
-        deleteNamedSnapshot(replicationHandler);
-      }
-    }.start();
+    new Thread(() -> deleteNamedSnapshot(replicationHandler)).start();
   }
 
   public void validateCreateSnapshot() throws IOException {
@@ -170,28 +165,27 @@ public class SnapShooter {
   public void createSnapAsync(final IndexCommit indexCommit, final int numberToKeep, Consumer<NamedList> result) {
     solrCore.getDeletionPolicy().saveCommitPoint(indexCommit.getGeneration());
-    new Thread() { //TODO should use Solr's ExecutorUtil
-      @Override
-      public void run() {
-        try {
-          result.accept(createSnapshot(indexCommit));
-        } catch (Exception e) {
-          LOG.error("Exception while creating snapshot", e);
-          NamedList snapShootDetails = new NamedList<>();
-          snapShootDetails.add("snapShootException", e.getMessage());
-          result.accept(snapShootDetails);
-        } finally {
-          solrCore.getDeletionPolicy().releaseCommitPoint(indexCommit.getGeneration());
-        }
-        if (snapshotName == null) {
-          try {
-            deleteOldBackups(numberToKeep);
-          } catch (IOException e) {
-            LOG.warn("Unable to delete old snapshots ", e);
-          }
-        }
-      }
-    }.start();
+    //TODO should use Solr's ExecutorUtil
+    new Thread(() -> {
+      try {
+        result.accept(createSnapshot(indexCommit));
+      } catch (Exception e) {
+        LOG.error("Exception while creating snapshot", e);
+        NamedList snapShootDetails = new NamedList<>();
+        snapShootDetails.add("snapShootException", e.getMessage());
+        result.accept(snapShootDetails);
+      } finally {
+        solrCore.getDeletionPolicy().releaseCommitPoint(indexCommit.getGeneration());
+      }
+      if (snapshotName == null) {
+        try {
+          deleteOldBackups(numberToKeep);
+        } catch (IOException e) {
+          LOG.warn("Unable to delete old snapshots ", e);
+        }
+      }
+    }).start();
   }
 
   // note: remember to reserve the indexCommit first so it won't get deleted concurrently
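
The TODO points past this commit: an unpooled, unnamed thread per snapshot should eventually give way to a shared executor (ExecutorUtil is Solr's wrapper around that machinery). A sketch of the direction using plain java.util.concurrent so it stays self-contained (illustrative code, not the patch's eventual fix):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SnapshotExecutorDemo {
  public static void main(String[] args) {
    // A bounded pool caps concurrent snapshots and reuses threads.
    ExecutorService snapshotPool = Executors.newFixedThreadPool(1);
    snapshotPool.submit(() -> System.out.println("snapshot work"));
    snapshotPool.shutdown();
  }
}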


@@ -206,23 +206,20 @@ public class SolrConfigHandler extends RequestHandlerBase implements SolrCoreAware
       log.info("I already have the expected version {} of params", expectedVersion);
     }
     if (checkStale && req.getCore().getResourceLoader() instanceof ZkSolrResourceLoader) {
-      new Thread(SolrConfigHandler.class.getSimpleName() + "-refreshconf") {
-        @Override
-        public void run() {
-          if (!reloadLock.tryLock()) {
-            log.info("Another reload is in progress . Not doing anything");
-            return;
-          }
-          try {
-            log.info("Trying to update my configs");
-            SolrCore.getConfListener(req.getCore(), (ZkSolrResourceLoader) req.getCore().getResourceLoader()).run();
-          } catch (Exception e) {
-            log.error("Unable to refresh conf ", e);
-          } finally {
-            reloadLock.unlock();
-          }
-        }
-      }.start();
+      new Thread(() -> {
+        if (!reloadLock.tryLock()) {
+          log.info("Another reload is in progress . Not doing anything");
+          return;
+        }
+        try {
+          log.info("Trying to update my configs");
+          SolrCore.getConfListener(req.getCore(), (ZkSolrResourceLoader) req.getCore().getResourceLoader()).run();
+        } catch (Exception e) {
+          log.error("Unable to refresh conf ", e);
+        } finally {
+          reloadLock.unlock();
+        }
+      }, SolrConfigHandler.class.getSimpleName() + "-refreshconf").start();
     } else {
       log.info("checkStale {} , resourceloader {}", checkStale, req.getCore().getResourceLoader().getClass().getName());
    }
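
The tryLock() guard is what makes this refresh thread safe to fire repeatedly: unlike lock(), it returns false instead of blocking, so a second concurrent reload simply backs off. A minimal sketch of the guard (illustrative code):

import java.util.concurrent.locks.ReentrantLock;

public class TryLockDemo {
  private static final ReentrantLock reloadLock = new ReentrantLock();

  public static void main(String[] args) {
    new Thread(() -> {
      if (!reloadLock.tryLock()) {
        System.out.println("another reload is in progress, backing off");
        return; // no blocking, no queueing
      }
      try {
        System.out.println("reloading config");
      } finally {
        reloadLock.unlock(); // always release in finally
      }
    }, "SolrConfigHandler-refreshconf").start();
  }
}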


@@ -582,24 +582,20 @@ enum CoreAdminOperation {
     public void call(final CallInfo callInfo) throws IOException {
       final SolrParams params = callInfo.req.getParams();
       log.info("It has been requested that we recover: core="+params.get(CoreAdminParams.CORE));
-      Thread thread = new Thread() {
-        @Override
-        public void run() {
-          String cname = params.get(CoreAdminParams.CORE);
-          if (cname == null) {
-            cname = "";
-          }
-          try (SolrCore core = callInfo.handler.coreContainer.getCore(cname)) {
-            if (core != null) {
-              core.getUpdateHandler().getSolrCoreState().doRecovery(callInfo.handler.coreContainer, core.getCoreDescriptor());
-            } else {
-              SolrException.log(log, "Could not find core to call recovery:" + cname);
-            }
-          }
-        }
-      };
-      thread.start();
+      new Thread(() -> {
+        String cname = params.get(CoreAdminParams.CORE);
+        if (cname == null) {
+          cname = "";
+        }
+        try (SolrCore core = callInfo.handler.coreContainer.getCore(cname)) {
+          if (core != null) {
+            core.getUpdateHandler().getSolrCoreState().doRecovery(callInfo.handler.coreContainer, core.getCoreDescriptor());
+          } else {
+            SolrException.log(log, "Could not find core to call recovery:" + cname);
+          }
+        }
+      }).start();
     }
   },
   REQUESTSYNCSHARD_OP(REQUESTSYNCSHARD) {
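
One detail worth noting in this hunk: cname is declared inside the lambda, so it is an ordinary local and may be reassigned; only variables captured from the enclosing scope must be effectively final. A small sketch of the distinction (illustrative code):

public class LambdaLocalDemo {
  public static void main(String[] args) {
    new Thread(() -> {
      String cname = System.getProperty("demo.core"); // may be null
      if (cname == null) {
        cname = ""; // fine: cname is local to the lambda body
      }
      System.out.println("recovering core '" + cname + "'");
    }).start();
  }
}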


@@ -16,10 +16,6 @@
  */
 package org.apache.solr.util;
 
-import org.apache.solr.common.util.Cache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.lang.invoke.MethodHandles;
 import java.lang.ref.WeakReference;
 import java.util.LinkedHashMap;
@@ -30,6 +26,10 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.solr.common.util.Cache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * A LFU cache implementation based upon ConcurrentHashMap.
  * <p>
@@ -139,12 +139,7 @@ public class ConcurrentLFUCache<K, V> implements Cache<K,V> {
     // in this method.
     if (currentSize > upperWaterMark && !isCleaning) {
       if (newThreadForCleanup) {
-        new Thread() {
-          @Override
-          public void run() {
-            markAndSweep();
-          }
-        }.start();
+        new Thread(this::markAndSweep).start();
       } else if (cleanupThread != null) {
         cleanupThread.wakeThread();
       } else {
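
Both cache classes collapse the six-line anonymous subclass into this::markAndSweep: an instance method reference is a Runnable whenever the method takes no arguments and returns void. A minimal sketch (illustrative names, not code from the patch):

public class MethodRefDemo {
  private void markAndSweep() {
    System.out.println("sweeping on " + Thread.currentThread().getName());
  }

  public static void main(String[] args) {
    MethodRefDemo cache = new MethodRefDemo();
    // Equivalent to new Thread(() -> cache.markAndSweep()).start()
    new Thread(cache::markAndSweep).start();
  }
}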


@@ -136,12 +136,7 @@ public class ConcurrentLRUCache<K,V> implements Cache<K,V> {
     // in this method.
     if (currentSize > upperWaterMark && !isCleaning) {
       if (newThreadForCleanup) {
-        new Thread() {
-          @Override
-          public void run() {
-            markAndSweep();
-          }
-        }.start();
+        new Thread(this::markAndSweep).start();
       } else if (cleanupThread != null){
         cleanupThread.wakeThread();
       } else {