Add timeout when stopping WatchLockService

If there were many watches concurrently executing that take some time to execute it can a long time to shutdown a node. This change introduces a Timeout when stopping the WatchLockService.
Make watcher lock service timeout configurable. The configuration setting is `watcher.stop.timeout` the default is 30s.

Fixes elastic/elasticsearch#216

Original commit: elastic/x-pack-elasticsearch@06bf029a54
This commit is contained in:
Brian Murphy 2015-04-22 17:38:15 -04:00
parent a68d9018f2
commit d319fdef1b
3 changed files with 63 additions and 7 deletions

View File

@ -64,7 +64,11 @@ public class WatcherService extends AbstractComponent {
logger.info("stopping watch service..."); logger.info("stopping watch service...");
triggerService.stop(); triggerService.stop();
executionService.stop(); executionService.stop();
watchLockService.stop(); try {
watchLockService.stop();
} catch (WatchLockService.TimedOutException we) {
logger.warn("error stopping WatchLockService", we);
}
watchStore.stop(); watchStore.stop();
state.set(State.STOPPED); state.set(State.STOPPED);
logger.info("watch service has stopped"); logger.info("watch service has stopped");

View File

@ -6,17 +6,39 @@
package org.elasticsearch.watcher.watch; package org.elasticsearch.watcher.watch;
import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.watcher.WatcherException;
import org.elasticsearch.watcher.support.concurrent.FairKeyedLock; import org.elasticsearch.watcher.support.concurrent.FairKeyedLock;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* *
*/ */
public class WatchLockService { public class WatchLockService extends AbstractComponent {
private final FairKeyedLock<String> watchLocks = new FairKeyedLock<>(); private final FairKeyedLock<String> watchLocks = new FairKeyedLock<>();
private final AtomicBoolean running = new AtomicBoolean(false); private final AtomicBoolean running = new AtomicBoolean(false);
private static final TimeValue DEFAULT_MAX_STOP_TIMEOUT = new TimeValue(30, TimeUnit.SECONDS);
private static final String DEFAULT_MAX_STOP_TIMEOUT_SETTING = "watcher.stop.timeout";
private final TimeValue maxStopTimeout;
@Inject
public WatchLockService(Settings settings){
super(settings);
maxStopTimeout = settings.getAsTime(DEFAULT_MAX_STOP_TIMEOUT_SETTING, DEFAULT_MAX_STOP_TIMEOUT);
}
WatchLockService(TimeValue maxStopTimeout){
super(ImmutableSettings.EMPTY);
this.maxStopTimeout = maxStopTimeout;
}
public Lock acquire(String name) { public Lock acquire(String name) {
if (!running.get()) { if (!running.get()) {
@ -33,7 +55,10 @@ public class WatchLockService {
} }
} }
public void stop() { /**
* @throws TimedOutException if we have waited longer than maxStopTimeout
*/
public void stop() throws TimedOutException {
if (running.compareAndSet(true, false)) { if (running.compareAndSet(true, false)) {
// It can happen we have still ongoing operations and we wait those operations to finish to avoid // It can happen we have still ongoing operations and we wait those operations to finish to avoid
// that watch service or any of its components end up in a illegal state after the state as been set to stopped. // that watch service or any of its components end up in a illegal state after the state as been set to stopped.
@ -43,7 +68,12 @@ public class WatchLockService {
// expected watch records are processed. // expected watch records are processed.
// //
// Note: new operations will fail now because the running has been set to false // Note: new operations will fail now because the running has been set to false
long startWait = System.currentTimeMillis();
while (watchLocks.hasLockedKeys()) { while (watchLocks.hasLockedKeys()) {
TimeValue timeWaiting = new TimeValue(System.currentTimeMillis() - startWait);
if (timeWaiting.getSeconds() > maxStopTimeout.getSeconds()) {
throw new TimedOutException("timed out waiting for watches to complete, after waiting for [{}]", timeWaiting);
}
try { try {
Thread.sleep(100); Thread.sleep(100);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
@ -71,4 +101,15 @@ public class WatchLockService {
watchLocks.release(name); watchLocks.release(name);
} }
} }
public static class TimedOutException extends WatcherException {
public TimedOutException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}
public TimedOutException(String msg, Object... args) {
super(msg, args);
}
}
} }

View File

@ -6,12 +6,14 @@
package org.elasticsearch.watcher.watch; package org.elasticsearch.watcher.watch;
import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test; import org.junit.Test;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
@ -22,7 +24,7 @@ public class WatchLockServiceTests extends ElasticsearchTestCase {
@Test @Test
public void testLocking_notStarted() { public void testLocking_notStarted() {
WatchLockService lockService = new WatchLockService(); WatchLockService lockService = new WatchLockService(new TimeValue(1, TimeUnit.SECONDS));
try { try {
lockService.acquire("_name"); lockService.acquire("_name");
fail("exception expected"); fail("exception expected");
@ -33,7 +35,7 @@ public class WatchLockServiceTests extends ElasticsearchTestCase {
@Test @Test
public void testLocking() { public void testLocking() {
WatchLockService lockService = new WatchLockService(); WatchLockService lockService = new WatchLockService(new TimeValue(1, TimeUnit.SECONDS));
lockService.start(); lockService.start();
WatchLockService.Lock lock = lockService.acquire("_name"); WatchLockService.Lock lock = lockService.acquire("_name");
assertThat(lockService.getWatchLocks().hasLockedKeys(), is(true)); assertThat(lockService.getWatchLocks().hasLockedKeys(), is(true));
@ -44,7 +46,7 @@ public class WatchLockServiceTests extends ElasticsearchTestCase {
@Test @Test
public void testLocking_alreadyHeld() { public void testLocking_alreadyHeld() {
WatchLockService lockService = new WatchLockService(); WatchLockService lockService = new WatchLockService(new TimeValue(1, TimeUnit.SECONDS));
lockService.start(); lockService.start();
WatchLockService.Lock lock1 = lockService.acquire("_name"); WatchLockService.Lock lock1 = lockService.acquire("_name");
try { try {
@ -57,9 +59,18 @@ public class WatchLockServiceTests extends ElasticsearchTestCase {
lockService.stop(); lockService.stop();
} }
@Test(expected = WatchLockService.TimedOutException.class)
public void testLocking_stopTimeout(){
final WatchLockService lockService = new WatchLockService(new TimeValue(1, TimeUnit.SECONDS));
lockService.start();
lockService.acquire("_name");
lockService.stop();
fail();
}
@Test @Test
public void testLocking_fair() throws Exception { public void testLocking_fair() throws Exception {
final WatchLockService lockService = new WatchLockService(); final WatchLockService lockService = new WatchLockService(new TimeValue(1, TimeUnit.SECONDS));
lockService.start(); lockService.start();
final AtomicInteger value = new AtomicInteger(0); final AtomicInteger value = new AtomicInteger(0);
List<Thread> threads = new ArrayList<>(); List<Thread> threads = new ArrayList<>();