diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/NoticesQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/NoticesQueue.java new file mode 100644 index 00000000000..49bf71fb255 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/NoticesQueue.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream.supervisor; + +import com.google.common.base.Preconditions; +import com.google.errorprone.annotations.concurrent.GuardedBy; + +import javax.annotation.Nullable; +import java.util.Iterator; +import java.util.LinkedHashSet; + +/** + * Queue that de-duplicates items on addition using {@link Object#equals}. + */ +public class NoticesQueue +{ + @GuardedBy("this") + private final LinkedHashSet queue = new LinkedHashSet<>(); + + /** + * Adds an item. Throws {@link NullPointerException} if the item is null. + */ + public void add(final T item) + { + Preconditions.checkNotNull(item, "item"); + + synchronized (this) { + queue.add(item); + this.notifyAll(); + } + } + + /** + * Retrieves the head of the queue (eldest item). Returns null if the queue is empty and the timeout has elapsed. + */ + @Nullable + public T poll(final long timeoutMillis) throws InterruptedException + { + synchronized (this) { + final long timeoutAt = System.currentTimeMillis() + timeoutMillis; + + long waitMillis = timeoutMillis; + while (queue.isEmpty() && waitMillis > 0) { + wait(waitMillis); + waitMillis = timeoutAt - System.currentTimeMillis(); + } + + final Iterator it = queue.iterator(); + if (it.hasNext()) { + final T item = it.next(); + it.remove(); + return item; + } else { + return null; + } + } + } + + public int size() + { + synchronized (this) { + return queue.size(); + } + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index d96b6b17e99..6f00ef7fbfd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -112,13 +112,11 @@ import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; -import java.util.concurrent.BlockingDeque; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -306,15 +304,6 @@ public abstract class SeekableStreamSupervisor notices = new LinkedBlockingDeque<>(); + private final NoticesQueue notices = new NoticesQueue<>(); private final Object stopLock = new Object(); private final Object stateChangeLock = new Object(); private final ReentrantLock recordSupplierLock = new ReentrantLock(); @@ -1037,17 +1033,11 @@ public abstract class SeekableStreamSupervisor scaleAction) { - return () -> notices.add(new DynamicAllocationTasksNotice(scaleAction)); + return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction)); } private Runnable buildRunTask() diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/NoticesQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/NoticesQueueTest.java new file mode 100644 index 00000000000..7cc3c07ed90 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/NoticesQueueTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord.supervisor; + +import org.apache.druid.indexing.seekablestream.supervisor.NoticesQueue; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +public class NoticesQueueTest +{ + @Test + public void testQueue() throws InterruptedException + { + final NoticesQueue queue = new NoticesQueue<>(); + + for (int i = 0; i < 3; i++) { + Assert.assertEquals(0, queue.size()); + queue.add("xyz"); + Assert.assertEquals(1, queue.size()); + + queue.add("xyz"); + Assert.assertEquals(1, queue.size()); + + queue.add("foo"); + Assert.assertEquals(2, queue.size()); + + queue.add("xyz"); + Assert.assertEquals(2, queue.size()); + + queue.add("bar"); + Assert.assertEquals(3, queue.size()); + + Assert.assertEquals("xyz", queue.poll(10)); + Assert.assertEquals("foo", queue.poll(10)); + Assert.assertEquals("bar", queue.poll(10)); + Assert.assertNull(queue.poll(10)); + Assert.assertEquals(0, queue.size()); + } + } + + @Test + public void testQueueConcurrent() throws InterruptedException, ExecutionException + { + final NoticesQueue queue = new NoticesQueue<>(); + final ExecutorService exec = Execs.singleThreaded(getClass().getSimpleName()); + + try { + final Future item = exec.submit(() -> queue.poll(60_000)); + + // Imperfect test: ideally we "add" after "poll", but we can't tell if "poll" has started yet. + // Don't want to add a sleep, to avoid adding additional time to the test case, so we live with the imperfection. + queue.add("xyz"); + Assert.assertEquals("xyz", item.get()); + } + finally { + exec.shutdownNow(); + } + } +}