mirror of https://github.com/apache/druid.git
properly remove completed queries from query manager
This commit is contained in:
parent
a56a655eae
commit
def62c74f8
|
@ -19,30 +19,50 @@
|
|||
|
||||
package io.druid.server;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.HashMultimap;
|
||||
import com.google.common.collect.Multimaps;
|
||||
import com.google.common.collect.SetMultimap;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.query.QueryWatcher;
|
||||
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.Set;
|
||||
|
||||
public class QueryManager implements QueryWatcher
|
||||
{
|
||||
final ConcurrentMap<String, ListenableFuture> queries;
|
||||
final SetMultimap<String, ListenableFuture> queries;
|
||||
|
||||
public QueryManager() {
|
||||
this.queries = Maps.newConcurrentMap();
|
||||
}
|
||||
|
||||
public void cancelQuery(String id) {
|
||||
Future future = queries.get(id);
|
||||
if(future != null) {
|
||||
future.cancel(true);
|
||||
}
|
||||
}
|
||||
public void registerQuery(Query query, ListenableFuture future)
|
||||
public QueryManager()
|
||||
{
|
||||
queries.put(query.getId(), future);
|
||||
this.queries = Multimaps.synchronizedSetMultimap(
|
||||
HashMultimap.<String, ListenableFuture>create()
|
||||
);
|
||||
}
|
||||
|
||||
public boolean cancelQuery(String id) {
|
||||
Set<ListenableFuture> futures = queries.removeAll(id);
|
||||
boolean success = true;
|
||||
for (ListenableFuture future : futures) {
|
||||
success = success && future.cancel(true);
|
||||
}
|
||||
return success;
|
||||
}
|
||||
|
||||
public void registerQuery(Query query, final ListenableFuture future)
|
||||
{
|
||||
final String id = query.getId();
|
||||
queries.put(id, future);
|
||||
future.addListener(
|
||||
new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
queries.remove(id, future);
|
||||
}
|
||||
},
|
||||
MoreExecutors.sameThreadExecutor()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue