diff --git a/core/pom.xml b/core/pom.xml index ec5b3d6e858..dc0d6449280 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -341,6 +341,7 @@ src/main/java/org/elasticsearch/http/netty/pipelining/** src/test/java/org/elasticsearch/common/network/InetAddressesTests.java + src/test/java/org/elasticsearch/common/collect/EvictingQueueTests.java diff --git a/core/src/main/java/org/elasticsearch/common/collect/EvictingQueue.java b/core/src/main/java/org/elasticsearch/common/collect/EvictingQueue.java new file mode 100644 index 00000000000..51cc08d0209 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/common/collect/EvictingQueue.java @@ -0,0 +1,176 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.collect; + +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Iterator; +import java.util.Queue; + +/** + * An {@code EvictingQueue} is a non-blocking queue which is limited to a maximum size; when new elements are added to a + * full queue, elements are evicted from the head of the queue to accommodate the new elements. + * + * @param The type of elements in the queue. + */ +public class EvictingQueue implements Queue { + private final int maximumSize; + private final ArrayDeque queue; + + /** + * Construct a new {@code EvictingQueue} that holds {@code maximumSize} elements. + * + * @param maximumSize The maximum number of elements that the queue can hold + * @throws IllegalArgumentException if {@code maximumSize} is less than zero + */ + public EvictingQueue(int maximumSize) { + if (maximumSize < 0) { + throw new IllegalArgumentException("maximumSize < 0"); + } + this.maximumSize = maximumSize; + this.queue = new ArrayDeque<>(maximumSize); + } + + /** + * @return the number of additional elements that the queue can accommodate before evictions occur + */ + public int remainingCapacity() { + return this.maximumSize - this.size(); + } + + /** + * Add the given element to the queue, possibly forcing an eviction from the head if {@link #remainingCapacity()} is + * zero. + * + * @param t the element to add + * @return true if the element was added (always the case for {@code EvictingQueue} + */ + @Override + public boolean add(T t) { + if (maximumSize == 0) { + return true; + } + if (queue.size() == maximumSize) { + queue.remove(); + } + queue.add(t); + return true; + } + + /** + * @see #add(Object) + */ + @Override + public boolean offer(T t) { + return add(t); + } + + @Override + public T remove() { + return queue.remove(); + } + + + @Override + public T poll() { + return queue.poll(); + } + + @Override + public T element() { + return queue.element(); + } + + @Override + public T peek() { + return queue.peek(); + } + + @Override + public int size() { + return queue.size(); + } + + @Override + public boolean isEmpty() { + return queue.isEmpty(); + } + + @Override + public boolean contains(Object o) { + return queue.contains(o); + } + + @Override + public Iterator iterator() { + return queue.iterator(); + } + + @Override + public Object[] toArray() { + return queue.toArray(); + } + + @Override + public T1[] toArray(T1[] a) { + return queue.toArray(a); + } + + @Override + public boolean remove(Object o) { + return queue.remove(o); + } + + @Override + public boolean containsAll(Collection c) { + return queue.containsAll(c); + } + + /** + * Add the given elements to the queue, possibly forcing evictions from the head if {@link #remainingCapacity()} is + * zero or becomes zero during the execution of this method. + * + * @param c the collection of elements to add + * @return true if any elements were added to the queue + */ + @Override + public boolean addAll(Collection c) { + boolean modified = false; + for (T e : c) + if (add(e)) + modified = true; + return modified; + } + + @Override + public boolean removeAll(Collection c) { + return queue.removeAll(c); + } + + @Override + public boolean retainAll(Collection c) { + return queue.retainAll(c); + } + + @Override + public void clear() { + queue.clear(); + } +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java index a681bc79c9c..48686b9a6b9 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java @@ -19,7 +19,7 @@ package org.elasticsearch.search.aggregations.pipeline.movavg; -import com.google.common.collect.EvictingQueue; +import org.elasticsearch.common.collect.EvictingQueue; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.search.aggregations.Aggregation; @@ -102,7 +102,7 @@ public class MovAvgPipelineAggregator extends PipelineAggregator { InternalHistogram.Factory factory = histo.getFactory(); List newBuckets = new ArrayList<>(); - EvictingQueue values = EvictingQueue.create(this.window); + EvictingQueue values = new EvictingQueue<>(this.window); long lastValidKey = 0; int lastValidPosition = 0; @@ -202,7 +202,7 @@ public class MovAvgPipelineAggregator extends PipelineAggregator { private MovAvgModel minimize(List buckets, InternalHistogram histo, MovAvgModel model) { int counter = 0; - EvictingQueue values = EvictingQueue.create(window); + EvictingQueue values = new EvictingQueue<>(this.window); double[] test = new double[window]; ListIterator iter = buckets.listIterator(buckets.size()); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/SimulatedAnealingMinimizer.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/SimulatedAnealingMinimizer.java index bb04502f60c..711ee2299cf 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/SimulatedAnealingMinimizer.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/SimulatedAnealingMinimizer.java @@ -19,7 +19,7 @@ package org.elasticsearch.search.aggregations.pipeline.movavg; -import com.google.common.collect.EvictingQueue; +import org.elasticsearch.common.collect.EvictingQueue; import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModel; /** diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/serialdiff/SerialDiffPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/serialdiff/SerialDiffPipelineAggregator.java index d00f064a94e..5df97d336c9 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/serialdiff/SerialDiffPipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/serialdiff/SerialDiffPipelineAggregator.java @@ -19,8 +19,8 @@ package org.elasticsearch.search.aggregations.pipeline.serialdiff; -import com.google.common.collect.EvictingQueue; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.collect.EvictingQueue; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.search.aggregations.InternalAggregation; @@ -86,7 +86,7 @@ public class SerialDiffPipelineAggregator extends PipelineAggregator { InternalHistogram.Factory factory = histo.getFactory(); List newBuckets = new ArrayList<>(); - EvictingQueue lagWindow = EvictingQueue.create(lag); + EvictingQueue lagWindow = new EvictingQueue<>(lag); int counter = 0; for (InternalHistogram.Bucket bucket : buckets) { diff --git a/core/src/test/java/org/elasticsearch/common/collect/EvictingQueueTests.java b/core/src/test/java/org/elasticsearch/common/collect/EvictingQueueTests.java new file mode 100644 index 00000000000..de822b8aa83 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/common/collect/EvictingQueueTests.java @@ -0,0 +1,150 @@ +/* + * Copyright (C) 2012 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.elasticsearch.common.collect; + +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.test.ESTestCase; + +import java.util.Collections; +import java.util.NoSuchElementException; + +public class EvictingQueueTests extends ESTestCase { + public void testCreateWithNegativeSize() throws Exception { + try { + new EvictingQueue<>(-1); + fail(); + } catch (IllegalArgumentException expected) { + } + } + + public void testCreateWithZeroSize() throws Exception { + EvictingQueue queue = new EvictingQueue<>(0); + assertEquals(0, queue.size()); + + assertTrue(queue.add("hi")); + assertEquals(0, queue.size()); + + assertTrue(queue.offer("hi")); + assertEquals(0, queue.size()); + + assertFalse(queue.remove("hi")); + assertEquals(0, queue.size()); + + try { + queue.element(); + fail(); + } catch (NoSuchElementException expected) {} + + assertNull(queue.peek()); + assertNull(queue.poll()); + try { + queue.remove(); + fail(); + } catch (NoSuchElementException expected) {} + } + + public void testRemainingCapacityMaximumSizeZero() { + EvictingQueue queue = new EvictingQueue<>(0); + assertEquals(0, queue.remainingCapacity()); + } + + public void testRemainingCapacityMaximumSizeOne() { + EvictingQueue queue = new EvictingQueue<>(1); + assertEquals(1, queue.remainingCapacity()); + queue.add("hi"); + assertEquals(0, queue.remainingCapacity()); + } + + public void testRemainingCapacityMaximumSizeThree() { + EvictingQueue queue = new EvictingQueue<>(3); + assertEquals(3, queue.remainingCapacity()); + queue.add("hi"); + assertEquals(2, queue.remainingCapacity()); + queue.add("hi"); + assertEquals(1, queue.remainingCapacity()); + queue.add("hi"); + assertEquals(0, queue.remainingCapacity()); + } + + public void testEvictingAfterOne() throws Exception { + EvictingQueue queue = new EvictingQueue<>(1); + assertEquals(0, queue.size()); + assertEquals(1, queue.remainingCapacity()); + + assertTrue(queue.add("hi")); + assertEquals("hi", queue.element()); + assertEquals("hi", queue.peek()); + assertEquals(1, queue.size()); + assertEquals(0, queue.remainingCapacity()); + + assertTrue(queue.add("there")); + assertEquals("there", queue.element()); + assertEquals("there", queue.peek()); + assertEquals(1, queue.size()); + assertEquals(0, queue.remainingCapacity()); + + assertEquals("there", queue.remove()); + assertEquals(0, queue.size()); + assertEquals(1, queue.remainingCapacity()); + } + + public void testEvictingAfterThree() throws Exception { + EvictingQueue queue = new EvictingQueue<>(3); + assertEquals(0, queue.size()); + assertEquals(3, queue.remainingCapacity()); + + assertTrue(queue.add("one")); + assertTrue(queue.add("two")); + assertTrue(queue.add("three")); + assertEquals("one", queue.element()); + assertEquals("one", queue.peek()); + assertEquals(3, queue.size()); + assertEquals(0, queue.remainingCapacity()); + + assertTrue(queue.add("four")); + assertEquals("two", queue.element()); + assertEquals("two", queue.peek()); + assertEquals(3, queue.size()); + assertEquals(0, queue.remainingCapacity()); + + assertEquals("two", queue.remove()); + assertEquals(2, queue.size()); + assertEquals(1, queue.remainingCapacity()); + } + + public void testAddAll() throws Exception { + EvictingQueue queue = new EvictingQueue<>(3); + assertEquals(0, queue.size()); + assertEquals(3, queue.remainingCapacity()); + + assertTrue(queue.addAll(CollectionUtils.arrayAsArrayList("one", "two", "three"))); + assertEquals("one", queue.element()); + assertEquals("one", queue.peek()); + assertEquals(3, queue.size()); + assertEquals(0, queue.remainingCapacity()); + + assertTrue(queue.addAll(Collections.singletonList("four"))); + assertEquals("two", queue.element()); + assertEquals("two", queue.peek()); + assertEquals(3, queue.size()); + assertEquals(0, queue.remainingCapacity()); + + assertEquals("two", queue.remove()); + assertEquals(2, queue.size()); + assertEquals(1, queue.remainingCapacity()); + } +} \ No newline at end of file diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgIT.java b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgIT.java index fe942dc9a52..ac4fcf89aef 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgIT.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgIT.java @@ -19,12 +19,10 @@ package org.elasticsearch.search.aggregations.pipeline.moving.avg; - -import com.google.common.collect.EvictingQueue; - import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.collect.EvictingQueue; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram; import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram.Bucket; @@ -42,11 +40,7 @@ import org.junit.Test; import java.util.*; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.elasticsearch.search.aggregations.AggregationBuilders.avg; -import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram; -import static org.elasticsearch.search.aggregations.AggregationBuilders.max; -import static org.elasticsearch.search.aggregations.AggregationBuilders.min; -import static org.elasticsearch.search.aggregations.AggregationBuilders.range; +import static org.elasticsearch.search.aggregations.AggregationBuilders.*; import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.derivative; import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.movingAvg; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; @@ -170,7 +164,7 @@ public class MovAvgIT extends ESIntegTestCase { */ private void setupExpected(MovAvgType type, MetricTarget target, int windowSize) { ArrayList values = new ArrayList<>(numBuckets); - EvictingQueue window = EvictingQueue.create(windowSize); + EvictingQueue window = new EvictingQueue<>(windowSize); for (PipelineAggregationHelperTests.MockBucket mockBucket : mockHisto) { double metricValue; diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgUnitTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgUnitTests.java index 65e44b92a92..11c5e4035d6 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgUnitTests.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgUnitTests.java @@ -19,8 +19,8 @@ package org.elasticsearch.search.aggregations.pipeline.moving.avg; -import com.google.common.collect.EvictingQueue; import org.elasticsearch.common.ParseFieldMatcher; +import org.elasticsearch.common.collect.EvictingQueue; import org.elasticsearch.search.aggregations.pipeline.movavg.models.*; import org.elasticsearch.test.ESTestCase; import org.junit.Test; @@ -39,7 +39,7 @@ public class MovAvgUnitTests extends ESTestCase { int numValues = randomIntBetween(1, 100); int windowSize = randomIntBetween(1, 50); - EvictingQueue window = EvictingQueue.create(windowSize); + EvictingQueue window = new EvictingQueue<>(windowSize); for (int i = 0; i < numValues; i++) { double randValue = randomDouble(); @@ -68,7 +68,7 @@ public class MovAvgUnitTests extends ESTestCase { int windowSize = randomIntBetween(1, 50); int numPredictions = randomIntBetween(1, 50); - EvictingQueue window = EvictingQueue.create(windowSize); + EvictingQueue window = new EvictingQueue<>(windowSize); for (int i = 0; i < windowSize; i++) { window.offer(randomDouble()); } @@ -94,7 +94,7 @@ public class MovAvgUnitTests extends ESTestCase { int numValues = randomIntBetween(1, 100); int windowSize = randomIntBetween(1, 50); - EvictingQueue window = EvictingQueue.create(windowSize); + EvictingQueue window = new EvictingQueue<>(windowSize); for (int i = 0; i < numValues; i++) { double randValue = randomDouble(); @@ -126,7 +126,7 @@ public class MovAvgUnitTests extends ESTestCase { int windowSize = randomIntBetween(1, 50); int numPredictions = randomIntBetween(1,50); - EvictingQueue window = EvictingQueue.create(windowSize); + EvictingQueue window = new EvictingQueue<>(windowSize); for (int i = 0; i < windowSize; i++) { window.offer(randomDouble()); } @@ -158,7 +158,7 @@ public class MovAvgUnitTests extends ESTestCase { int numValues = randomIntBetween(1, 100); int windowSize = randomIntBetween(1, 50); - EvictingQueue window = EvictingQueue.create(windowSize); + EvictingQueue window = new EvictingQueue<>(windowSize); for (int i = 0; i < numValues; i++) { double randValue = randomDouble(); @@ -193,7 +193,7 @@ public class MovAvgUnitTests extends ESTestCase { int windowSize = randomIntBetween(1, 50); int numPredictions = randomIntBetween(1,50); - EvictingQueue window = EvictingQueue.create(windowSize); + EvictingQueue window = new EvictingQueue<>(windowSize); for (int i = 0; i < windowSize; i++) { window.offer(randomDouble()); } @@ -227,7 +227,7 @@ public class MovAvgUnitTests extends ESTestCase { int numValues = randomIntBetween(1, 100); int windowSize = randomIntBetween(1, 50); - EvictingQueue window = EvictingQueue.create(windowSize); + EvictingQueue window = new EvictingQueue<>(windowSize); for (int i = 0; i < numValues; i++) { double randValue = randomDouble(); @@ -276,7 +276,7 @@ public class MovAvgUnitTests extends ESTestCase { int windowSize = randomIntBetween(1, 50); int numPredictions = randomIntBetween(1, 50); - EvictingQueue window = EvictingQueue.create(windowSize); + EvictingQueue window = new EvictingQueue<>(windowSize); for (int i = 0; i < windowSize; i++) { window.offer(randomDouble()); } @@ -323,7 +323,7 @@ public class MovAvgUnitTests extends ESTestCase { int windowSize = randomIntBetween(period * 2, 50); // HW requires at least two periods of data - EvictingQueue window = EvictingQueue.create(windowSize); + EvictingQueue window = new EvictingQueue<>(windowSize); for (int i = 0; i < windowSize; i++) { window.offer(randomDouble()); } @@ -392,7 +392,7 @@ public class MovAvgUnitTests extends ESTestCase { int windowSize = randomIntBetween(period * 2, 50); // HW requires at least two periods of data int numPredictions = randomIntBetween(1, 50); - EvictingQueue window = EvictingQueue.create(windowSize); + EvictingQueue window = new EvictingQueue<>(windowSize); for (int i = 0; i < windowSize; i++) { window.offer(randomDouble()); } @@ -465,7 +465,7 @@ public class MovAvgUnitTests extends ESTestCase { int windowSize = randomIntBetween(period * 2, 50); // HW requires at least two periods of data - EvictingQueue window = EvictingQueue.create(windowSize); + EvictingQueue window = new EvictingQueue<>(windowSize); for (int i = 0; i < windowSize; i++) { window.offer(randomDouble()); } @@ -533,7 +533,7 @@ public class MovAvgUnitTests extends ESTestCase { int windowSize = randomIntBetween(period * 2, 50); // HW requires at least two periods of data int numPredictions = randomIntBetween(1, 50); - EvictingQueue window = EvictingQueue.create(windowSize); + EvictingQueue window = new EvictingQueue<>(windowSize); for (int i = 0; i < windowSize; i++) { window.offer(randomDouble()); } diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/serialdiff/SerialDiffIT.java b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/serialdiff/SerialDiffIT.java index af686674c75..ccd4dcbc136 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/serialdiff/SerialDiffIT.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/serialdiff/SerialDiffIT.java @@ -19,10 +19,10 @@ package org.elasticsearch.search.aggregations.pipeline.serialdiff; -import com.google.common.collect.EvictingQueue; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.collect.EvictingQueue; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram; import org.elasticsearch.search.aggregations.metrics.ValuesSourceMetricsAggregationBuilder; @@ -160,7 +160,7 @@ public class SerialDiffIT extends ESIntegTestCase { */ private void setupExpected(MetricTarget target) { ArrayList values = new ArrayList<>(numBuckets); - EvictingQueue lagWindow = EvictingQueue.create(lag); + EvictingQueue lagWindow = new EvictingQueue<>(lag); int counter = 0; for (PipelineAggregationHelperTests.MockBucket mockBucket : mockHisto) {