Restore thread interrupt flag after an InterruptedException
This commit replaces all occurrences of Thread.interrupted() with Thread.currentThread().interrupt(). While the former checks and clears the current thread's interrupt flag the latter sets it, which is actually intended. Closes #14798
This commit is contained in:
parent
d6a756fbe2
commit
96724e198a
|
@ -103,6 +103,8 @@ public class NodeIndexDeletedAction extends AbstractComponent {
|
|||
INDEX_STORE_DELETED_ACTION_NAME, new NodeIndexStoreDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME);
|
||||
} catch (LockObtainFailedException exc) {
|
||||
logger.warn("[{}] failed to lock all shards for index - timed out after 30 seconds", index);
|
||||
} catch (InterruptedException e) {
|
||||
logger.warn("[{}] failed to lock all shards for index - interrupted", index);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -35,9 +35,6 @@ public abstract class AbstractRunnable implements Runnable {
|
|||
public final void run() {
|
||||
try {
|
||||
doRun();
|
||||
} catch (InterruptedException ex) {
|
||||
Thread.interrupted();
|
||||
onFailure(ex);
|
||||
} catch (Throwable t) {
|
||||
onFailure(t);
|
||||
} finally {
|
||||
|
|
|
@ -643,7 +643,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
|||
* @param index the index to process the pending deletes for
|
||||
* @param timeout the timeout used for processing pending deletes
|
||||
*/
|
||||
public void processPendingDeletes(Index index, IndexSettings indexSettings, TimeValue timeout) throws IOException {
|
||||
public void processPendingDeletes(Index index, IndexSettings indexSettings, TimeValue timeout) throws IOException, InterruptedException {
|
||||
logger.debug("{} processing pending deletes", index);
|
||||
final long startTimeNS = System.nanoTime();
|
||||
final List<ShardLock> shardLocks = nodeEnv.lockAllForIndex(index, indexSettings, timeout.millis());
|
||||
|
@ -695,14 +695,9 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
|||
}
|
||||
if (remove.isEmpty() == false) {
|
||||
logger.warn("{} still pending deletes present for shards {} - retrying", index, remove.toString());
|
||||
try {
|
||||
Thread.sleep(sleepTime);
|
||||
sleepTime = Math.min(maxSleepTimeMs, sleepTime * 2); // increase the sleep time gradually
|
||||
logger.debug("{} schedule pending delete retry after {} ms", index, sleepTime);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.interrupted();
|
||||
return;
|
||||
}
|
||||
Thread.sleep(sleepTime);
|
||||
sleepTime = Math.min(maxSleepTimeMs, sleepTime * 2); // increase the sleep time gradually
|
||||
logger.debug("{} schedule pending delete retry after {} ms", index, sleepTime);
|
||||
}
|
||||
} while ((System.nanoTime() - startTimeNS) < timeout.nanos());
|
||||
}
|
||||
|
|
|
@ -99,7 +99,7 @@ public class IndicesTTLService extends AbstractLifecycleComponent<IndicesTTLServ
|
|||
try {
|
||||
this.purgerThread.shutdown();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.interrupted();
|
||||
// we intentionally do not want to restore the interruption flag, we're about to shutdown anyway
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -340,7 +340,7 @@ public class IndicesTTLService extends AbstractLifecycleComponent<IndicesTTLServ
|
|||
try {
|
||||
condition.await(timeout.millis(), TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.interrupted();
|
||||
// we intentionally do not want to restore the interruption flag, we're about to shutdown anyway
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
|
|
|
@ -35,7 +35,6 @@ import org.elasticsearch.index.shard.ShardPath;
|
|||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||
import org.elasticsearch.test.IndexSettingsModule;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
|
@ -135,7 +134,7 @@ public class IndicesServiceTests extends ESSingleNodeTestCase {
|
|||
ensureGreen("test");
|
||||
}
|
||||
|
||||
public void testPendingTasks() throws IOException {
|
||||
public void testPendingTasks() throws Exception {
|
||||
IndicesService indicesService = getIndicesService();
|
||||
IndexService test = createIndex("test");
|
||||
|
||||
|
|
|
@ -63,19 +63,14 @@ public class CompositeTestCluster extends TestCluster {
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized void beforeTest(Random random, double transportClientRatio) throws IOException {
|
||||
public synchronized void beforeTest(Random random, double transportClientRatio) throws IOException, InterruptedException {
|
||||
super.beforeTest(random, transportClientRatio);
|
||||
cluster.beforeTest(random, transportClientRatio);
|
||||
Settings defaultSettings = cluster.getDefaultSettings();
|
||||
final Client client = cluster.size() > 0 ? cluster.client() : cluster.clientNodeClient();
|
||||
for (int i = 0; i < externalNodes.length; i++) {
|
||||
if (!externalNodes[i].running()) {
|
||||
try {
|
||||
externalNodes[i] = externalNodes[i].start(client, defaultSettings, NODE_PREFIX + i, cluster.getClusterName(), i);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.interrupted();
|
||||
return;
|
||||
}
|
||||
externalNodes[i] = externalNodes[i].start(client, defaultSettings, NODE_PREFIX + i, cluster.getClusterName(), i);
|
||||
}
|
||||
externalNodes[i].reset(random.nextLong());
|
||||
}
|
||||
|
|
|
@ -206,7 +206,7 @@ final class ExternalNode implements Closeable {
|
|||
this.random.setSeed(seed);
|
||||
}
|
||||
|
||||
synchronized void stop() {
|
||||
synchronized void stop() throws InterruptedException {
|
||||
if (running()) {
|
||||
try {
|
||||
if (this.client != null) {
|
||||
|
@ -214,11 +214,7 @@ final class ExternalNode implements Closeable {
|
|||
}
|
||||
} finally {
|
||||
process.destroy();
|
||||
try {
|
||||
process.waitFor();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.interrupted();
|
||||
}
|
||||
process.waitFor();
|
||||
process = null;
|
||||
nodeInfo = null;
|
||||
|
||||
|
@ -233,7 +229,11 @@ final class ExternalNode implements Closeable {
|
|||
|
||||
@Override
|
||||
public void close() {
|
||||
stop();
|
||||
try {
|
||||
stop();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
synchronized String getName() {
|
||||
|
|
|
@ -910,7 +910,7 @@ public final class InternalTestCluster extends TestCluster {
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized void beforeTest(Random random, double transportClientRatio) throws IOException {
|
||||
public synchronized void beforeTest(Random random, double transportClientRatio) throws IOException, InterruptedException {
|
||||
super.beforeTest(random, transportClientRatio);
|
||||
reset(true);
|
||||
}
|
||||
|
|
|
@ -63,7 +63,7 @@ public abstract class TestCluster implements Iterable<Client>, Closeable {
|
|||
/**
|
||||
* This method should be executed before each test to reset the cluster to its initial state.
|
||||
*/
|
||||
public void beforeTest(Random random, double transportClientRatio) throws IOException {
|
||||
public void beforeTest(Random random, double transportClientRatio) throws IOException, InterruptedException {
|
||||
assert transportClientRatio >= 0.0 && transportClientRatio <= 1.0;
|
||||
logger.debug("Reset test cluster with transport client ratio: [{}]", transportClientRatio);
|
||||
this.transportClientRatio = transportClientRatio;
|
||||
|
|
|
@ -94,7 +94,7 @@ public class InternalTestClusterTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testBeforeTest() throws IOException {
|
||||
public void testBeforeTest() throws Exception {
|
||||
long clusterSeed = randomLong();
|
||||
int minNumDataNodes = randomIntBetween(0, 3);
|
||||
int maxNumDataNodes = randomIntBetween(minNumDataNodes, 4);
|
||||
|
|
Loading…
Reference in New Issue