SOLR-7956: Remove a few more interrupt causes.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1700185 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2015-08-31 02:22:09 +00:00
parent a956d3ffe2
commit 431ed53d46
8 changed files with 73 additions and 71 deletions

View File

@ -16,16 +16,6 @@
*/ */
package org.apache.solr.hadoop; package org.apache.solr.hadoop;
import org.apache.hadoop.fs.FileStatus;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.hadoop.MapReduceIndexerTool.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
@ -35,12 +25,21 @@ import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService; import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FileStatus;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.hadoop.MapReduceIndexerTool.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* The optional (parallel) GoLive phase merges the output shards of the previous * The optional (parallel) GoLive phase merges the output shards of the previous
* phase into a set of live customer facing Solr servers, typically a SolrCloud. * phase into a set of live customer facing Solr servers, typically a SolrCloud.
@ -164,7 +163,7 @@ class GoLive {
success = true; success = true;
return true; return true;
} finally { } finally {
shutdownNowAndAwaitTermination(executor); ExecutorUtil.shutdownAndAwaitTermination(executor);
float secs = (System.nanoTime() - start) / (float)(10^9); float secs = (System.nanoTime() - start) / (float)(10^9);
LOG.info("Live merging of index shards into Solr cluster took " + secs + " secs"); LOG.info("Live merging of index shards into Solr cluster took " + secs + " secs");
if (success) { if (success) {
@ -177,25 +176,6 @@ class GoLive {
// if an output dir does not exist, we should fail and do no merge? // if an output dir does not exist, we should fail and do no merge?
} }
private void shutdownNowAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
pool.shutdownNow(); // Cancel currently executing tasks
boolean shutdown = false;
while (!shutdown) {
try {
// Wait a while for existing tasks to terminate
shutdown = pool.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException ie) {
// Preserve interrupt status
Thread.currentThread().interrupt();
}
if (!shutdown) {
pool.shutdownNow(); // Cancel currently executing tasks
}
}
}
private static final class Request { private static final class Request {
Exception e; Exception e;
boolean success = false; boolean success = false;

View File

@ -102,6 +102,8 @@ class CdcrReplicatorScheduler {
void shutdown() { void shutdown() {
if (isStarted) { if (isStarted) {
// interrupts are often dangerous in Lucene / Solr code, but the
// test for this will leak threads without
replicatorsPool.shutdown(); replicatorsPool.shutdown();
try { try {
replicatorsPool.awaitTermination(60, TimeUnit.SECONDS); replicatorsPool.awaitTermination(60, TimeUnit.SECONDS);

View File

@ -1,5 +1,10 @@
package org.apache.solr.handler; package org.apache.solr.handler;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/* /*
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with * contributor license agreements. See the NOTICE file distributed with
@ -35,11 +40,6 @@ import org.apache.solr.util.DefaultSolrThreadFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/** /**
* <p> * <p>
* Synchronize periodically the update log of non-leader nodes with their leaders. * Synchronize periodically the update log of non-leader nodes with their leaders.
@ -101,6 +101,8 @@ class CdcrUpdateLogSynchronizer implements CdcrStateManager.CdcrStateObserver {
void shutdown() { void shutdown() {
if (scheduler != null) { if (scheduler != null) {
// interrupts are often dangerous in Lucene / Solr code, but the
// test for this will leak threads without
scheduler.shutdownNow(); scheduler.shutdownNow();
scheduler = null; scheduler = null;
} }

View File

@ -553,8 +553,7 @@ public class IndexFetcher {
markReplicationStop(); markReplicationStop();
dirFileFetcher = null; dirFileFetcher = null;
localFileFetcher = null; localFileFetcher = null;
if (fsyncService != null && !fsyncService.isShutdown()) fsyncService if (fsyncService != null && !fsyncService.isShutdown()) fsyncService.shutdown();
.shutdownNow();
fsyncService = null; fsyncService = null;
stop = false; stop = false;
fsyncException = null; fsyncException = null;

View File

@ -16,6 +16,27 @@ package org.apache.solr.schema;
* limitations under the License. * limitations under the License.
*/ */
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.util.CharFilterFactory; import org.apache.lucene.analysis.util.CharFilterFactory;
@ -53,28 +74,6 @@ import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.data.Stat;
import org.xml.sax.InputSource; import org.xml.sax.InputSource;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/** Solr-managed schema - non-user-editable, but can be mutable via internal and external REST API requests. */ /** Solr-managed schema - non-user-editable, but can be mutable via internal and external REST API requests. */
public final class ManagedIndexSchema extends IndexSchema { public final class ManagedIndexSchema extends IndexSchema {
@ -269,7 +268,7 @@ public final class ManagedIndexSchema extends IndexSchema {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} finally { } finally {
if (!parallelExecutor.isShutdown()) if (!parallelExecutor.isShutdown())
parallelExecutor.shutdownNow(); parallelExecutor.shutdown();
} }
log.info("Took {}ms for {} replicas to apply schema update version {} for collection {}", log.info("Took {}ms for {} replicas to apply schema update version {} for collection {}",

View File

@ -92,7 +92,7 @@ public final class CommitTracker implements Runnable {
pending.cancel(false); pending.cancel(false);
pending = null; pending = null;
} }
scheduler.shutdownNow(); scheduler.shutdown();
} }
/** schedule individual commits */ /** schedule individual commits */

View File

@ -23,6 +23,7 @@ import org.apache.solr.common.SolrException;
import static org.apache.solr.common.SolrException.ErrorCode.*; import static org.apache.solr.common.SolrException.ErrorCode.*;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Replica;
@ -266,11 +267,11 @@ public final class DocExpirationUpdateProcessorFactory
core.addCloseHook(new CloseHook() { core.addCloseHook(new CloseHook() {
public void postClose(SolrCore core) { public void postClose(SolrCore core) {
// update handler is gone, hard terminiate anything that's left. // update handler is gone, terminate anything that's left.
if (executor.isTerminating()) { if (executor.isTerminating()) {
log.info("Triggering hard close of DocExpiration Executor"); log.info("Waiting for close of DocExpiration Executor");
executor.shutdownNow(); ExecutorUtil.shutdownAndAwaitTermination(executor);
} }
} }
public void preClose(SolrCore core) { public void preClose(SolrCore core) {

View File

@ -82,20 +82,39 @@ public class ExecutorUtil {
// this if you know what you are doing - you probably want shutdownAndAwaitTermination. // this if you know what you are doing - you probably want shutdownAndAwaitTermination.
// Marked as Deprecated to discourage use. // Marked as Deprecated to discourage use.
@Deprecated @Deprecated
public static void shutdownNowAndAwaitTermination(ExecutorService pool) { public static void shutdownWithInterruptAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
pool.shutdownNow(); // Cancel currently executing tasks - NOTE: this interrupts! pool.shutdownNow(); // Cancel currently executing tasks - NOTE: this interrupts!
boolean shutdown = false; boolean shutdown = false;
while (!shutdown) { while (!shutdown) {
try { try {
// Wait a while for existing tasks to terminate // Wait a while for existing tasks to terminate
shutdown = pool.awaitTermination(1, TimeUnit.SECONDS); shutdown = pool.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
// Preserve interrupt status // Preserve interrupt status
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
if (!shutdown) { }
}
// ** This will interrupt the threads! ** Lucene and Solr do not like this because it can close channels, so only use
// this if you know what you are doing - you probably want shutdownAndAwaitTermination.
// Marked as Deprecated to discourage use.
@Deprecated
public static void shutdownAndAwaitTerminationWithInterrupt(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
boolean shutdown = false;
boolean interrupted = false;
while (!shutdown) {
try {
// Wait a while for existing tasks to terminate
shutdown = pool.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException ie) {
// Preserve interrupt status
Thread.currentThread().interrupt();
}
if (!shutdown && !interrupted) {
pool.shutdownNow(); // Cancel currently executing tasks - NOTE: this interrupts! pool.shutdownNow(); // Cancel currently executing tasks - NOTE: this interrupts!
interrupted = true;
} }
} }
} }
@ -106,7 +125,7 @@ public class ExecutorUtil {
while (!shutdown) { while (!shutdown) {
try { try {
// Wait a while for existing tasks to terminate // Wait a while for existing tasks to terminate
shutdown = pool.awaitTermination(1, TimeUnit.SECONDS); shutdown = pool.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
// Preserve interrupt status // Preserve interrupt status
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();