Add started boolean and locks to `HistoryStore`

There were a few corner cases where a `HistoryStore.put` operation could still be underway while we were shutting down, because no lock was held during that time.
This change adds a `ReadWriteLock` to `HistoryStore`: put and update operations take the read lock, and shutdown takes the write lock.
Also acquire the watch lock around the manual execution of a watch, since executing it can change the watch's status.

Fixes elastic/elasticsearch#204

Original commit: elastic/x-pack-elasticsearch@60ef78eff5
This commit is contained in:
Brian Murphy 2015-04-15 17:14:13 -04:00
parent 7f0e4fab41
commit 3e7b668406
4 changed files with 66 additions and 16 deletions

View File

@ -81,6 +81,7 @@ public class ExecutionService extends AbstractComponent {
}
if (started.compareAndSet(false, true)) {
logger.debug("starting execution service");
historyStore.start();
executeRecords(records);
logger.debug("started execution service");
}
@ -94,8 +95,10 @@ public class ExecutionService extends AbstractComponent {
// this is a forceful shutdown that also interrupts the worker threads in the threadpool
List<Runnable> cancelledTasks = new ArrayList<>();
executor.queue().drainTo(cancelledTasks);
historyStore.stop();
logger.debug("cancelled [{}] queued tasks", cancelledTasks.size());
logger.debug("stopped execution service");
}
}
@ -113,8 +116,13 @@ public class ExecutionService extends AbstractComponent {
public WatchRecord execute(WatchExecutionContext ctx) throws IOException {
WatchRecord watchRecord = new WatchRecord(ctx.id(), ctx.watch(), ctx.triggerEvent());
WatchExecution execution = executeInner(ctx);
watchRecord.seal(execution);
WatchLockService.Lock lock = watchLockService.acquire(ctx.watch().name());
try {
WatchExecution execution = executeInner(ctx);
watchRecord.seal(execution);
} finally {
lock.release();
}
if (ctx.recordInHistory()) {
historyStore.put(watchRecord);
}
@ -249,7 +257,9 @@ public class ExecutionService extends AbstractComponent {
@Override
public void run() {
logger.info("Running [{}] [{}]", ctx.watch().name(), ctx.id());
if (!started.get()) {
logger.warn("Rejecting execution due to service is not started");
logger.debug("can't initiate watch execution as execution service is not started, ignoring it...");
return;
}
@ -264,20 +274,21 @@ public class ExecutionService extends AbstractComponent {
}
} catch (Exception e) {
if (started()) {
logger.warn("failed to execute watch [{}]", e, watchRecord.name());
logger.warn("failed to execute watch [{}] [{}]", e, watchRecord.name(), ctx.id());
try {
watchRecord.update(WatchRecord.State.FAILED, e.getMessage());
if (ctx.recordInHistory()) {
historyStore.update(watchRecord);
}
} catch (Exception e2) {
logger.error("failed to update watch record [{}] failure [{}]", e2, watchRecord, e.getMessage());
logger.error("failed to update watch record [{}] failure [{}] for [{}] [{}]", e2, watchRecord, ctx.watch().name(), ctx.id(), e.getMessage());
}
} else {
logger.debug("failed to execute watch [{}] after shutdown", e, watchRecord);
}
} finally {
lock.release();
logger.info("Finished [{}] [{}]", ctx.watch().name(), ctx.id());
}
}
@ -298,7 +309,11 @@ public class ExecutionService extends AbstractComponent {
try {
ExecutionService.this.executeWatch(watch, event);
} catch (Exception e) {
logger.error("failed to execute watch [{}]", e, name);
if (started()) {
logger.error("failed to execute watch from SchedulerListener [{}]", e, name);
} else {
logger.error("failed to execute watch from SchedulerListener [{}] after shutdown", e, name);
}
}
}
}

View File

@ -9,11 +9,10 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.*;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.watcher.WatcherException;
import org.elasticsearch.watcher.support.TemplateUtils;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesReference;
@ -28,9 +27,19 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.watcher.WatcherException;
import org.elasticsearch.watcher.support.TemplateUtils;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
*/
@ -47,6 +56,11 @@ public class HistoryStore extends AbstractComponent {
// Scroll page size; read from the "scroll.size" component setting (default 100) in the constructor.
private final int scrollSize;
private final TimeValue scrollTimeout;
private final WatchRecord.Parser recordParser;
// Coordinates writers of history records with shutdown: put/update share the read side,
// stop() takes the exclusive write side.
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
// Shared (read) lock held by put() and update() for the duration of the operation.
private final Lock putUpdateLock = readWriteLock.readLock();
// Exclusive (write) lock held by stop() while it flips the started flag.
private final Lock stopLock = readWriteLock.writeLock();
// Set to true by start() and to false by stop(); put() and update() reject calls while it is false.
private final AtomicBoolean started = new AtomicBoolean(false);
@Inject
public HistoryStore(Settings settings, ClientProxy client, TemplateUtils templateUtils, WatchRecord.Parser recordParser) {
@ -58,7 +72,25 @@ public class HistoryStore extends AbstractComponent {
this.scrollSize = componentSettings.getAsInt("scroll.size", 100);
}
/**
 * Marks the history store as started so that put and update calls are accepted.
 */
public void start() {
    this.started.set(true);
}
/**
 * Marks the history store as stopped.
 *
 * Acquiring the write lock waits for every in-flight put or update (which hold the
 * read lock) to finish before the started flag is cleared.
 */
public void stop() {
    final Lock exclusive = this.stopLock;
    // Blocks until no put or update operation is still holding the read lock.
    exclusive.lock();
    try {
        this.started.set(false);
    } finally {
        exclusive.unlock();
    }
}
public void put(WatchRecord watchRecord) throws HistoryException {
putUpdateLock.lock();
if (!started.get()) {
throw new HistoryException("unable to persist watch record history store is not ready");
}
String index = getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime());
try {
IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value())
@ -68,11 +100,16 @@ public class HistoryStore extends AbstractComponent {
watchRecord.version(response.getVersion());
} catch (IOException e) {
throw new HistoryException("failed to persist watch record [" + watchRecord + "]", e);
} finally {
putUpdateLock.unlock();
}
}
public void update(WatchRecord watchRecord) throws HistoryException {
logger.debug("updating watch record [{}]...", watchRecord);
putUpdateLock.lock();
if (!started.get()) {
throw new HistoryException("unable to persist watch record history store is not ready");
}
try {
BytesReference bytes = XContentFactory.jsonBuilder().value(watchRecord).bytes();
IndexRequest request = new IndexRequest(getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime()), DOC_TYPE, watchRecord.id().value())
@ -83,6 +120,8 @@ public class HistoryStore extends AbstractComponent {
logger.debug("successfully updated watch record [{}]", watchRecord);
} catch (IOException e) {
throw new HistoryException("failed to update watch record [" + watchRecord + "]", e);
} finally {
putUpdateLock.unlock();
}
}

View File

@ -66,6 +66,7 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
templateUtils = mock(TemplateUtils.class);
parser = mock(WatchRecord.Parser.class);
historyStore = new HistoryStore(ImmutableSettings.EMPTY, clientProxy, templateUtils, parser);
historyStore.start();
}
@Test

View File

@ -30,11 +30,6 @@ import static org.hamcrest.Matchers.*;
*/
public class WatchCrudTests extends AbstractWatcherIntegrationTests {
@Override
protected boolean timeWarped() {
return true;
}
@Test @Repeat(iterations = 10)
public void testPut() throws Exception {
ensureWatcherStarted();