Bring transport actions in line with master

ES master added IndexNameExpressionResolver class, which is required by all transport actions. This change adds this class to the actions as well as replacing calls to state.metadata.concreteIndices with calls to the new IndexNameExpressionResolver class.

Original commit: elastic/x-pack-elasticsearch@7938330321
This commit is contained in:
Brian Murphy 2015-07-10 12:20:34 -04:00
parent 4b246371b4
commit 4cf56aef12
11 changed files with 52 additions and 25 deletions

View File

@ -10,6 +10,7 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -39,6 +40,7 @@ public class HistoryStore extends AbstractComponent {
private static final ImmutableSet<String> forbiddenIndexSettings = ImmutableSet.of("index.mapper.dynamic");
private final ClientProxy client;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Lock putUpdateLock = readWriteLock.readLock();
@ -46,9 +48,10 @@ public class HistoryStore extends AbstractComponent {
private final AtomicBoolean started = new AtomicBoolean(false);
@Inject
public HistoryStore(Settings settings, ClientProxy client) {
public HistoryStore(Settings settings, ClientProxy client, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings);
this.client = client;
this.indexNameExpressionResolver = indexNameExpressionResolver;
}
public void start() {
@ -56,7 +59,7 @@ public class HistoryStore extends AbstractComponent {
}
public boolean validate(ClusterState state) {
String[] indices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), INDEX_PREFIX + "*");
String[] indices = indexNameExpressionResolver.concreteIndices(state, IndicesOptions.lenientExpandOpen(), INDEX_PREFIX + "*");
if (indices.length == 0) {
logger.debug("no history indices exist, so we can load");
return true;

View File

@ -11,6 +11,7 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.plugin.core.LicenseUtils;
import org.elasticsearch.threadpool.ThreadPool;
@ -24,8 +25,10 @@ public abstract class WatcherTransportAction<Request extends MasterNodeRequest<R
private final LicenseService licenseService;
public WatcherTransportAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, LicenseService licenseService, Class<Request> request) {
super(settings, actionName, transportService, clusterService, threadPool, actionFilters, request);
public WatcherTransportAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, LicenseService licenseService, Class<Request> request) {
super(settings, actionName, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, request);
this.licenseService = licenseService;
}

View File

@ -12,6 +12,7 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
@ -31,8 +32,9 @@ public class TransportAckWatchAction extends WatcherTransportAction<AckWatchRequ
@Inject
public TransportAckWatchAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, WatcherService watcherService, LicenseService licenseService) {
super(settings, AckWatchAction.NAME, transportService, clusterService, threadPool, actionFilters, licenseService, AckWatchRequest.class);
ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
WatcherService watcherService, LicenseService licenseService) {
super(settings, AckWatchAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, licenseService, AckWatchRequest.class);
this.watcherService = watcherService;
}

View File

@ -13,13 +13,14 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.watcher.WatcherService;
import org.elasticsearch.watcher.license.LicenseService;
import org.elasticsearch.watcher.transport.actions.WatcherTransportAction;
import org.elasticsearch.watcher.WatcherService;
import org.elasticsearch.watcher.watch.WatchStore;
/**
@ -31,8 +32,9 @@ public class TransportDeleteWatchAction extends WatcherTransportAction<DeleteWat
@Inject
public TransportDeleteWatchAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, WatcherService watcherService, LicenseService licenseService) {
super(settings, DeleteWatchAction.NAME, transportService, clusterService, threadPool, actionFilters, licenseService, DeleteWatchRequest.class);
ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
WatcherService watcherService, LicenseService licenseService) {
super(settings, DeleteWatchAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, licenseService, DeleteWatchRequest.class);
this.watcherService = watcherService;
}

View File

@ -12,6 +12,7 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -55,10 +56,10 @@ public class TransportExecuteWatchAction extends WatcherTransportAction<ExecuteW
@Inject
public TransportExecuteWatchAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, ExecutionService executionService,
ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, ExecutionService executionService,
Clock clock, LicenseService licenseService, WatchStore watchStore, TriggerService triggerService,
Watch.Parser watchParser) {
super(settings, ExecuteWatchAction.NAME, transportService, clusterService, threadPool, actionFilters, licenseService, ExecuteWatchRequest.class);
super(settings, ExecuteWatchAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, licenseService, ExecuteWatchRequest.class);
this.executionService = executionService;
this.watchStore = watchStore;
this.clock = clock;

View File

@ -12,6 +12,7 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -39,8 +40,9 @@ public class TransportGetWatchAction extends WatcherTransportAction<GetWatchRequ
@Inject
public TransportGetWatchAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, WatcherService watcherService, LicenseService licenseService) {
super(settings, GetWatchAction.NAME, transportService, clusterService, threadPool, actionFilters, licenseService, GetWatchRequest.class);
ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
WatcherService watcherService, LicenseService licenseService) {
super(settings, GetWatchAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, licenseService, GetWatchRequest.class);
this.watcherService = watcherService;
}

View File

@ -13,13 +13,14 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.watcher.WatcherService;
import org.elasticsearch.watcher.license.LicenseService;
import org.elasticsearch.watcher.transport.actions.WatcherTransportAction;
import org.elasticsearch.watcher.WatcherService;
import org.elasticsearch.watcher.watch.WatchStore;
/**
@ -30,8 +31,9 @@ public class TransportPutWatchAction extends WatcherTransportAction<PutWatchRequ
@Inject
public TransportPutWatchAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, WatcherService watcherService, LicenseService licenseService) {
super(settings, PutWatchAction.NAME, transportService, clusterService, threadPool, actionFilters, licenseService, PutWatchRequest.class);
ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
WatcherService watcherService, LicenseService licenseService) {
super(settings, PutWatchAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, licenseService, PutWatchRequest.class);
this.watcherService = watcherService;
}

View File

@ -12,6 +12,7 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
@ -27,8 +28,10 @@ public class TransportWatcherServiceAction extends WatcherTransportAction<Watche
private final WatcherLifeCycleService lifeCycleService;
@Inject
public TransportWatcherServiceAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, WatcherLifeCycleService lifeCycleService, LicenseService licenseService) {
super(settings, WatcherServiceAction.NAME, transportService, clusterService, threadPool, actionFilters, licenseService, WatcherServiceRequest.class);
public TransportWatcherServiceAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
WatcherLifeCycleService lifeCycleService, LicenseService licenseService) {
super(settings, WatcherServiceAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, licenseService, WatcherServiceRequest.class);
this.lifeCycleService = lifeCycleService;
}

View File

@ -12,16 +12,17 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.watcher.WatcherBuild;
import org.elasticsearch.watcher.WatcherService;
import org.elasticsearch.watcher.WatcherVersion;
import org.elasticsearch.watcher.execution.ExecutionService;
import org.elasticsearch.watcher.license.LicenseService;
import org.elasticsearch.watcher.transport.actions.WatcherTransportAction;
import org.elasticsearch.watcher.WatcherService;
/**
* Performs the stats operation.
@ -33,9 +34,9 @@ public class TransportWatcherStatsAction extends WatcherTransportAction<WatcherS
@Inject
public TransportWatcherStatsAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, WatcherService watcherService,
ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, WatcherService watcherService,
ExecutionService executionService, LicenseService licenseService) {
super(settings, WatcherStatsAction.NAME, transportService, clusterService, threadPool, actionFilters, licenseService, WatcherStatsRequest.class);
super(settings, WatcherStatsAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, licenseService, WatcherStatsRequest.class);
this.watcherService = watcherService;
this.executionService = executionService;
}

View File

@ -7,6 +7,7 @@ package org.elasticsearch.watcher.history;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ElasticsearchTestCase;
@ -30,11 +31,13 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
private HistoryStore historyStore;
private ClientProxy clientProxy;
private IndexNameExpressionResolver indexNameExpressionResolver;
@Before
public void init() {
clientProxy = mock(ClientProxy.class);
historyStore = new HistoryStore(Settings.EMPTY, clientProxy);
indexNameExpressionResolver = mock(IndexNameExpressionResolver.class);
historyStore = new HistoryStore(Settings.EMPTY, clientProxy, indexNameExpressionResolver);
historyStore.start();
}

View File

@ -12,6 +12,7 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.Streams;
@ -313,6 +314,10 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
return getInstanceFromMaster(LicenseService.class);
}
protected IndexNameExpressionResolver indexNameExpressionResolver() {
return internalCluster().getInstance(IndexNameExpressionResolver.class);
}
protected void assertValue(XContentSource source, String path, Matcher<?> matcher) {
WatcherTestUtils.assertValue(source, path, matcher);
}
@ -332,7 +337,7 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
@Override
public void run() {
ClusterState state = client().admin().cluster().prepareState().get().getState();
String[] watchHistoryIndices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), HistoryStore.INDEX_PREFIX + "*");
String[] watchHistoryIndices = indexNameExpressionResolver().concreteIndices(state, IndicesOptions.lenientExpandOpen(), HistoryStore.INDEX_PREFIX + "*");
assertThat(watchHistoryIndices, not(emptyArray()));
for (String index : watchHistoryIndices) {
IndexRoutingTable routingTable = state.getRoutingTable().index(index);
@ -394,7 +399,7 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
public void run() {
// The watch_history index gets created in the background when the first watch is triggered, so we to check first is this index is created and shards are started
ClusterState state = client().admin().cluster().prepareState().get().getState();
String[] watchHistoryIndices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), HistoryStore.INDEX_PREFIX + "*");
String[] watchHistoryIndices = indexNameExpressionResolver().concreteIndices(state, IndicesOptions.lenientExpandOpen(), HistoryStore.INDEX_PREFIX + "*");
assertThat(watchHistoryIndices, not(emptyArray()));
for (String index : watchHistoryIndices) {
IndexRoutingTable routingTable = state.getRoutingTable().index(index);
@ -417,7 +422,7 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
@Override
public void run() {
ClusterState state = client().admin().cluster().prepareState().get().getState();
String[] watchHistoryIndices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), HistoryStore.INDEX_PREFIX + "*");
String[] watchHistoryIndices = indexNameExpressionResolver().concreteIndices(state, IndicesOptions.lenientExpandOpen(), HistoryStore.INDEX_PREFIX + "*");
assertThat(watchHistoryIndices, not(emptyArray()));
for (String index : watchHistoryIndices) {
IndexRoutingTable routingTable = state.getRoutingTable().index(index);