review comments queryID changes, add wait time for realtime node queries

This commit is contained in:
nishantmonu51 2014-02-27 14:39:25 +05:30
parent b480d8f543
commit 0701b1534d
13 changed files with 58 additions and 76 deletions

View File

@ -21,6 +21,7 @@ package io.druid.query;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import io.druid.query.spec.QuerySegmentSpec; import io.druid.query.spec.QuerySegmentSpec;
@ -34,10 +35,10 @@ import java.util.Map;
*/ */
public abstract class BaseQuery<T> implements Query<T> public abstract class BaseQuery<T> implements Query<T>
{ {
public static String QUERYID = "queryId";
private final String dataSource; private final String dataSource;
private final Map<String, String> context; private final Map<String, String> context;
private final QuerySegmentSpec querySegmentSpec; private final QuerySegmentSpec querySegmentSpec;
private volatile Duration duration; private volatile Duration duration;
public BaseQuery( public BaseQuery(
@ -130,4 +131,16 @@ public abstract class BaseQuery<T> implements Query<T>
return overridden; return overridden;
} }
@Override
public String getId()
{
return getContextValue(QUERYID);
}
@Override
public Query withId(String id)
{
return withOverriddenContext(ImmutableMap.of(QUERYID, id));
}
} }

View File

@ -77,4 +77,8 @@ public interface Query<T>
public Query<T> withOverriddenContext(Map<String, String> contextOverride); public Query<T> withOverriddenContext(Map<String, String> contextOverride);
public Query<T> withQuerySegmentSpec(QuerySegmentSpec spec); public Query<T> withQuerySegmentSpec(QuerySegmentSpec spec);
public Query<T> withId(String id);
public String getId();
} }

View File

@ -1,37 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query;
import com.google.common.collect.ImmutableMap;
public class QueryHelper
{
public static String QUERYID = "queryId";
public static String getQueryId(Query query)
{
return query.getContextValue(QUERYID);
}
public static Query setQueryId(Query query, String id)
{
return query.withOverriddenContext(ImmutableMap.of(QUERYID, id));
}
}

View File

@ -36,7 +36,6 @@ import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row; import io.druid.data.input.Row;
import io.druid.query.IntervalChunkingQueryRunner; import io.druid.query.IntervalChunkingQueryRunner;
import io.druid.query.Query; import io.druid.query.Query;
import io.druid.query.QueryHelper;
import io.druid.query.QueryRunner; import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChest;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
@ -133,7 +132,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
.setUser6(String.valueOf(query.hasFilters())) .setUser6(String.valueOf(query.hasFilters()))
.setUser7(String.format("%,d aggs", query.getAggregatorSpecs().size())) .setUser7(String.format("%,d aggs", query.getAggregatorSpecs().size()))
.setUser9(Minutes.minutes(numMinutes).toString()) .setUser9(Minutes.minutes(numMinutes).toString())
.setUser10(QueryHelper.getQueryId(query)); .setUser10(query.getId());
} }
@Override @Override

View File

@ -36,7 +36,6 @@ import io.druid.collections.OrderedMergeSequence;
import io.druid.common.utils.JodaUtils; import io.druid.common.utils.JodaUtils;
import io.druid.query.CacheStrategy; import io.druid.query.CacheStrategy;
import io.druid.query.Query; import io.druid.query.Query;
import io.druid.query.QueryHelper;
import io.druid.query.QueryRunner; import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChest;
import io.druid.query.ResultMergeQueryRunner; import io.druid.query.ResultMergeQueryRunner;
@ -153,7 +152,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
.setUser5(Joiner.on(",").join(query.getIntervals())) .setUser5(Joiner.on(",").join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters())) .setUser6(String.valueOf(query.hasFilters()))
.setUser9(Minutes.minutes(numMinutes).toString()) .setUser9(Minutes.minutes(numMinutes).toString())
.setUser10(QueryHelper.getQueryId(query)); .setUser10(query.getId());
} }
@Override @Override

View File

