Enrich store should only update the policies via an update task. (#41944)

This commit is contained in:
Martijn van Groningen 2019-05-09 18:14:01 +02:00
parent 44f09a9a86
commit 299ff70bfe
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
2 changed files with 26 additions and 16 deletions

View File

@ -16,6 +16,7 @@ import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function;
/** /**
* Helper methods for access and storage of an enrich policy. * Helper methods for access and storage of an enrich policy.
@ -43,9 +44,11 @@ public final class EnrichStore {
} }
// TODO: add policy validation // TODO: add policy validation
final Map<String, EnrichPolicy> policies = getPolicies(clusterService.state()); updateClusterState(clusterService, handler, current -> {
policies.put(name, policy); final Map<String, EnrichPolicy> policies = getPolicies(current);
updateClusterState(policies, clusterService, handler); policies.put(name, policy);
return policies;
});
} }
/** /**
@ -62,13 +65,15 @@ public final class EnrichStore {
throw new IllegalArgumentException("name is missing or empty"); throw new IllegalArgumentException("name is missing or empty");
} }
final Map<String, EnrichPolicy> policies = getPolicies(clusterService.state()); updateClusterState(clusterService, handler, current -> {
if (policies.containsKey(name) == false) { final Map<String, EnrichPolicy> policies = getPolicies(current);
throw new ResourceNotFoundException("policy [{}] not found", name); if (policies.containsKey(name) == false) {
} throw new ResourceNotFoundException("policy [{}] not found", name);
}
policies.remove(name); policies.remove(name);
updateClusterState(policies, clusterService, handler); return policies;
});
} }
/** /**
@ -103,12 +108,14 @@ public final class EnrichStore {
return policies; return policies;
} }
private static void updateClusterState(Map<String, EnrichPolicy> policies, ClusterService clusterService, private static void updateClusterState(ClusterService clusterService,
Consumer<Exception> handler) { Consumer<Exception> handler,
clusterService.submitStateUpdateTask("update-enrich-policy", new ClusterStateUpdateTask() { Function<ClusterState, Map<String, EnrichPolicy>> function) {
clusterService.submitStateUpdateTask("update-enrich-metadata", new ClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) throws Exception { public ClusterState execute(ClusterState currentState) throws Exception {
Map<String, EnrichPolicy> policies = function.apply(currentState);
MetaData metaData = MetaData.builder(currentState.metaData()) MetaData metaData = MetaData.builder(currentState.metaData())
.putCustom(EnrichMetadata.TYPE, new EnrichMetadata(policies)) .putCustom(EnrichMetadata.TYPE, new EnrichMetadata(policies))
.build(); .build();

View File

@ -36,8 +36,9 @@ public class EnrichStoreTests extends ESSingleNodeTestCase {
assertThat(listPolicies.size(), equalTo(1)); assertThat(listPolicies.size(), equalTo(1));
assertThat(listPolicies.get(name), equalTo(policy)); assertThat(listPolicies.get(name), equalTo(policy));
error = deleteEnrichPolicy(name, clusterService); deleteEnrichPolicy(name, clusterService);
assertThat(error.get(), nullValue()); result = EnrichStore.getPolicy(name, clusterService.state());
assertThat(result, nullValue());
} }
public void testPutValidation() throws Exception { public void testPutValidation() throws Exception {
@ -110,7 +111,7 @@ public class EnrichStoreTests extends ESSingleNodeTestCase {
return error; return error;
} }
private AtomicReference<Exception> deleteEnrichPolicy(String name, ClusterService clusterService) throws InterruptedException { private void deleteEnrichPolicy(String name, ClusterService clusterService) throws Exception {
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> error = new AtomicReference<>(); AtomicReference<Exception> error = new AtomicReference<>();
EnrichStore.deletePolicy(name, clusterService, e -> { EnrichStore.deletePolicy(name, clusterService, e -> {
@ -118,6 +119,8 @@ public class EnrichStoreTests extends ESSingleNodeTestCase {
latch.countDown(); latch.countDown();
}); });
latch.await(); latch.await();
return error; if (error.get() != null){
throw error.get();
}
} }
} }