diff --git a/server/src/main/java/io/druid/server/QueryManager.java b/server/src/main/java/io/druid/server/QueryManager.java index 5910aeda535..84b947414dd 100644 --- a/server/src/main/java/io/druid/server/QueryManager.java +++ b/server/src/main/java/io/druid/server/QueryManager.java @@ -19,30 +19,50 @@ package io.druid.server; -import com.google.common.collect.Maps; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimaps; +import com.google.common.collect.SetMultimap; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import io.druid.query.Query; import io.druid.query.QueryWatcher; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Future; +import java.util.Set; public class QueryManager implements QueryWatcher { - final ConcurrentMap queries; + final SetMultimap queries; - public QueryManager() { - this.queries = Maps.newConcurrentMap(); - } - - public void cancelQuery(String id) { - Future future = queries.get(id); - if(future != null) { - future.cancel(true); - } - } - public void registerQuery(Query query, ListenableFuture future) + public QueryManager() { - queries.put(query.getId(), future); + this.queries = Multimaps.synchronizedSetMultimap( + HashMultimap.create() + ); + } + + public boolean cancelQuery(String id) { + Set futures = queries.removeAll(id); + boolean success = true; + for (ListenableFuture future : futures) { + success = success && future.cancel(true); + } + return success; + } + + public void registerQuery(Query query, final ListenableFuture future) + { + final String id = query.getId(); + queries.put(id, future); + future.addListener( + new Runnable() + { + @Override + public void run() + { + queries.remove(id, future); + } + }, + MoreExecutors.sameThreadExecutor() + ); } }