There is no need to check if the primary shards of the history indices are started, since we don't load watch records any more during the Watcher startup process.
Original commit: elastic/x-pack-elasticsearch@e6bfb37477
This commit is contained in:
parent
3f0509923a
commit
64ee394460
|
@ -94,7 +94,7 @@ public class ExecutionService extends AbstractComponent {
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean validate(ClusterState state) {
|
public boolean validate(ClusterState state) {
|
||||||
return historyStore.validate(state) && triggeredWatchStore.validate(state);
|
return triggeredWatchStore.validate(state);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void stop() {
|
public void stop() {
|
||||||
|
|
|
@ -7,10 +7,6 @@ package org.elasticsearch.watcher.history;
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
import org.elasticsearch.action.index.IndexRequest;
|
import org.elasticsearch.action.index.IndexRequest;
|
||||||
import org.elasticsearch.action.support.IndicesOptions;
|
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
|
||||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
|
||||||
import org.elasticsearch.common.component.AbstractComponent;
|
import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
@ -40,7 +36,6 @@ public class HistoryStore extends AbstractComponent {
|
||||||
private static final ImmutableSet<String> forbiddenIndexSettings = ImmutableSet.of("index.mapper.dynamic");
|
private static final ImmutableSet<String> forbiddenIndexSettings = ImmutableSet.of("index.mapper.dynamic");
|
||||||
|
|
||||||
private final ClientProxy client;
|
private final ClientProxy client;
|
||||||
private final IndexNameExpressionResolver indexNameExpressionResolver;
|
|
||||||
|
|
||||||
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
|
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
|
||||||
private final Lock putUpdateLock = readWriteLock.readLock();
|
private final Lock putUpdateLock = readWriteLock.readLock();
|
||||||
|
@ -48,36 +43,15 @@ public class HistoryStore extends AbstractComponent {
|
||||||
private final AtomicBoolean started = new AtomicBoolean(false);
|
private final AtomicBoolean started = new AtomicBoolean(false);
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public HistoryStore(Settings settings, ClientProxy client, IndexNameExpressionResolver indexNameExpressionResolver) {
|
public HistoryStore(Settings settings, ClientProxy client) {
|
||||||
super(settings);
|
super(settings);
|
||||||
this.client = client;
|
this.client = client;
|
||||||
this.indexNameExpressionResolver = indexNameExpressionResolver;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void start() {
|
public void start() {
|
||||||
started.set(true);
|
started.set(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean validate(ClusterState state) {
|
|
||||||
String[] indices = indexNameExpressionResolver.concreteIndices(state, IndicesOptions.lenientExpandOpen(), INDEX_PREFIX + "*");
|
|
||||||
if (indices.length == 0) {
|
|
||||||
logger.debug("no history indices exist, so we can load");
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (String index : indices) {
|
|
||||||
IndexMetaData indexMetaData = state.getMetaData().index(index);
|
|
||||||
if (indexMetaData != null) {
|
|
||||||
if (!state.routingTable().index(index).allPrimaryShardsActive()) {
|
|
||||||
logger.debug("not all primary shards of the [{}] index are started, so we cannot load watcher records", index);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void stop() {
|
public void stop() {
|
||||||
stopLock.lock(); //This will block while put or update actions are underway
|
stopLock.lock(); //This will block while put or update actions are underway
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -7,7 +7,6 @@ package org.elasticsearch.watcher.history;
|
||||||
|
|
||||||
import org.elasticsearch.action.index.IndexRequest;
|
import org.elasticsearch.action.index.IndexRequest;
|
||||||
import org.elasticsearch.action.index.IndexResponse;
|
import org.elasticsearch.action.index.IndexResponse;
|
||||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
@ -31,13 +30,11 @@ public class HistoryStoreTests extends ESTestCase {
|
||||||
|
|
||||||
private HistoryStore historyStore;
|
private HistoryStore historyStore;
|
||||||
private ClientProxy clientProxy;
|
private ClientProxy clientProxy;
|
||||||
private IndexNameExpressionResolver indexNameExpressionResolver;
|
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void init() {
|
public void init() {
|
||||||
clientProxy = mock(ClientProxy.class);
|
clientProxy = mock(ClientProxy.class);
|
||||||
indexNameExpressionResolver = mock(IndexNameExpressionResolver.class);
|
historyStore = new HistoryStore(Settings.EMPTY, clientProxy);
|
||||||
historyStore = new HistoryStore(Settings.EMPTY, clientProxy, indexNameExpressionResolver);
|
|
||||||
historyStore.start();
|
historyStore.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue