Don't rebuild pipeline on every cluster state update

Currently, after at least one pipeline is registered it is getting rebuilt on every single cluster state update, even when this update is not related to ingest metadata. This change adds a check that the ingest metadata changed before trying to rebuild all pipelines.
This commit is contained in:
Igor Motov 2016-08-26 23:09:26 -04:00
parent 1b75cb63a2
commit 3d6270b5cd
2 changed files with 25 additions and 15 deletions

View File

@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
@ -62,12 +63,13 @@ public class PipelineStore extends AbstractComponent implements ClusterStateList
@Override @Override
public void clusterChanged(ClusterChangedEvent event) { public void clusterChanged(ClusterChangedEvent event) {
innerUpdatePipelines(event.state()); innerUpdatePipelines(event.previousState(), event.state());
} }
void innerUpdatePipelines(ClusterState state) { void innerUpdatePipelines(ClusterState previousState, ClusterState state) {
IngestMetadata ingestMetadata = state.getMetaData().custom(IngestMetadata.TYPE); IngestMetadata ingestMetadata = state.getMetaData().custom(IngestMetadata.TYPE);
if (ingestMetadata == null) { IngestMetadata previousIngestMetadata = previousState.getMetaData().custom(IngestMetadata.TYPE);
if (Objects.equals(ingestMetadata, previousIngestMetadata)) {
return; return;
} }

View File

@ -98,7 +98,8 @@ public class PipelineStoreTests extends ESTestCase {
public void testUpdatePipelines() { public void testUpdatePipelines() {
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build();
store.innerUpdatePipelines(clusterState); ClusterState previousClusterState = clusterState;
store.innerUpdatePipelines(previousClusterState, clusterState);
assertThat(store.pipelines.size(), is(0)); assertThat(store.pipelines.size(), is(0));
PipelineConfiguration pipeline = new PipelineConfiguration( PipelineConfiguration pipeline = new PipelineConfiguration(
@ -108,7 +109,7 @@ public class PipelineStoreTests extends ESTestCase {
clusterState = ClusterState.builder(clusterState) clusterState = ClusterState.builder(clusterState)
.metaData(MetaData.builder().putCustom(IngestMetadata.TYPE, ingestMetadata)) .metaData(MetaData.builder().putCustom(IngestMetadata.TYPE, ingestMetadata))
.build(); .build();
store.innerUpdatePipelines(clusterState); store.innerUpdatePipelines(previousClusterState, clusterState);
assertThat(store.pipelines.size(), is(1)); assertThat(store.pipelines.size(), is(1));
assertThat(store.pipelines.get("_id").getId(), equalTo("_id")); assertThat(store.pipelines.get("_id").getId(), equalTo("_id"));
assertThat(store.pipelines.get("_id").getDescription(), nullValue()); assertThat(store.pipelines.get("_id").getDescription(), nullValue());
@ -124,8 +125,9 @@ public class PipelineStoreTests extends ESTestCase {
// add a new pipeline: // add a new pipeline:
PutPipelineRequest putRequest = new PutPipelineRequest(id, new BytesArray("{\"processors\": []}")); PutPipelineRequest putRequest = new PutPipelineRequest(id, new BytesArray("{\"processors\": []}"));
ClusterState previousClusterState = clusterState;
clusterState = store.innerPut(putRequest, clusterState); clusterState = store.innerPut(putRequest, clusterState);
store.innerUpdatePipelines(clusterState); store.innerUpdatePipelines(previousClusterState, clusterState);
pipeline = store.get(id); pipeline = store.get(id);
assertThat(pipeline, notNullValue()); assertThat(pipeline, notNullValue());
assertThat(pipeline.getId(), equalTo(id)); assertThat(pipeline.getId(), equalTo(id));
@ -134,8 +136,9 @@ public class PipelineStoreTests extends ESTestCase {
// overwrite existing pipeline: // overwrite existing pipeline:
putRequest = new PutPipelineRequest(id, new BytesArray("{\"processors\": [], \"description\": \"_description\"}")); putRequest = new PutPipelineRequest(id, new BytesArray("{\"processors\": [], \"description\": \"_description\"}"));
previousClusterState = clusterState;
clusterState = store.innerPut(putRequest, clusterState); clusterState = store.innerPut(putRequest, clusterState);
store.innerUpdatePipelines(clusterState); store.innerUpdatePipelines(previousClusterState, clusterState);
pipeline = store.get(id); pipeline = store.get(id);
assertThat(pipeline, notNullValue()); assertThat(pipeline, notNullValue());
assertThat(pipeline.getId(), equalTo(id)); assertThat(pipeline.getId(), equalTo(id));
@ -150,9 +153,10 @@ public class PipelineStoreTests extends ESTestCase {
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build();
PutPipelineRequest putRequest = new PutPipelineRequest(id, new BytesArray("{\"description\": \"empty processors\"}")); PutPipelineRequest putRequest = new PutPipelineRequest(id, new BytesArray("{\"description\": \"empty processors\"}"));
ClusterState previousClusterState = clusterState;
clusterState = store.innerPut(putRequest, clusterState); clusterState = store.innerPut(putRequest, clusterState);
try { try {
store.innerUpdatePipelines(clusterState); store.innerUpdatePipelines(previousClusterState, clusterState);
fail("should fail"); fail("should fail");
} catch (ElasticsearchParseException e) { } catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[processors] required property is missing")); assertThat(e.getMessage(), equalTo("[processors] required property is missing"));
@ -166,16 +170,18 @@ public class PipelineStoreTests extends ESTestCase {
"_id",new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}") "_id",new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}")
); );
IngestMetadata ingestMetadata = new IngestMetadata(Collections.singletonMap("_id", config)); IngestMetadata ingestMetadata = new IngestMetadata(Collections.singletonMap("_id", config));
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build();
.metaData(MetaData.builder().putCustom(IngestMetadata.TYPE, ingestMetadata)) ClusterState previousClusterState = clusterState;
.build(); clusterState = ClusterState.builder(clusterState).metaData(MetaData.builder()
store.innerUpdatePipelines(clusterState); .putCustom(IngestMetadata.TYPE, ingestMetadata)).build();
store.innerUpdatePipelines(previousClusterState, clusterState);
assertThat(store.get("_id"), notNullValue()); assertThat(store.get("_id"), notNullValue());
// Delete pipeline: // Delete pipeline:
DeletePipelineRequest deleteRequest = new DeletePipelineRequest("_id"); DeletePipelineRequest deleteRequest = new DeletePipelineRequest("_id");
previousClusterState = clusterState;
clusterState = store.innerDelete(deleteRequest, clusterState); clusterState = store.innerDelete(deleteRequest, clusterState);
store.innerUpdatePipelines(clusterState); store.innerUpdatePipelines(previousClusterState, clusterState);
assertThat(store.get("_id"), nullValue()); assertThat(store.get("_id"), nullValue());
// Delete existing pipeline: // Delete existing pipeline:
@ -236,8 +242,9 @@ public class PipelineStoreTests extends ESTestCase {
PutPipelineRequest putRequest = new PutPipelineRequest(id, PutPipelineRequest putRequest = new PutPipelineRequest(id,
new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}")); new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"));
ClusterState previousClusterState = clusterState;
clusterState = store.innerPut(putRequest, clusterState); clusterState = store.innerPut(putRequest, clusterState);
store.innerUpdatePipelines(clusterState); store.innerUpdatePipelines(previousClusterState, clusterState);
pipeline = store.get(id); pipeline = store.get(id);
assertThat(pipeline, notNullValue()); assertThat(pipeline, notNullValue());
assertThat(pipeline.getId(), equalTo(id)); assertThat(pipeline.getId(), equalTo(id));
@ -246,8 +253,9 @@ public class PipelineStoreTests extends ESTestCase {
assertThat(pipeline.getProcessors().get(0).getType(), equalTo("set")); assertThat(pipeline.getProcessors().get(0).getType(), equalTo("set"));
DeletePipelineRequest deleteRequest = new DeletePipelineRequest(id); DeletePipelineRequest deleteRequest = new DeletePipelineRequest(id);
previousClusterState = clusterState;
clusterState = store.innerDelete(deleteRequest, clusterState); clusterState = store.innerDelete(deleteRequest, clusterState);
store.innerUpdatePipelines(clusterState); store.innerUpdatePipelines(previousClusterState, clusterState);
pipeline = store.get(id); pipeline = store.get(id);
assertThat(pipeline, nullValue()); assertThat(pipeline, nullValue());
} }