Lazily build Filter in FilteredAggregatorFactory to avoid parsing exceptions in Router (#15526)

A query that uses lookups in a FilteredAggregator fails on the Router with this exception:

Cannot construct instance of `org.apache.druid.query.aggregation.FilteredAggregatorFactory`, problem: Lookup [campaigns_lookup[campaignId][is_sold][autodsp]] not found at [Source: (org.eclipse.jetty.server.HttpInputOverHTTP); line: 1, column: 913] (through reference chain: org.apache.druid.query.groupby.GroupByQuery["aggregations"]->java.util.ArrayList[1])
The problem is that the constructor of FilteredAggregatorFactory validates that the lookup exists, via the call dimFilter.toFilter().
This fails on the Router, which is expected: the Router is not assigned any lookups.
The fix is to initialize the filter object lazily instead of building it eagerly in the constructor.
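For illustration, here is a minimal, self-contained sketch of the pattern the fix adopts (Guava's Suppliers.memoize, as used in the diff below; the class and method names here are hypothetical stand-ins, not Druid code):

import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;

public class LazyInitSketch
{
  // Stand-in for dimFilter.toFilter(), which validates that the lookup
  // exists and therefore throws on a Router that has no lookups loaded.
  private static String resolveLookup()
  {
    return "filter built from lookup";
  }

  public static void main(String[] args)
  {
    // Eager: calling resolveLookup() in the constructor would fail at
    // deserialization time on the Router.
    // Lazy: construction always succeeds; the lookup is only resolved on
    // the first get(), on a data server that actually runs the query.
    Supplier<String> filterSupplier = Suppliers.memoize(LazyInitSketch::resolveLookup);
    System.out.println("constructed without touching the lookup");
    System.out.println(filterSupplier.get()); // lookup resolved here
  }
}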
Rishabh Singh 2023-12-09 12:18:37 +05:30 committed by GitHub
parent e7c8f2e208
commit 54df235026
2 changed files with 96 additions and 11 deletions

Changed file: FilteredAggregatorFactory.java

@@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.query.PerSegmentQueryOptimizationContext;
@@ -49,7 +51,7 @@ public class FilteredAggregatorFactory extends AggregatorFactory
{
private final AggregatorFactory delegate;
private final DimFilter dimFilter;
private final Filter filter;
private final Supplier<Filter> filterSupplier;
@Nullable
private final String name;
@@ -75,14 +77,14 @@ public class FilteredAggregatorFactory extends AggregatorFactory
this.delegate = delegate;
this.dimFilter = dimFilter;
this.filter = dimFilter.toFilter();
this.filterSupplier = Suppliers.memoize(dimFilter::toFilter);
this.name = name;
}
@Override
public Aggregator factorize(ColumnSelectorFactory columnSelectorFactory)
{
final ValueMatcher valueMatcher = filter.makeMatcher(columnSelectorFactory);
final ValueMatcher valueMatcher = filterSupplier.get().makeMatcher(columnSelectorFactory);
return new FilteredAggregator(
valueMatcher,
delegate.factorize(columnSelectorFactory)
@@ -92,7 +94,7 @@ public class FilteredAggregatorFactory extends AggregatorFactory
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnSelectorFactory)
{
final ValueMatcher valueMatcher = filter.makeMatcher(columnSelectorFactory);
final ValueMatcher valueMatcher = filterSupplier.get().makeMatcher(columnSelectorFactory);
return new FilteredBufferAggregator(
valueMatcher,
delegate.factorizeBuffered(columnSelectorFactory)
@@ -103,7 +105,7 @@ public class FilteredAggregatorFactory extends AggregatorFactory
public VectorAggregator factorizeVector(VectorColumnSelectorFactory columnSelectorFactory)
{
Preconditions.checkState(canVectorize(columnSelectorFactory), "Cannot vectorize");
final VectorValueMatcher valueMatcher = filter.makeVectorMatcher(columnSelectorFactory);
final VectorValueMatcher valueMatcher = filterSupplier.get().makeVectorMatcher(columnSelectorFactory);
return new FilteredVectorAggregator(
valueMatcher,
delegate.factorizeVector(columnSelectorFactory)
@@ -113,7 +115,7 @@ public class FilteredAggregatorFactory extends AggregatorFactory
@Override
public boolean canVectorize(ColumnInspector columnInspector)
{
return delegate.canVectorize(columnInspector) && filter.canVectorizeMatcher(columnInspector);
return delegate.canVectorize(columnInspector) && filterSupplier.get().canVectorizeMatcher(columnInspector);
}
@Override
@@ -176,7 +178,7 @@ public class FilteredAggregatorFactory extends AggregatorFactory
{
return ImmutableList.copyOf(
// use a set to get rid of dupes
ImmutableSet.<String>builder().addAll(delegate.requiredFields()).addAll(filter.getRequiredColumns()).build()
ImmutableSet.<String>builder().addAll(delegate.requiredFields()).addAll(filterSupplier.get().getRequiredColumns()).build()
);
}
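A note on the memoize contract (my reading of the diff, assuming Guava's documented semantics): although factorize, factorizeBuffered, factorizeVector, canVectorize, and requiredFields all go through filterSupplier.get(), the underlying dimFilter.toFilter() runs at most once per factory instance. A tiny runnable sketch:

import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import java.util.concurrent.atomic.AtomicInteger;

public class MemoizeContract
{
  public static void main(String[] args)
  {
    AtomicInteger calls = new AtomicInteger();
    // Suppliers.memoize runs the delegate once and caches the result.
    Supplier<String> supplier = Suppliers.memoize(() -> {
      calls.incrementAndGet();
      return "filter";
    });
    supplier.get(); // delegate runs here
    supplier.get(); // cached; delegate does not run again
    System.out.println(calls.get()); // prints 1
  }
}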

Changed file: AsyncQueryForwardingServletTest.java

@@ -20,6 +20,7 @@
package org.apache.druid.server;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -54,6 +55,13 @@ import org.apache.druid.query.Druids;
import org.apache.druid.query.MapQueryToolChestWarehouse;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryException;
import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
import org.apache.druid.query.aggregation.any.StringAnyAggregatorFactory;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.query.lookup.LookupModule;
import org.apache.druid.query.lookup.RegisteredLookupExtractionFn;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.initialization.BaseJettyTest;
@@ -105,9 +113,12 @@ import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Deflater;
@@ -557,9 +568,66 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest
verifyServletCallsForQuery(query, true, false, hostFinder, properties, true);
}
/**
* Verifies that the Servlet calls the right methods the right number of times.
*/
@Test
public void testNoParseExceptionOnGroupByWithFilteredAggregationOnLookups() throws Exception
{
class TestLookupReferenceManager implements LookupExtractorFactoryContainerProvider
{
@Override
public Set<String> getAllLookupNames()
{
return null;
}
@Override
public Optional<LookupExtractorFactoryContainer> get(String lookupName)
{
return Optional.empty();
}
}
final TimeseriesQuery query =
Druids.newTimeseriesQueryBuilder()
.dataSource("foo")
.intervals("2000/P1D")
.aggregators(
Collections.singletonList(
new FilteredAggregatorFactory(
new StringAnyAggregatorFactory("stringAny", "col", 1024, true),
new SelectorDimFilter(
"test",
"1",
new RegisteredLookupExtractionFn(
new TestLookupReferenceManager(),
"somelookup",
false,
null,
null,
false
)
),
"agg"
)))
.granularity(Granularities.ALL)
.context(ImmutableMap.of("queryId", "dummy"))
.build();
final QueryHostFinder hostFinder = EasyMock.createMock(QueryHostFinder.class);
EasyMock.expect(hostFinder.pickServer(query)).andReturn(new TestServer("http", "1.2.3.4", 9999)).once();
EasyMock.replay(hostFinder);
final ObjectMapper jsonMapper =
TestHelper.makeJsonMapper()
.registerModules(new LookupModule().getJacksonModules())
.setInjectableValues(
new InjectableValues.Std().addValue(
LookupExtractorFactoryContainerProvider.class,
new TestLookupReferenceManager()
)
);
verifyServletCallsForQuery(query, false, false, hostFinder, new Properties(), false, jsonMapper);
}
private void verifyServletCallsForQuery(
Object query,
boolean isNativeSql,
@@ -569,7 +637,22 @@
boolean isFailure
) throws Exception
{
final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
verifyServletCallsForQuery(query, isNativeSql, isJDBCSql, hostFinder, properties, isFailure, TestHelper.makeJsonMapper());
}
/**
* Verifies that the Servlet calls the right methods the right number of times.
*/
private void verifyServletCallsForQuery(
Object query,
boolean isNativeSql,
boolean isJDBCSql,
QueryHostFinder hostFinder,
Properties properties,
boolean isFailure,
ObjectMapper jsonMapper
) throws Exception
{
final ByteArrayInputStream inputStream = new ByteArrayInputStream(jsonMapper.writeValueAsBytes(query));
final ServletInputStream servletInputStream = new ServletInputStream()
{