Refactor the enrich store to remove it from guice (#41421)

There is no need to create a enrich store component for the transport
layer since the inner components of the store are either present in the
master node calls or via an already injected ClusterService. This commit
cleans up the class, adds the forthcoming delete call and tests the new
code.
This commit is contained in:
Michael Basnight 2019-04-23 13:28:06 -05:00
parent 860e783f14
commit 85c4cc7f4b
3 changed files with 140 additions and 52 deletions

View File

@ -5,22 +5,14 @@
*/
package org.elasticsearch.xpack.enrich;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -32,19 +24,6 @@ public class EnrichPlugin extends Plugin implements IngestPlugin {
return Collections.emptyMap();
}
@Override
public Collection<Object> createComponents(Client client,
ClusterService clusterService,
ThreadPool threadPool,
ResourceWatcherService resourceWatcherService,
ScriptService scriptService,
NamedXContentRegistry xContentRegistry,
Environment environment,
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry) {
return Collections.singleton(new EnrichStore(clusterService));
}
@Override
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return Collections.singletonList(new NamedWriteableRegistry.Entry(MetaData.Custom.class, EnrichMetadata.TYPE,

View File

@ -5,10 +5,12 @@
*/
package org.elasticsearch.xpack.enrich;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import java.util.HashMap;
@ -16,15 +18,11 @@ import java.util.Map;
import java.util.function.Consumer;
/**
* A components that provides access and stores an enrich policy.
* Helper methods for access and storage of an enrich policy.
*/
public final class EnrichStore {
private final ClusterService clusterService;
EnrichStore(ClusterService clusterService) {
this.clusterService = clusterService;
}
private EnrichStore() {}
/**
* Adds a new enrich policy or overwrites an existing policy if there is already a policy with the same name.
@ -34,20 +32,73 @@ public final class EnrichStore {
* @param policy The policy to store
* @param handler The handler that gets invoked if policy has been stored or a failure has occurred.
*/
public void putPolicy(String name, EnrichPolicy policy, Consumer<Exception> handler) {
public static void putPolicy(String name, EnrichPolicy policy, ClusterService clusterService, Consumer<Exception> handler) {
assert clusterService.localNode().isMasterNode();
// TODO: add validation
if (Strings.isNullOrEmpty(name)) {
throw new IllegalArgumentException("name is missing or empty");
}
if (policy == null) {
throw new IllegalArgumentException("policy is missing");
}
// TODO: add policy validation
final Map<String, EnrichPolicy> policies = getPolicies(clusterService.state());
policies.put(name, policy);
updateClusterState(policies, clusterService, handler);
}
/**
* Removes an enrich policy from the policies in the cluster state. This method can only be invoked on the
* elected master node.
*
* @param name The unique name of the policy
* @param handler The handler that gets invoked if policy has been stored or a failure has occurred.
*/
public static void deletePolicy(String name, ClusterService clusterService, Consumer<Exception> handler) {
assert clusterService.localNode().isMasterNode();
if (Strings.isNullOrEmpty(name)) {
throw new IllegalArgumentException("name is missing or empty");
}
final Map<String, EnrichPolicy> policies = getPolicies(clusterService.state());
if (policies.containsKey(name) == false) {
throw new ResourceNotFoundException("policy [{}] not found", name);
}
policies.remove(name);
updateClusterState(policies, clusterService, handler);
}
/**
* Gets an enrich policy for the provided name if exists or otherwise returns <code>null</code>.
*
* @param name The name of the policy to fetch
* @return enrich policy if exists or <code>null</code> otherwise
*/
public static EnrichPolicy getPolicy(String name, ClusterState state) {
if (Strings.isNullOrEmpty(name)) {
throw new IllegalArgumentException("name is missing or empty");
}
return getPolicies(state).get(name);
}
private static Map<String, EnrichPolicy> getPolicies(ClusterState state) {
final Map<String, EnrichPolicy> policies;
final EnrichMetadata enrichMetadata = clusterService.state().metaData().custom(EnrichMetadata.TYPE);
final EnrichMetadata enrichMetadata = state.metaData().custom(EnrichMetadata.TYPE);
if (enrichMetadata != null) {
// Make a copy, because policies map inside custom metadata is read only:
policies = new HashMap<>(enrichMetadata.getPolicies());
} else {
policies = new HashMap<>();
}
policies.put(name, policy);
return policies;
}
private static void updateClusterState(Map<String, EnrichPolicy> policies, ClusterService clusterService,
Consumer<Exception> handler) {
clusterService.submitStateUpdateTask("update-enrich-policy", new ClusterStateUpdateTask() {
@Override
@ -71,19 +122,4 @@ public final class EnrichStore {
}
});
}
/**
* Gets an enrich policy for the provided name if exists or otherwise returns <code>null</code>.
*
* @param name The name of the policy to fetch
* @return enrich policy if exists or <code>null</code> otherwise
*/
public EnrichPolicy getPolicy(String name) {
EnrichMetadata enrichMetadata = clusterService.state().metaData().custom(EnrichMetadata.TYPE);
if (enrichMetadata == null) {
return null;
}
return enrichMetadata.getPolicies().get(name);
}
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.enrich;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESSingleNodeTestCase;
@ -20,20 +21,92 @@ import static org.hamcrest.Matchers.nullValue;
public class EnrichStoreTests extends ESSingleNodeTestCase {
public void testCrud() throws Exception {
EnrichStore enrichStore = new EnrichStore(getInstanceFromNode(ClusterService.class));
EnrichPolicy policy = randomEnrichPolicy(XContentType.JSON);
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
String name = "my-policy";
AtomicReference<Exception> error = saveEnrichPolicy(name, policy, clusterService);
assertThat(error.get(), nullValue());
EnrichPolicy result = EnrichStore.getPolicy(name, clusterService.state());
assertThat(result, equalTo(policy));
error = deleteEnrichPolicy(name, clusterService);
assertThat(error.get(), nullValue());
}
public void testPutValidation() throws Exception {
EnrichPolicy policy = randomEnrichPolicy(XContentType.JSON);
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
{
String nullOrEmptyName = randomBoolean() ? "" : null;
IllegalArgumentException error = expectThrows(IllegalArgumentException.class,
() -> saveEnrichPolicy(nullOrEmptyName, policy, clusterService));
assertThat(error.getMessage(), equalTo("name is missing or empty"));
}
{
IllegalArgumentException error = expectThrows(IllegalArgumentException.class,
() -> saveEnrichPolicy("my-policy", null, clusterService));
assertThat(error.getMessage(), equalTo("policy is missing"));
}
}
public void testDeleteValidation() {
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
{
String nullOrEmptyName = randomBoolean() ? "" : null;
IllegalArgumentException error = expectThrows(IllegalArgumentException.class,
() -> deleteEnrichPolicy(nullOrEmptyName, clusterService));
assertThat(error.getMessage(), equalTo("name is missing or empty"));
}
{
ResourceNotFoundException error = expectThrows(ResourceNotFoundException.class,
() -> deleteEnrichPolicy("my-policy", clusterService));
assertThat(error.getMessage(), equalTo("policy [my-policy] not found"));
}
}
public void testGetValidation() {
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
String nullOrEmptyName = randomBoolean() ? "" : null;
IllegalArgumentException error = expectThrows(IllegalArgumentException.class,
() -> EnrichStore.getPolicy(nullOrEmptyName, clusterService.state()));
assertThat(error.getMessage(), equalTo("name is missing or empty"));
EnrichPolicy policy = EnrichStore.getPolicy("null-policy", clusterService.state());
assertNull(policy);
}
private AtomicReference<Exception> saveEnrichPolicy(String name, EnrichPolicy policy,
ClusterService clusterService) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> error = new AtomicReference<>();
enrichStore.putPolicy("my-policy", policy, e -> {
EnrichStore.putPolicy(name, policy, clusterService, e -> {
error.set(e);
latch.countDown();
});
latch.await();
assertThat(error.get(), nullValue());
EnrichPolicy result = enrichStore.getPolicy("my-policy");
assertThat(result, equalTo(policy));
return error;
}
private AtomicReference<Exception> deleteEnrichPolicy(String name, ClusterService clusterService) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> error = new AtomicReference<>();
EnrichStore.deletePolicy(name, clusterService, e -> {
error.set(e);
latch.countDown();
});
latch.await();
return error;
}
}