@ -40,7 +40,6 @@ import io.druid.collections.OrderedMergeSequence;
import io.druid.query.CacheStrategy; import io.druid.query.CacheStrategy;
import io.druid.query.IntervalChunkingQueryRunner; import io.druid.query.IntervalChunkingQueryRunner;
import io.druid.query.Query; import io.druid.query.Query;
import io.druid.query.QueryHelper;
import io.druid.query.QueryRunner; import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChest;
import io.druid.query.Result; import io.druid.query.Result;
@ -66,11 +65,13 @@ import java.util.Set;
public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResultValue>, SearchQuery> public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResultValue>, SearchQuery>
{ {
private static final byte SEARCH_QUERY = 0x2; private static final byte SEARCH_QUERY = 0x2;
private static final Joiner COMMA_JOIN = Joiner.on(","); private static final Joiner COMMA_JOIN = Joiner.on(",");
private static final TypeReference<Result<SearchResultValue>> TYPE_REFERENCE = new TypeReference<Result<SearchResultValue>>(){}; private static final TypeReference<Result<SearchResultValue>> TYPE_REFERENCE = new TypeReference<Result<SearchResultValue>>()
{
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE = new TypeReference<Object>(){}; };
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE = new TypeReference<Object>()
{
};
private final SearchQueryConfig config; private final SearchQueryConfig config;
@Inject @Inject
@ -125,7 +126,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
.setUser5(COMMA_JOIN.join(query.getIntervals())) .setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters())) .setUser6(String.valueOf(query.hasFilters()))
.setUser9(Minutes.minutes(numMinutes).toString()) .setUser9(Minutes.minutes(numMinutes).toString())
.setUser10(QueryHelper.getQueryId(query)); .setUser10(query.getId());
} }
@Override @Override
@ -263,6 +264,11 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
); );
} }
public Ordering<Result<SearchResultValue>> getOrdering()
{
return Ordering.natural();
}
private static class SearchThresholdAdjustingQueryRunner implements QueryRunner<Result<SearchResultValue>> private static class SearchThresholdAdjustingQueryRunner implements QueryRunner<Result<SearchResultValue>>
{ {
private final QueryRunner<Result<SearchResultValue>> runner; private final QueryRunner<Result<SearchResultValue>> runner;
@ -271,7 +277,8 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
public SearchThresholdAdjustingQueryRunner( public SearchThresholdAdjustingQueryRunner(
QueryRunner<Result<SearchResultValue>> runner, QueryRunner<Result<SearchResultValue>> runner,
SearchQueryConfig config SearchQueryConfig config
) { )
{
this.runner = runner; this.runner = runner;
this.config = config; this.config = config;
} }
@ -343,9 +350,4 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
); );
} }
} }
public Ordering<Result<SearchResultValue>> getOrdering()
{
return Ordering.natural();
}
} }

View File

@ -37,7 +37,6 @@ import io.druid.query.CacheStrategy;
import io.druid.query.IntervalChunkingQueryRunner; import io.druid.query.IntervalChunkingQueryRunner;
import io.druid.query.Query; import io.druid.query.Query;
import io.druid.query.QueryConfig; import io.druid.query.QueryConfig;
import io.druid.query.QueryHelper;
import io.druid.query.QueryRunner; import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChest;
import io.druid.query.Result; import io.druid.query.Result;
@ -61,18 +60,15 @@ import java.util.Set;
public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResultValue>, SelectQuery> public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResultValue>, SelectQuery>
{ {
private static final byte SELECT_QUERY = 0x13; private static final byte SELECT_QUERY = 0x13;
private static final Joiner COMMA_JOIN = Joiner.on(","); private static final Joiner COMMA_JOIN = Joiner.on(",");
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE = private static final TypeReference<Object> OBJECT_TYPE_REFERENCE =
new TypeReference<Object>() new TypeReference<Object>()
{ {
}; };
private static final TypeReference<Result<SelectResultValue>> TYPE_REFERENCE = private static final TypeReference<Result<SelectResultValue>> TYPE_REFERENCE =
new TypeReference<Result<SelectResultValue>>() new TypeReference<Result<SelectResultValue>>()
{ {
}; };
private final QueryConfig config; private final QueryConfig config;
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
@ -132,7 +128,7 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
.setUser5(COMMA_JOIN.join(query.getIntervals())) .setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters())) .setUser6(String.valueOf(query.hasFilters()))
.setUser9(Minutes.minutes(numMinutes).toString()) .setUser9(Minutes.minutes(numMinutes).toString())
.setUser10(QueryHelper.getQueryId(query)); .setUser10(query.getId());
} }
@Override @Override

View File

@ -34,7 +34,6 @@ import io.druid.collections.OrderedMergeSequence;
import io.druid.query.BySegmentSkippingQueryRunner; import io.druid.query.BySegmentSkippingQueryRunner;
import io.druid.query.CacheStrategy; import io.druid.query.CacheStrategy;
import io.druid.query.Query; import io.druid.query.Query;
import io.druid.query.QueryHelper;
import io.druid.query.QueryRunner; import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChest;
import io.druid.query.Result; import io.druid.query.Result;
@ -121,7 +120,7 @@ public class TimeBoundaryQueryQueryToolChest
.setUser2(query.getDataSource()) .setUser2(query.getDataSource())
.setUser4(query.getType()) .setUser4(query.getType())
.setUser6("false") .setUser6("false")
.setUser10(QueryHelper.getQueryId(query)); .setUser10(query.getId());
} }
@Override @Override

View File

