diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichStore.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichStore.java index 6f319c67fde..48dd36177ae 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichStore.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichStore.java @@ -16,6 +16,7 @@ import org.elasticsearch.xpack.core.enrich.EnrichPolicy; import java.util.HashMap; import java.util.Map; import java.util.function.Consumer; +import java.util.function.Function; /** * Helper methods for access and storage of an enrich policy. @@ -43,9 +44,11 @@ public final class EnrichStore { } // TODO: add policy validation - final Map policies = getPolicies(clusterService.state()); - policies.put(name, policy); - updateClusterState(policies, clusterService, handler); + updateClusterState(clusterService, handler, current -> { + final Map policies = getPolicies(current); + policies.put(name, policy); + return policies; + }); } /** @@ -62,13 +65,15 @@ public final class EnrichStore { throw new IllegalArgumentException("name is missing or empty"); } - final Map policies = getPolicies(clusterService.state()); - if (policies.containsKey(name) == false) { - throw new ResourceNotFoundException("policy [{}] not found", name); - } + updateClusterState(clusterService, handler, current -> { + final Map policies = getPolicies(current); + if (policies.containsKey(name) == false) { + throw new ResourceNotFoundException("policy [{}] not found", name); + } - policies.remove(name); - updateClusterState(policies, clusterService, handler); + policies.remove(name); + return policies; + }); } /** @@ -103,12 +108,14 @@ public final class EnrichStore { return policies; } - private static void updateClusterState(Map policies, ClusterService clusterService, - Consumer handler) { - clusterService.submitStateUpdateTask("update-enrich-policy", new ClusterStateUpdateTask() { + private static void updateClusterState(ClusterService clusterService, + Consumer handler, + Function> function) { + clusterService.submitStateUpdateTask("update-enrich-metadata", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { + Map policies = function.apply(currentState); MetaData metaData = MetaData.builder(currentState.metaData()) .putCustom(EnrichMetadata.TYPE, new EnrichMetadata(policies)) .build(); diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichStoreTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichStoreTests.java index dbf90b1ccaa..05be4ae6ea2 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichStoreTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichStoreTests.java @@ -36,8 +36,9 @@ public class EnrichStoreTests extends ESSingleNodeTestCase { assertThat(listPolicies.size(), equalTo(1)); assertThat(listPolicies.get(name), equalTo(policy)); - error = deleteEnrichPolicy(name, clusterService); - assertThat(error.get(), nullValue()); + deleteEnrichPolicy(name, clusterService); + result = EnrichStore.getPolicy(name, clusterService.state()); + assertThat(result, nullValue()); } public void testPutValidation() throws Exception { @@ -110,7 +111,7 @@ public class EnrichStoreTests extends ESSingleNodeTestCase { return error; } - private AtomicReference deleteEnrichPolicy(String name, ClusterService clusterService) throws InterruptedException { + private void deleteEnrichPolicy(String name, ClusterService clusterService) throws Exception { CountDownLatch latch = new CountDownLatch(1); AtomicReference error = new AtomicReference<>(); EnrichStore.deletePolicy(name, clusterService, e -> { @@ -118,6 +119,8 @@ public class EnrichStoreTests extends ESSingleNodeTestCase { latch.countDown(); }); latch.await(); - return error; + if (error.get() != null){ + throw error.get(); + } } }