Merge branch 'master' of github.com:metamx/druid

This commit is contained in:
fjy 2014-02-27 11:47:25 -08:00
commit 6621bcc00e
12 changed files with 113 additions and 30 deletions

View File

@ -21,6 +21,7 @@ package io.druid.query;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.metamx.common.guava.Sequence;
import io.druid.query.spec.QuerySegmentSpec;
@ -34,10 +35,10 @@ import java.util.Map;
*/
public abstract class BaseQuery<T> implements Query<T>
{
public static String QUERYID = "queryId";
private final String dataSource;
private final Map<String, String> context;
private final QuerySegmentSpec querySegmentSpec;
private volatile Duration duration;
public BaseQuery(
@ -130,4 +131,16 @@ public abstract class BaseQuery<T> implements Query<T>
return overridden;
}
@Override
public String getId()
{
return getContextValue(QUERYID);
}
@Override
public Query withId(String id)
{
return withOverriddenContext(ImmutableMap.of(QUERYID, id));
}
}

View File

@ -77,4 +77,8 @@ public interface Query<T>
public Query<T> withOverriddenContext(Map<String, String> contextOverride);
public Query<T> withQuerySegmentSpec(QuerySegmentSpec spec);
public Query<T> withId(String id);
public String getId();
}

View File

@ -131,7 +131,8 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
.setUser5(Joiner.on(",").join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
.setUser7(String.format("%,d aggs", query.getAggregatorSpecs().size()))
.setUser9(Minutes.minutes(numMinutes).toString());
.setUser9(Minutes.minutes(numMinutes).toString())
.setUser10(query.getId());
}
@Override

View File

@ -151,7 +151,8 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
.setUser4(query.getType())
.setUser5(Joiner.on(",").join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
.setUser9(Minutes.minutes(numMinutes).toString());
.setUser9(Minutes.minutes(numMinutes).toString())
.setUser10(query.getId());
}
@Override

View File

@ -65,11 +65,13 @@ import java.util.Set;
public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResultValue>, SearchQuery>
{
private static final byte SEARCH_QUERY = 0x2;
private static final Joiner COMMA_JOIN = Joiner.on(",");
private static final TypeReference<Result<SearchResultValue>> TYPE_REFERENCE = new TypeReference<Result<SearchResultValue>>(){};
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE = new TypeReference<Object>(){};
private static final TypeReference<Result<SearchResultValue>> TYPE_REFERENCE = new TypeReference<Result<SearchResultValue>>()
{
};
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE = new TypeReference<Object>()
{
};
private final SearchQueryConfig config;
@Inject
@ -123,7 +125,8 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
.setUser4("search")
.setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
.setUser9(Minutes.minutes(numMinutes).toString());
.setUser9(Minutes.minutes(numMinutes).toString())
.setUser10(query.getId());
}
@Override
@ -261,6 +264,11 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
);
}
public Ordering<Result<SearchResultValue>> getOrdering()
{
return Ordering.natural();
}
private static class SearchThresholdAdjustingQueryRunner implements QueryRunner<Result<SearchResultValue>>
{
private final QueryRunner<Result<SearchResultValue>> runner;
@ -269,7 +277,8 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
public SearchThresholdAdjustingQueryRunner(
QueryRunner<Result<SearchResultValue>> runner,
SearchQueryConfig config
) {
)
{
this.runner = runner;
this.config = config;
}
@ -341,9 +350,4 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
);
}
}
public Ordering<Result<SearchResultValue>> getOrdering()
{
return Ordering.natural();
}
}

View File

@ -60,18 +60,15 @@ import java.util.Set;
public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResultValue>, SelectQuery>
{
private static final byte SELECT_QUERY = 0x13;
private static final Joiner COMMA_JOIN = Joiner.on(",");
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE =
new TypeReference<Object>()
{
};
private static final TypeReference<Result<SelectResultValue>> TYPE_REFERENCE =
new TypeReference<Result<SelectResultValue>>()
{
};
private final QueryConfig config;
private final ObjectMapper jsonMapper;
@ -130,7 +127,8 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
.setUser4("Select")
.setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
.setUser9(Minutes.minutes(numMinutes).toString());
.setUser9(Minutes.minutes(numMinutes).toString())
.setUser10(query.getId());
}
@Override

View File

@ -119,7 +119,8 @@ public class TimeBoundaryQueryQueryToolChest
return new ServiceMetricEvent.Builder()
.setUser2(query.getDataSource())
.setUser4(query.getType())
.setUser6("false");
.setUser6("false")
.setUser10(query.getId());
}
@Override

View File

@ -61,14 +61,15 @@ import java.util.Map;
public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<TimeseriesResultValue>, TimeseriesQuery>
{
private static final byte TIMESERIES_QUERY = 0x0;
private static final Joiner COMMA_JOIN = Joiner.on(",");
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE =
new TypeReference<Object>(){};
new TypeReference<Object>()
{
};
private static final TypeReference<Result<TimeseriesResultValue>> TYPE_REFERENCE =
new TypeReference<Result<TimeseriesResultValue>>() {};
new TypeReference<Result<TimeseriesResultValue>>()
{
};
private final QueryConfig config;
@Inject
@ -127,7 +128,8 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
.setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
.setUser7(String.format("%,d aggs", query.getAggregatorSpecs().size()))
.setUser9(Minutes.minutes(numMinutes).toString());
.setUser9(Minutes.minutes(numMinutes).toString())
.setUser10(query.getId());
}
@Override

View File

@ -133,7 +133,8 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
.setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
.setUser7(String.format("%,d aggs", query.getAggregatorSpecs().size()))
.setUser9(Minutes.minutes(numMinutes).toString());
.setUser9(Minutes.minutes(numMinutes).toString())
.setUser10(query.getId());
}
@Override

View File

@ -257,7 +257,7 @@ public class RealtimePlumber implements Plumber
}
)
)
),
).withWaitMeasuredFromNow(),
new SpecificSegmentSpec(
new SegmentDescriptor(
holder.getInterval(),

View File

@ -0,0 +1,51 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server;
import com.google.inject.Inject;
import io.druid.guice.annotations.Self;
import io.druid.query.Query;
import org.joda.time.DateTime;
import java.util.concurrent.atomic.AtomicLong;
public class QueryIdProvider
{
private final String host;
private final AtomicLong id = new AtomicLong();
@Inject
public QueryIdProvider(@Self DruidNode node)
{
host = node.getHost();
}
public String next(Query query)
{
return String.format(
"%s_%s_%s_%s_%s",
query.getDataSource(),
query.getDuration(),
host,
new DateTime(),
id.incrementAndGet()
);
}
}

View File

@ -57,12 +57,12 @@ public class QueryResource
{
private static final Logger log = new Logger(QueryResource.class);
private static final Charset UTF8 = Charset.forName("UTF-8");
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
private final QuerySegmentWalker texasRanger;
private final ServiceEmitter emitter;
private final RequestLogger requestLogger;
private final QueryIdProvider idProvider;
@Inject
public QueryResource(
@ -70,7 +70,8 @@ public class QueryResource
@Smile ObjectMapper smileMapper,
QuerySegmentWalker texasRanger,
ServiceEmitter emitter,
RequestLogger requestLogger
RequestLogger requestLogger,
QueryIdProvider idProvider
)
{
this.jsonMapper = jsonMapper;
@ -78,6 +79,7 @@ public class QueryResource
this.texasRanger = texasRanger;
this.emitter = emitter;
this.requestLogger = requestLogger;
this.idProvider = idProvider;
}
@POST
@ -88,9 +90,9 @@ public class QueryResource
) throws ServletException, IOException
{
final long start = System.currentTimeMillis();
Query query = null;
byte[] requestQuery = null;
String queryID;
final boolean isSmile = "application/smile".equals(req.getContentType());
@ -103,6 +105,10 @@ public class QueryResource
try {
requestQuery = ByteStreams.toByteArray(req.getInputStream());
query = objectMapper.readValue(requestQuery, Query.class);
queryID = query.getId();
if (queryID == null) {
query = query.withId(idProvider.next(query));
}
requestLogger.log(
new RequestLogLine(new DateTime(), req.getRemoteAddr(), query)
@ -130,6 +136,7 @@ public class QueryResource
.setUser6(String.valueOf(query.hasFilters()))
.setUser7(req.getRemoteAddr())
.setUser9(query.getDuration().toPeriod().toStandardMinutes().toString())
.setUser10(queryID)
.build("request/time", requestTime)
);
}