@ -37,7 +37,6 @@ import io.druid.query.IntervalChunkingQueryRunner;
import io.druid.query.Query; import io.druid.query.Query;
import io.druid.query.QueryCacheHelper; import io.druid.query.QueryCacheHelper;
import io.druid.query.QueryConfig; import io.druid.query.QueryConfig;
import io.druid.query.QueryHelper;
import io.druid.query.QueryRunner; import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChest;
import io.druid.query.Result; import io.druid.query.Result;
@ -62,14 +61,15 @@ import java.util.Map;
public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<TimeseriesResultValue>, TimeseriesQuery> public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<TimeseriesResultValue>, TimeseriesQuery>
{ {
private static final byte TIMESERIES_QUERY = 0x0; private static final byte TIMESERIES_QUERY = 0x0;
private static final Joiner COMMA_JOIN = Joiner.on(","); private static final Joiner COMMA_JOIN = Joiner.on(",");
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE = private static final TypeReference<Object> OBJECT_TYPE_REFERENCE =
new TypeReference<Object>(){}; new TypeReference<Object>()
{
};
private static final TypeReference<Result<TimeseriesResultValue>> TYPE_REFERENCE = private static final TypeReference<Result<TimeseriesResultValue>> TYPE_REFERENCE =
new TypeReference<Result<TimeseriesResultValue>>() {}; new TypeReference<Result<TimeseriesResultValue>>()
{
};
private final QueryConfig config; private final QueryConfig config;
@Inject @Inject
@ -129,7 +129,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
.setUser6(String.valueOf(query.hasFilters())) .setUser6(String.valueOf(query.hasFilters()))
.setUser7(String.format("%,d aggs", query.getAggregatorSpecs().size())) .setUser7(String.format("%,d aggs", query.getAggregatorSpecs().size()))
.setUser9(Minutes.minutes(numMinutes).toString()) .setUser9(Minutes.minutes(numMinutes).toString())
.setUser10(QueryHelper.getQueryId(query)); .setUser10(query.getId());
} }
@Override @Override

View File

@ -40,7 +40,6 @@ import io.druid.query.CacheStrategy;
import io.druid.query.IntervalChunkingQueryRunner; import io.druid.query.IntervalChunkingQueryRunner;
import io.druid.query.Query; import io.druid.query.Query;
import io.druid.query.QueryCacheHelper; import io.druid.query.QueryCacheHelper;
import io.druid.query.QueryHelper;
import io.druid.query.QueryRunner; import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChest;
import io.druid.query.Result; import io.druid.query.Result;
@ -135,7 +134,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
.setUser6(String.valueOf(query.hasFilters())) .setUser6(String.valueOf(query.hasFilters()))
.setUser7(String.format("%,d aggs", query.getAggregatorSpecs().size())) .setUser7(String.format("%,d aggs", query.getAggregatorSpecs().size()))
.setUser9(Minutes.minutes(numMinutes).toString()) .setUser9(Minutes.minutes(numMinutes).toString())
.setUser10(QueryHelper.getQueryId(query)); .setUser10(query.getId());
} }
@Override @Override

View File

@ -257,7 +257,7 @@ public class RealtimePlumber implements Plumber
} }
) )
) )
), ).withWaitMeasuredFromNow(),
new SpecificSegmentSpec( new SpecificSegmentSpec(
new SegmentDescriptor( new SegmentDescriptor(
holder.getInterval(), holder.getInterval(),

View File

@ -21,6 +21,8 @@ package io.druid.server;
import com.google.inject.Inject; import com.google.inject.Inject;
import io.druid.guice.annotations.Self; import io.druid.guice.annotations.Self;
import io.druid.query.Query;
import org.joda.time.DateTime;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -35,8 +37,15 @@ public class QueryIdProvider
host = node.getHost(); host = node.getHost();
} }
public String next() public String next(Query query)
{ {
return String.format("%s_%s", host, id.incrementAndGet()); return String.format(
"%s_%s_%s_%s_%s",
query.getDataSource(),
query.getDuration(),
host,
new DateTime(),
id.incrementAndGet()
);
} }
} }

View File

@ -35,7 +35,6 @@ import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile; import io.druid.guice.annotations.Smile;
import io.druid.query.Query; import io.druid.query.Query;
import io.druid.query.QueryHelper;
import io.druid.query.QuerySegmentWalker; import io.druid.query.QuerySegmentWalker;
import io.druid.server.log.RequestLogger; import io.druid.server.log.RequestLogger;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@ -106,9 +105,9 @@ public class QueryResource
try { try {
requestQuery = ByteStreams.toByteArray(req.getInputStream()); requestQuery = ByteStreams.toByteArray(req.getInputStream());
query = objectMapper.readValue(requestQuery, Query.class); query = objectMapper.readValue(requestQuery, Query.class);
queryID = QueryHelper.getQueryId(query); queryID = query.getId();
if (queryID == null) { if (queryID == null) {
query = QueryHelper.setQueryId(query, idProvider.next()); query = query.withId(idProvider.next(query));
} }
requestLogger.log( requestLogger.log(