mirror of https://github.com/apache/druid.git
make QueryResource return ETag header if If-None-Match header is provided (#3955)
Also skip processing the query and return HTTP 304 Not Modified when the given If-None-Match value matches the current ETag.
parent 1098ba7a7f
commit 2ead572639
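
The end-to-end behavior added here: a client sends If-None-Match carrying the ETag from an earlier response; the broker recomputes the ETag for the current segment view and, when it matches, answers 304 Not Modified without serializing results. Below is a minimal illustrative round-trip, not part of the commit, assuming a hypothetical broker at localhost:8082 and an ad-hoc timeBoundary query; the ETag is only present when the query touches exclusively historical segments (see the CachingClusteredClient change below).

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class ConditionalQuerySketch
{
  // POST a native query to the broker, optionally replaying a previous ETag.
  static HttpURLConnection post(String etagOrNull, String queryJson) throws Exception
  {
    HttpURLConnection conn =
        (HttpURLConnection) new URL("http://localhost:8082/druid/v2").openConnection(); // hypothetical broker
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    if (etagOrNull != null) {
      conn.setRequestProperty("If-None-Match", etagOrNull); // ask for 304 if unchanged
    }
    conn.setDoOutput(true);
    try (OutputStream os = conn.getOutputStream()) {
      os.write(queryJson.getBytes(StandardCharsets.UTF_8));
    }
    return conn;
  }

  public static void main(String[] args) throws Exception
  {
    String queryJson = "{\"queryType\":\"timeBoundary\",\"dataSource\":\"dataSource\"}";

    HttpURLConnection first = post(null, queryJson);
    String etag = first.getHeaderField("ETag"); // set only for historical-only queries
    System.out.println("first: " + first.getResponseCode() + ", ETag: " + etag);

    HttpURLConnection replay = post(etag, queryJson);
    System.out.println("replay: " + replay.getResponseCode()); // 304 when nothing changed
  }
}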
io/druid/client/CachingClusteredClient.java

@@ -21,6 +21,7 @@ package io.druid.client;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Charsets;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Supplier;
@@ -32,6 +33,8 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.RangeSet;
 import com.google.common.collect.Sets;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -64,12 +67,14 @@ import io.druid.query.SegmentDescriptor;
 import io.druid.query.aggregation.MetricManipulatorFns;
 import io.druid.query.filter.DimFilterUtils;
 import io.druid.query.spec.MultipleSpecificSegmentSpec;
+import io.druid.server.QueryResource;
 import io.druid.server.coordination.DruidServerMetadata;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.TimelineLookup;
 import io.druid.timeline.TimelineObjectHolder;
 import io.druid.timeline.partition.PartitionChunk;
 import io.druid.timeline.partition.ShardSpec;
+import org.apache.commons.codec.binary.Base64;
 import org.joda.time.Interval;

 import java.io.IOException;
@@ -258,6 +263,30 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
       queryCacheKey = null;
     }

+    if (query.getContext().get(QueryResource.HDR_IF_NONE_MATCH) != null) {
+      String prevEtag = (String) query.getContext().get(QueryResource.HDR_IF_NONE_MATCH);
+
+      // compute the current ETag
+      Hasher hasher = Hashing.sha1().newHasher();
+      boolean hasOnlyHistoricalSegments = true;
+      for (Pair<ServerSelector, SegmentDescriptor> p : segments) {
+        if (!p.lhs.pick().getServer().isAssignable()) {
+          hasOnlyHistoricalSegments = false;
+          break;
+        }
+        hasher.putString(p.lhs.getSegment().getIdentifier(), Charsets.UTF_8);
+      }
+
+      if (hasOnlyHistoricalSegments) {
+        hasher.putBytes(queryCacheKey == null ? strategy.computeCacheKey(query) : queryCacheKey);
+        String currEtag = Base64.encodeBase64String(hasher.hash().asBytes());
+        responseContext.put(QueryResource.HDR_ETAG, currEtag);
+        if (prevEtag.equals(currEtag)) {
+          return Sequences.empty();
+        }
+      }
+    }
+
     if (queryCacheKey != null) {
       // cacheKeys map must preserve segment ordering, in order for shards to always be combined in the same order
       Map<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> cacheKeys = Maps.newLinkedHashMap();
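
In short: an ETag is produced only when every selected segment is served by a historical node (isAssignable()), since results over realtime segments can still change; the tag is a SHA-1 over the ordered segment identifiers plus the query cache key, Base64-encoded. A standalone sketch of that computation follows, not part of the commit, with hypothetical segmentIds and queryCacheKey parameters standing in for the ServerSelector and CacheStrategy plumbing:

import com.google.common.base.Charsets;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import org.apache.commons.codec.binary.Base64;

import java.util.Arrays;
import java.util.List;

public class EtagSketch
{
  // Mirrors the hashing scheme above: segment ids first, cache key last.
  public static String computeEtag(List<String> segmentIds, byte[] queryCacheKey)
  {
    Hasher hasher = Hashing.sha1().newHasher();
    for (String id : segmentIds) {
      hasher.putString(id, Charsets.UTF_8); // order matters, so keep segment iteration order
    }
    hasher.putBytes(queryCacheKey);
    return Base64.encodeBase64String(hasher.hash().asBytes());
  }

  public static void main(String[] args)
  {
    // Hypothetical inputs; real ids come from ServerSelector.getSegment().getIdentifier().
    System.out.println(computeEtag(Arrays.asList("seg_2016_v1", "seg_2017_v1"), new byte[]{1, 2}));
  }
}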
io/druid/server/QueryResource.java

@@ -87,6 +87,8 @@ public class QueryResource implements QueryCountStatsProvider

   protected static final int RESPONSE_CTX_HEADER_LEN_LIMIT = 7 * 1024;

+  public static final String HDR_IF_NONE_MATCH = "If-None-Match";
+  public static final String HDR_ETAG = "ETag";

   protected final QueryToolChestWarehouse warehouse;
   protected final ServerConfig config;
@@ -217,8 +219,20 @@ public class QueryResource implements QueryCountStatsProvider
         }
       }

+      String prevEtag = req.getHeader(HDR_IF_NONE_MATCH);
+      if (prevEtag != null) {
+        query = query.withOverriddenContext(
+            ImmutableMap.of(HDR_IF_NONE_MATCH, prevEtag)
+        );
+      }
+
       final Map<String, Object> responseContext = new MapMaker().makeMap();
       final Sequence res = query.run(texasRanger, responseContext);
+
+      if (prevEtag != null && prevEtag.equals(responseContext.get(HDR_ETAG))) {
+        return Response.notModified().build();
+      }
+
       final Sequence results;
       if (res == null) {
         results = Sequences.empty();
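
The short-circuit above works because query.run(...) (ultimately CachingClusteredClient) has already put the freshly computed ETag into responseContext. A minimal sketch of just that decision, not part of the commit, assuming a hypothetical helper handed the client's If-None-Match value and the populated context:

import javax.ws.rs.core.Response;

import java.util.Map;

public class NotModifiedSketch
{
  // The literal "ETag" stands in for QueryResource.HDR_ETAG here.
  public static Response conditionalResponse(String prevEtag, Map<String, Object> responseContext, Object results)
  {
    if (prevEtag != null && prevEtag.equals(responseContext.get("ETag"))) {
      // Tags match, so the client's cached result is still valid: no body, just 304.
      return Response.notModified().build();
    }
    return Response.ok(results).build();
  }
}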
@@ -282,6 +296,11 @@ public class QueryResource implements QueryCountStatsProvider
             )
             .header("X-Druid-Query-Id", queryId);

+        if (responseContext.get(HDR_ETAG) != null) {
+          builder.header(HDR_ETAG, responseContext.get(HDR_ETAG));
+          responseContext.remove(HDR_ETAG);
+        }
+
         //Limit the response-context header, see https://github.com/druid-io/druid/issues/2331
         //Note that Response.ResponseBuilder.header(String key,Object value).build() calls value.toString()
         //and encodes the string using ASCII, so 1 char is = 1 byte
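
Note the remove(HDR_ETAG): once the ETag has been promoted to a real HTTP header, it is dropped from the response context, presumably so it is not serialized a second time into the response-context header whose size the surrounding code caps at RESPONSE_CTX_HEADER_LEN_LIMIT.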
CachingClusteredClientTest.java

@@ -3144,4 +3144,42 @@ public class CachingClusteredClientTest
     );
   }

+  @Test
+  public void testIfNoneMatch() throws Exception
+  {
+    Interval interval = new Interval("2016/2017");
+    final DataSegment dataSegment = new DataSegment(
+        "dataSource",
+        interval,
+        "ver",
+        ImmutableMap.<String, Object>of(
+            "type", "hdfs",
+            "path", "/tmp"
+        ),
+        ImmutableList.of("product"),
+        ImmutableList.of("visited_sum"),
+        NoneShardSpec.instance(),
+        9,
+        12334
+    );
+    final ServerSelector selector = new ServerSelector(
+        dataSegment,
+        new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy())
+    );
+    selector.addServerAndUpdateSegment(new QueryableDruidServer(servers[0], null), dataSegment);
+    timeline.add(interval, "ver", new SingleElementPartitionChunk<>(selector));
+
+    TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder()
+                                    .dataSource(DATA_SOURCE)
+                                    .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(interval)))
+                                    .context(ImmutableMap.<String, Object>of("If-None-Match", "aVJV29CJY93rszVW/QBy0arWZo0="))
+                                    .build();
+
+    Map<String, String> responseContext = new HashMap<>();
+
+    client.run(query, responseContext);
+    Assert.assertEquals("Z/eS4rQz5v477iq7Aashr6JPZa0=", responseContext.get("ETag"));
+  }
+
 }
QueryResourceTest.java

@@ -127,6 +127,7 @@ public class QueryResourceTest
   public void setup()
   {
     EasyMock.expect(testServletRequest.getContentType()).andReturn(MediaType.APPLICATION_JSON).anyTimes();
+    EasyMock.expect(testServletRequest.getHeader(QueryResource.HDR_IF_NONE_MATCH)).andReturn(null).anyTimes();
     EasyMock.expect(testServletRequest.getRemoteAddr()).andReturn("localhost").anyTimes();
     queryManager = new QueryManager();
     queryResource = new QueryResource(