From ede7cf9eef6518f818dbd477ad167913acc92aa4 Mon Sep 17 00:00:00 2001 From: Peter Cunningham Date: Wed, 9 Aug 2017 18:47:46 +0100 Subject: [PATCH] Added support for where clauses to JDBC lookups. (#4643) * Added support for where clauses to filter lookup values on ingestion. Added a filter field to the JDBC lookups that is used to generate a where clause so that only rows matching the filter value will be brought into Druid. Example being filter="SOMECOLUMN=1" * Required changes based on code review. * Required changes based on code review. * Added support for where clauses to filter lookup values on ingestion. Added a filter field to the JDBC lookups that is used to generate a where clause so that only rows matching the filter value will be brought into Druid. Example being filter="SOMECOLUMN=1" * Updates based on code review, mainly formatting and small refactor of the buildLookupQuery method. * Fixed broken buildLookupQuery method * Removed empty line. * Updates per review comments --- .../extensions-core/lookups-cached-global.md | 11 ++- .../namespace/JdbcExtractionNamespace.java | 84 ++++++++--------- .../lookup/namespace/JdbcCacheGenerator.java | 33 +++++-- .../cache/JdbcExtractionNamespaceTest.java | 91 ++++++++++++++----- 4 files changed, 141 insertions(+), 78 deletions(-) diff --git a/docs/content/development/extensions-core/lookups-cached-global.md b/docs/content/development/extensions-core/lookups-cached-global.md index 385cc07ad7c..17a1b523a2b 100644 --- a/docs/content/development/extensions-core/lookups-cached-global.md +++ b/docs/content/development/extensions-core/lookups-cached-global.md @@ -56,7 +56,8 @@ Globally cached lookups can be specified as part of the [cluster wide config for }, "table": "lookupTable", "keyColumn": "mykeyColumn", - "valueColumn": "MyValueColumn", + "valueColumn": "myValueColumn", + "filter" : "myFilterSQL (Where clause statement e.g LOOKUPTYPE=1)", "tsColumn": "timeColumn" }, "firstCacheTimeout": 120000, @@ -94,9 +95,10 @@ In a simple case where only one [tier](../../querying/lookups.html#dynamic-confi "user": "druid", "password": "diurd" }, - "table": "lookupTable", - "keyColumn": "country_id", - "valueColumn": "country_name", + "table": "lookupValues", + "keyColumn": "value_id", + "valueColumn": "value_text", + "filter": "value_type='country'", "tsColumn": "timeColumn" }, "firstCacheTimeout": 120000, @@ -319,6 +321,7 @@ The JDBC lookups will poll a database to populate its local cache. If the `tsCol |`table`|The table which contains the key value pairs|Yes|| |`keyColumn`|The column in `table` which contains the keys|Yes|| |`valueColumn`|The column in `table` which contains the values|Yes|| +|`filter`|The filter to use when selecting lookups, this is used to create a where clause on lookup population|No|No Filter| |`tsColumn`| The column in `table` which contains when the key was updated|No|Not used| |`pollPeriod`|How often to poll the DB|No|0 (only once)| diff --git a/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/JdbcExtractionNamespace.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/JdbcExtractionNamespace.java index 78478b61c52..1a1b77f436f 100644 --- a/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/JdbcExtractionNamespace.java +++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/JdbcExtractionNamespace.java @@ -23,13 +23,13 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; -import io.druid.java.util.common.StringUtils; import io.druid.metadata.MetadataStorageConnectorConfig; import org.joda.time.Period; import javax.annotation.Nullable; import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; +import java.util.Objects; /** * @@ -48,22 +48,20 @@ public class JdbcExtractionNamespace implements ExtractionNamespace @JsonProperty private final String tsColumn; @JsonProperty + private final String filter; + @JsonProperty private final Period pollPeriod; @JsonCreator public JdbcExtractionNamespace( @NotNull @JsonProperty(value = "connectorConfig", required = true) final MetadataStorageConnectorConfig connectorConfig, - @NotNull @JsonProperty(value = "table", required = true) - final String table, - @NotNull @JsonProperty(value = "keyColumn", required = true) - final String keyColumn, - @NotNull @JsonProperty(value = "valueColumn", required = true) - final String valueColumn, - @Nullable @JsonProperty(value = "tsColumn", required = false) - final String tsColumn, - @Min(0) @Nullable @JsonProperty(value = "pollPeriod", required = false) - final Period pollPeriod + @NotNull @JsonProperty(value = "table", required = true) final String table, + @NotNull @JsonProperty(value = "keyColumn", required = true) final String keyColumn, + @NotNull @JsonProperty(value = "valueColumn", required = true) final String valueColumn, + @Nullable @JsonProperty(value = "tsColumn", required = false) final String tsColumn, + @Nullable @JsonProperty(value = "filter", required = false) final String filter, + @Min(0) @Nullable @JsonProperty(value = "pollPeriod", required = false) final Period pollPeriod ) { this.connectorConfig = Preconditions.checkNotNull(connectorConfig, "connectorConfig"); @@ -72,6 +70,7 @@ public class JdbcExtractionNamespace implements ExtractionNamespace this.keyColumn = Preconditions.checkNotNull(keyColumn, "keyColumn"); this.valueColumn = Preconditions.checkNotNull(valueColumn, "valueColumn"); this.tsColumn = tsColumn; + this.filter = filter; this.pollPeriod = pollPeriod == null ? new Period(0L) : pollPeriod; } @@ -95,6 +94,11 @@ public class JdbcExtractionNamespace implements ExtractionNamespace return valueColumn; } + public String getFilter() + { + return filter; + } + public String getTsColumn() { return tsColumn; @@ -109,15 +113,15 @@ public class JdbcExtractionNamespace implements ExtractionNamespace @Override public String toString() { - return StringUtils.format( - "JdbcExtractionNamespace = { connectorConfig = { %s }, table = %s, keyColumn = %s, valueColumn = %s, tsColumn = %s, pollPeriod = %s}", - connectorConfig.toString(), - table, - keyColumn, - valueColumn, - tsColumn, - pollPeriod - ); + return "JdbcExtractionNamespace{" + + "connectorConfig=" + connectorConfig + + ", table='" + table + '\'' + + ", keyColumn='" + keyColumn + '\'' + + ", valueColumn='" + valueColumn + '\'' + + ", tsColumn='" + tsColumn + '\'' + + ", filter='" + filter + '\'' + + ", pollPeriod=" + pollPeriod + + '}'; } @Override @@ -132,34 +136,26 @@ public class JdbcExtractionNamespace implements ExtractionNamespace JdbcExtractionNamespace that = (JdbcExtractionNamespace) o; - if (!connectorConfig.equals(that.connectorConfig)) { - return false; - } - if (!table.equals(that.table)) { - return false; - } - if (!keyColumn.equals(that.keyColumn)) { - return false; - } - if (!valueColumn.equals(that.valueColumn)) { - return false; - } - if (tsColumn != null ? !tsColumn.equals(that.tsColumn) : that.tsColumn != null) { - return false; - } - return pollPeriod.equals(that.pollPeriod); - + return Objects.equals(connectorConfig, that.connectorConfig) && + Objects.equals(table, that.table) && + Objects.equals(filter, that.filter) && + Objects.equals(keyColumn, that.keyColumn) && + Objects.equals(valueColumn, that.valueColumn) && + Objects.equals(tsColumn, that.tsColumn) && + Objects.equals(pollPeriod, that.pollPeriod); } @Override public int hashCode() { - int result = connectorConfig.hashCode(); - result = 31 * result + table.hashCode(); - result = 31 * result + keyColumn.hashCode(); - result = 31 * result + valueColumn.hashCode(); - result = 31 * result + (tsColumn != null ? tsColumn.hashCode() : 0); - result = 31 * result + pollPeriod.hashCode(); - return result; + return Objects.hash( + connectorConfig, + table, + filter, + keyColumn, + valueColumn, + tsColumn, + pollPeriod + ); } } diff --git a/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/JdbcCacheGenerator.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/JdbcCacheGenerator.java index 88924deedab..aa274985474 100644 --- a/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/JdbcCacheGenerator.java +++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/JdbcCacheGenerator.java @@ -33,6 +33,8 @@ import org.skife.jdbi.v2.tweak.HandleCallback; import org.skife.jdbi.v2.tweak.ResultSetMapper; import org.skife.jdbi.v2.util.TimestampMapper; +import com.google.common.base.Strings; + import javax.annotation.Nullable; import java.sql.ResultSet; import java.sql.SQLException; @@ -68,6 +70,7 @@ public final class JdbcCacheGenerator implements CacheGenerator> withHandle(Handle handle) throws Exception { - final String query; - query = StringUtils.format( - "SELECT %s, %s FROM %s", - keyColumn, - valueColumn, - table - ); return handle .createQuery( - query + buildLookupQuery(table, filter, keyColumn, valueColumn) ).map( new ResultSetMapper>() { - @Override public Pair map( final int index, @@ -132,6 +127,26 @@ public final class JdbcCacheGenerator implements CacheGenerator id, JdbcExtractionNamespace namespace) { final CacheScheduler.EntryImpl key = id; diff --git a/extensions-core/lookups-cached-global/src/test/java/io/druid/server/lookup/namespace/cache/JdbcExtractionNamespaceTest.java b/extensions-core/lookups-cached-global/src/test/java/io/druid/server/lookup/namespace/cache/JdbcExtractionNamespaceTest.java index 6a78c6ebc7f..e983c77b5b7 100644 --- a/extensions-core/lookups-cached-global/src/test/java/io/druid/server/lookup/namespace/cache/JdbcExtractionNamespaceTest.java +++ b/extensions-core/lookups-cached-global/src/test/java/io/druid/server/lookup/namespace/cache/JdbcExtractionNamespaceTest.java @@ -72,13 +72,15 @@ public class JdbcExtractionNamespaceTest private static final String keyName = "keyName"; private static final String valName = "valName"; private static final String tsColumn_ = "tsColumn"; - private static final Map renames = ImmutableMap.of( - "foo", "bar", - "bad", "bar", - "how about that", "foo", - "empty string", "" + private static final String filterColumn = "filterColumn"; + private static final Map renames = ImmutableMap.of( + "foo", new String[]{"bar", "1"}, + "bad", new String[]{"bar", "1"}, + "how about that", new String[]{"foo", "0"}, + "empty string", new String[]{"empty string", "0"} ); + @Parameterized.Parameters(name = "{0}") public static Collection getParameters() { @@ -124,9 +126,10 @@ public class JdbcExtractionNamespaceTest 0, handle.createStatement( StringUtils.format( - "CREATE TABLE %s (%s TIMESTAMP, %s VARCHAR(64), %s VARCHAR(64))", + "CREATE TABLE %s (%s TIMESTAMP, %s VARCHAR(64), %s VARCHAR(64), %s VARCHAR(64))", tableName, tsColumn_, + filterColumn, keyName, valName ) @@ -174,9 +177,12 @@ public class JdbcExtractionNamespaceTest Assert.assertEquals(0, scheduler.getActiveEntries()); } }); - for (Map.Entry entry : renames.entrySet()) { + for (Map.Entry entry : renames.entrySet()) { try { - insertValues(handle, entry.getKey(), entry.getValue(), "2015-01-01 00:00:00"); + String key = entry.getKey(); + String value = entry.getValue()[0]; + String filter = entry.getValue()[1]; + insertValues(handle, key, value, filter, "2015-01-01 00:00:00"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -193,6 +199,7 @@ public class JdbcExtractionNamespaceTest { private final JdbcCacheGenerator delegate = new JdbcCacheGenerator(); + @Override public CacheScheduler.VersionedCache generateCache( final JdbcExtractionNamespace namespace, @@ -322,7 +329,13 @@ public class JdbcExtractionNamespaceTest } } - private void insertValues(final Handle handle, final String key, final String val, final String updateTs) + private void insertValues( + final Handle handle, + final String key, + final String val, + final String filter, + final String updateTs + ) throws InterruptedException { final String query; @@ -331,17 +344,17 @@ public class JdbcExtractionNamespaceTest StringUtils.format("DELETE FROM %s WHERE %s='%s'", tableName, keyName, key) ).setQueryTimeout(1).execute(); query = StringUtils.format( - "INSERT INTO %s (%s, %s) VALUES ('%s', '%s')", + "INSERT INTO %s (%s, %s, %s) VALUES ('%s', '%s', '%s')", tableName, - keyName, valName, - key, val + filterColumn, keyName, valName, + filter, key, val ); } else { query = StringUtils.format( - "INSERT INTO %s (%s, %s, %s) VALUES ('%s', '%s', '%s')", + "INSERT INTO %s (%s, %s, %s, %s) VALUES ('%s', '%s', '%s', '%s')", tableName, - tsColumn, keyName, valName, - updateTs, key, val + tsColumn, filterColumn, keyName, valName, + updateTs, filter, key, val ); } Assert.assertEquals(1, handle.createStatement(query).setQueryTimeout(1).execute()); @@ -352,7 +365,7 @@ public class JdbcExtractionNamespaceTest } @Test(timeout = 10_000L) - public void testMapping() + public void testMappingWithoutFilter() throws ClassNotFoundException, NoSuchFieldException, IllegalAccessException, ExecutionException, InterruptedException, TimeoutException { @@ -362,21 +375,56 @@ public class JdbcExtractionNamespaceTest keyName, valName, tsColumn, + null, new Period(0) ); try (CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace)) { CacheSchedulerTest.waitFor(entry); final Map map = entry.getCache(); - for (Map.Entry e : renames.entrySet()) { + for (Map.Entry e : renames.entrySet()) { String key = e.getKey(); - String val = e.getValue(); - Assert.assertEquals("non-null check", Strings.emptyToNull(val), Strings.emptyToNull(map.get(key))); + String[] val = e.getValue(); + String field = val[0]; + Assert.assertEquals("non-null check", Strings.emptyToNull(field), Strings.emptyToNull(map.get(key))); } Assert.assertEquals("null check", null, map.get("baz")); } } + @Test(timeout = 20_000L) + public void testMappingWithFilter() + throws ClassNotFoundException, NoSuchFieldException, IllegalAccessException, ExecutionException, + InterruptedException, TimeoutException + { + final JdbcExtractionNamespace extractionNamespace = new JdbcExtractionNamespace( + derbyConnectorRule.getMetadataConnectorConfig(), + tableName, + keyName, + valName, + tsColumn, + filterColumn + "='1'", + new Period(0) + ); + try (CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace)) { + CacheSchedulerTest.waitFor(entry); + final Map map = entry.getCache(); + + for (Map.Entry e : renames.entrySet()) { + String key = e.getKey(); + String[] val = e.getValue(); + String field = val[0]; + String filterVal = val[1]; + + if (filterVal.equals("1")) { + Assert.assertEquals("non-null check", Strings.emptyToNull(field), Strings.emptyToNull(map.get(key))); + } else { + Assert.assertEquals("non-null check", null, Strings.emptyToNull(map.get(key))); + } + } + } + } + @Test(timeout = 10_000L) public void testSkipOld() throws NoSuchFieldException, IllegalAccessException, ExecutionException, InterruptedException @@ -384,7 +432,7 @@ public class JdbcExtractionNamespaceTest try (final CacheScheduler.Entry entry = ensureEntry()) { assertUpdated(entry, "foo", "bar"); if (tsColumn != null) { - insertValues(handleRef, "foo", "baz", "1900-01-01 00:00:00"); + insertValues(handleRef, "foo", "baz", null, "1900-01-01 00:00:00"); } assertUpdated(entry, "foo", "bar"); } @@ -396,7 +444,7 @@ public class JdbcExtractionNamespaceTest { try (final CacheScheduler.Entry entry = ensureEntry()) { assertUpdated(entry, "foo", "bar"); - insertValues(handleRef, "foo", "baz", "2900-01-01 00:00:00"); + insertValues(handleRef, "foo", "baz", null, "2900-01-01 00:00:00"); assertUpdated(entry, "foo", "baz"); } } @@ -410,6 +458,7 @@ public class JdbcExtractionNamespaceTest keyName, valName, tsColumn, + null, new Period(10) ); CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace);