Pipeline Stats: Fix concurrent modification exception (#18177)
Due to trying to modify a map while iterating it, a concurrent modification in the pipeline stats could be thrown. This uses an iterator to prevent this. Closes #18126
This commit is contained in:
parent
93567a2f1b
commit
b12a42351e
|
@ -35,6 +35,7 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -124,9 +125,11 @@ public class PipelineExecutionService implements ClusterStateListener {
|
|||
void updatePipelineStats(IngestMetadata ingestMetadata) {
|
||||
boolean changed = false;
|
||||
Map<String, StatsHolder> newStatsPerPipeline = new HashMap<>(statsHolderPerPipeline);
|
||||
for (String pipeline : newStatsPerPipeline.keySet()) {
|
||||
Iterator<String> iterator = newStatsPerPipeline.keySet().iterator();
|
||||
while (iterator.hasNext()) {
|
||||
String pipeline = iterator.next();
|
||||
if (ingestMetadata.getPipelines().containsKey(pipeline) == false) {
|
||||
newStatsPerPipeline.remove(pipeline);
|
||||
iterator.remove();
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -46,11 +46,13 @@ import java.util.function.BiConsumer;
|
|||
import java.util.function.Consumer;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.mockito.Matchers.anyBoolean;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.hamcrest.Matchers.hasKey;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyBoolean;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Matchers.argThat;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
@ -380,6 +382,22 @@ public class PipelineExecutionServiceTests extends ESTestCase {
|
|||
assertThat(ingestStats.getTotalStats().getIngestCount(), equalTo(2L));
|
||||
}
|
||||
|
||||
// issue: https://github.com/elastic/elasticsearch/issues/18126
|
||||
public void testUpdatingStatsWhenRemovingPipelineWorks() throws Exception {
|
||||
Map<String, PipelineConfiguration> configurationMap = new HashMap<>();
|
||||
configurationMap.put("_id1", new PipelineConfiguration("_id1", new BytesArray("{}")));
|
||||
configurationMap.put("_id2", new PipelineConfiguration("_id2", new BytesArray("{}")));
|
||||
executionService.updatePipelineStats(new IngestMetadata(configurationMap));
|
||||
assertThat(executionService.stats().getStatsPerPipeline(), hasKey("_id1"));
|
||||
assertThat(executionService.stats().getStatsPerPipeline(), hasKey("_id2"));
|
||||
|
||||
configurationMap = new HashMap<>();
|
||||
configurationMap.put("_id3", new PipelineConfiguration("_id3", new BytesArray("{}")));
|
||||
executionService.updatePipelineStats(new IngestMetadata(configurationMap));
|
||||
assertThat(executionService.stats().getStatsPerPipeline(), not(hasKey("_id1")));
|
||||
assertThat(executionService.stats().getStatsPerPipeline(), not(hasKey("_id2")));
|
||||
}
|
||||
|
||||
private IngestDocument eqID(String index, String type, String id, Map<String, Object> source) {
|
||||
return argThat(new IngestDocumentMatcher(index, type, id, source));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue