Added support for where clauses to JDBC lookups. (#4643)

* Added support for where clauses to filter lookup values on ingestion.

Added a filter field to the JDBC lookups that is used to generate a
where clause so that only rows matching the filter value will be
brought into Druid. Example being filter="SOMECOLUMN=1"

* Required changes based on code review.

* Required changes based on code review.

* Added support for where clauses to filter lookup values on ingestion.

Added a filter field to the JDBC lookups that is used to generate a
where clause so that only rows matching the filter value will be
brought into Druid. Example being filter="SOMECOLUMN=1"

* Updates based on code review, mainly formatting and small refactor of
the buildLookupQuery method.

* Fixed broken buildLookupQuery method

* Removed empty line.

* Updates per review comments
This commit is contained in:
Peter Cunningham 2017-08-09 18:47:46 +01:00 committed by Charles Allen
parent 42569e65e2
commit ede7cf9eef
4 changed files with 141 additions and 78 deletions

View File

@ -56,7 +56,8 @@ Globally cached lookups can be specified as part of the [cluster wide config for
}, },
"table": "lookupTable", "table": "lookupTable",
"keyColumn": "mykeyColumn", "keyColumn": "mykeyColumn",
"valueColumn": "MyValueColumn", "valueColumn": "myValueColumn",
"filter" : "myFilterSQL (Where clause statement e.g LOOKUPTYPE=1)",
"tsColumn": "timeColumn" "tsColumn": "timeColumn"
}, },
"firstCacheTimeout": 120000, "firstCacheTimeout": 120000,
@ -94,9 +95,10 @@ In a simple case where only one [tier](../../querying/lookups.html#dynamic-confi
"user": "druid", "user": "druid",
"password": "diurd" "password": "diurd"
}, },
"table": "lookupTable", "table": "lookupValues",
"keyColumn": "country_id", "keyColumn": "value_id",
"valueColumn": "country_name", "valueColumn": "value_text",
"filter": "value_type='country'",
"tsColumn": "timeColumn" "tsColumn": "timeColumn"
}, },
"firstCacheTimeout": 120000, "firstCacheTimeout": 120000,
@ -319,6 +321,7 @@ The JDBC lookups will poll a database to populate its local cache. If the `tsCol
|`table`|The table which contains the key value pairs|Yes|| |`table`|The table which contains the key value pairs|Yes||
|`keyColumn`|The column in `table` which contains the keys|Yes|| |`keyColumn`|The column in `table` which contains the keys|Yes||
|`valueColumn`|The column in `table` which contains the values|Yes|| |`valueColumn`|The column in `table` which contains the values|Yes||
|`filter`|The filter to use when selecting lookups, this is used to create a where clause on lookup population|No|No Filter|
|`tsColumn`| The column in `table` which contains when the key was updated|No|Not used| |`tsColumn`| The column in `table` which contains when the key was updated|No|Not used|
|`pollPeriod`|How often to poll the DB|No|0 (only once)| |`pollPeriod`|How often to poll the DB|No|0 (only once)|

View File

@ -23,13 +23,13 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import io.druid.java.util.common.StringUtils;
import io.druid.metadata.MetadataStorageConnectorConfig; import io.druid.metadata.MetadataStorageConnectorConfig;
import org.joda.time.Period; import org.joda.time.Period;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import javax.validation.constraints.Min; import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull; import javax.validation.constraints.NotNull;
import java.util.Objects;
/** /**
* *
@ -48,22 +48,20 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
@JsonProperty @JsonProperty
private final String tsColumn; private final String tsColumn;
@JsonProperty @JsonProperty
private final String filter;
@JsonProperty
private final Period pollPeriod; private final Period pollPeriod;
@JsonCreator @JsonCreator
public JdbcExtractionNamespace( public JdbcExtractionNamespace(
@NotNull @JsonProperty(value = "connectorConfig", required = true) @NotNull @JsonProperty(value = "connectorConfig", required = true)
final MetadataStorageConnectorConfig connectorConfig, final MetadataStorageConnectorConfig connectorConfig,
@NotNull @JsonProperty(value = "table", required = true) @NotNull @JsonProperty(value = "table", required = true) final String table,
final String table, @NotNull @JsonProperty(value = "keyColumn", required = true) final String keyColumn,
@NotNull @JsonProperty(value = "keyColumn", required = true) @NotNull @JsonProperty(value = "valueColumn", required = true) final String valueColumn,
final String keyColumn, @Nullable @JsonProperty(value = "tsColumn", required = false) final String tsColumn,
@NotNull @JsonProperty(value = "valueColumn", required = true) @Nullable @JsonProperty(value = "filter", required = false) final String filter,
final String valueColumn, @Min(0) @Nullable @JsonProperty(value = "pollPeriod", required = false) final Period pollPeriod
@Nullable @JsonProperty(value = "tsColumn", required = false)
final String tsColumn,
@Min(0) @Nullable @JsonProperty(value = "pollPeriod", required = false)
final Period pollPeriod
) )
{ {
this.connectorConfig = Preconditions.checkNotNull(connectorConfig, "connectorConfig"); this.connectorConfig = Preconditions.checkNotNull(connectorConfig, "connectorConfig");
@ -72,6 +70,7 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
this.keyColumn = Preconditions.checkNotNull(keyColumn, "keyColumn"); this.keyColumn = Preconditions.checkNotNull(keyColumn, "keyColumn");
this.valueColumn = Preconditions.checkNotNull(valueColumn, "valueColumn"); this.valueColumn = Preconditions.checkNotNull(valueColumn, "valueColumn");
this.tsColumn = tsColumn; this.tsColumn = tsColumn;
this.filter = filter;
this.pollPeriod = pollPeriod == null ? new Period(0L) : pollPeriod; this.pollPeriod = pollPeriod == null ? new Period(0L) : pollPeriod;
} }
@ -95,6 +94,11 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
return valueColumn; return valueColumn;
} }
public String getFilter()
{
return filter;
}
public String getTsColumn() public String getTsColumn()
{ {
return tsColumn; return tsColumn;
@ -109,15 +113,15 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
@Override @Override
public String toString() public String toString()
{ {
return StringUtils.format( return "JdbcExtractionNamespace{" +
"JdbcExtractionNamespace = { connectorConfig = { %s }, table = %s, keyColumn = %s, valueColumn = %s, tsColumn = %s, pollPeriod = %s}", "connectorConfig=" + connectorConfig +
connectorConfig.toString(), ", table='" + table + '\'' +
table, ", keyColumn='" + keyColumn + '\'' +
keyColumn, ", valueColumn='" + valueColumn + '\'' +
valueColumn, ", tsColumn='" + tsColumn + '\'' +
tsColumn, ", filter='" + filter + '\'' +
pollPeriod ", pollPeriod=" + pollPeriod +
); '}';
} }
@Override @Override
@ -132,34 +136,26 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
JdbcExtractionNamespace that = (JdbcExtractionNamespace) o; JdbcExtractionNamespace that = (JdbcExtractionNamespace) o;
if (!connectorConfig.equals(that.connectorConfig)) { return Objects.equals(connectorConfig, that.connectorConfig) &&
return false; Objects.equals(table, that.table) &&
} Objects.equals(filter, that.filter) &&
if (!table.equals(that.table)) { Objects.equals(keyColumn, that.keyColumn) &&
return false; Objects.equals(valueColumn, that.valueColumn) &&
} Objects.equals(tsColumn, that.tsColumn) &&
if (!keyColumn.equals(that.keyColumn)) { Objects.equals(pollPeriod, that.pollPeriod);
return false;
}
if (!valueColumn.equals(that.valueColumn)) {
return false;
}
if (tsColumn != null ? !tsColumn.equals(that.tsColumn) : that.tsColumn != null) {
return false;
}
return pollPeriod.equals(that.pollPeriod);
} }
@Override @Override
public int hashCode() public int hashCode()
{ {
int result = connectorConfig.hashCode(); return Objects.hash(
result = 31 * result + table.hashCode(); connectorConfig,
result = 31 * result + keyColumn.hashCode(); table,
result = 31 * result + valueColumn.hashCode(); filter,
result = 31 * result + (tsColumn != null ? tsColumn.hashCode() : 0); keyColumn,
result = 31 * result + pollPeriod.hashCode(); valueColumn,
return result; tsColumn,
pollPeriod
);
} }
} }

View File

@ -33,6 +33,8 @@ import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.tweak.ResultSetMapper; import org.skife.jdbi.v2.tweak.ResultSetMapper;
import org.skife.jdbi.v2.util.TimestampMapper; import org.skife.jdbi.v2.util.TimestampMapper;
import com.google.common.base.Strings;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
@ -68,6 +70,7 @@ public final class JdbcCacheGenerator implements CacheGenerator<JdbcExtractionNa
final long dbQueryStart = System.currentTimeMillis(); final long dbQueryStart = System.currentTimeMillis();
final DBI dbi = ensureDBI(entryId, namespace); final DBI dbi = ensureDBI(entryId, namespace);
final String table = namespace.getTable(); final String table = namespace.getTable();
final String filter = namespace.getFilter();
final String valueColumn = namespace.getValueColumn(); final String valueColumn = namespace.getValueColumn();
final String keyColumn = namespace.getKeyColumn(); final String keyColumn = namespace.getKeyColumn();
@ -78,20 +81,12 @@ public final class JdbcCacheGenerator implements CacheGenerator<JdbcExtractionNa
@Override @Override
public List<Pair<String, String>> withHandle(Handle handle) throws Exception public List<Pair<String, String>> withHandle(Handle handle) throws Exception
{ {
final String query;
query = StringUtils.format(
"SELECT %s, %s FROM %s",
keyColumn,
valueColumn,
table
);
return handle return handle
.createQuery( .createQuery(
query buildLookupQuery(table, filter, keyColumn, valueColumn)
).map( ).map(
new ResultSetMapper<Pair<String, String>>() new ResultSetMapper<Pair<String, String>>()
{ {
@Override @Override
public Pair<String, String> map( public Pair<String, String> map(
final int index, final int index,
@ -132,6 +127,26 @@ public final class JdbcCacheGenerator implements CacheGenerator<JdbcExtractionNa
} }
} }
private String buildLookupQuery(String table, String filter, String keyColumn, String valueColumn)
{
if (Strings.isNullOrEmpty(filter)) {
return StringUtils.format(
"SELECT %s, %s FROM %s",
keyColumn,
valueColumn,
table
);
}
return StringUtils.format(
"SELECT %s, %s FROM %s WHERE %s",
keyColumn,
valueColumn,
table,
filter
);
}
private DBI ensureDBI(CacheScheduler.EntryImpl<JdbcExtractionNamespace> id, JdbcExtractionNamespace namespace) private DBI ensureDBI(CacheScheduler.EntryImpl<JdbcExtractionNamespace> id, JdbcExtractionNamespace namespace)
{ {
final CacheScheduler.EntryImpl<JdbcExtractionNamespace> key = id; final CacheScheduler.EntryImpl<JdbcExtractionNamespace> key = id;

View File

@ -72,13 +72,15 @@ public class JdbcExtractionNamespaceTest
private static final String keyName = "keyName"; private static final String keyName = "keyName";
private static final String valName = "valName"; private static final String valName = "valName";
private static final String tsColumn_ = "tsColumn"; private static final String tsColumn_ = "tsColumn";
private static final Map<String, String> renames = ImmutableMap.of( private static final String filterColumn = "filterColumn";
"foo", "bar", private static final Map<String, String[]> renames = ImmutableMap.of(
"bad", "bar", "foo", new String[]{"bar", "1"},
"how about that", "foo", "bad", new String[]{"bar", "1"},
"empty string", "" "how about that", new String[]{"foo", "0"},
"empty string", new String[]{"empty string", "0"}
); );
@Parameterized.Parameters(name = "{0}") @Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> getParameters() public static Collection<Object[]> getParameters()
{ {
@ -124,9 +126,10 @@ public class JdbcExtractionNamespaceTest
0, 0,
handle.createStatement( handle.createStatement(
StringUtils.format( StringUtils.format(
"CREATE TABLE %s (%s TIMESTAMP, %s VARCHAR(64), %s VARCHAR(64))", "CREATE TABLE %s (%s TIMESTAMP, %s VARCHAR(64), %s VARCHAR(64), %s VARCHAR(64))",
tableName, tableName,
tsColumn_, tsColumn_,
filterColumn,
keyName, keyName,
valName valName
) )
@ -174,9 +177,12 @@ public class JdbcExtractionNamespaceTest
Assert.assertEquals(0, scheduler.getActiveEntries()); Assert.assertEquals(0, scheduler.getActiveEntries());
} }
}); });
for (Map.Entry<String, String> entry : renames.entrySet()) { for (Map.Entry<String, String[]> entry : renames.entrySet()) {
try { try {
insertValues(handle, entry.getKey(), entry.getValue(), "2015-01-01 00:00:00"); String key = entry.getKey();
String value = entry.getValue()[0];
String filter = entry.getValue()[1];
insertValues(handle, key, value, filter, "2015-01-01 00:00:00");
} }
catch (InterruptedException e) { catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
@ -193,6 +199,7 @@ public class JdbcExtractionNamespaceTest
{ {
private final JdbcCacheGenerator delegate = private final JdbcCacheGenerator delegate =
new JdbcCacheGenerator(); new JdbcCacheGenerator();
@Override @Override
public CacheScheduler.VersionedCache generateCache( public CacheScheduler.VersionedCache generateCache(
final JdbcExtractionNamespace namespace, final JdbcExtractionNamespace namespace,
@ -322,7 +329,13 @@ public class JdbcExtractionNamespaceTest
} }
} }
private void insertValues(final Handle handle, final String key, final String val, final String updateTs) private void insertValues(
final Handle handle,
final String key,
final String val,
final String filter,
final String updateTs
)
throws InterruptedException throws InterruptedException
{ {
final String query; final String query;
@ -331,17 +344,17 @@ public class JdbcExtractionNamespaceTest
StringUtils.format("DELETE FROM %s WHERE %s='%s'", tableName, keyName, key) StringUtils.format("DELETE FROM %s WHERE %s='%s'", tableName, keyName, key)
).setQueryTimeout(1).execute(); ).setQueryTimeout(1).execute();
query = StringUtils.format( query = StringUtils.format(
"INSERT INTO %s (%s, %s) VALUES ('%s', '%s')", "INSERT INTO %s (%s, %s, %s) VALUES ('%s', '%s', '%s')",
tableName, tableName,
keyName, valName, filterColumn, keyName, valName,
key, val filter, key, val
); );
} else { } else {
query = StringUtils.format( query = StringUtils.format(
"INSERT INTO %s (%s, %s, %s) VALUES ('%s', '%s', '%s')", "INSERT INTO %s (%s, %s, %s, %s) VALUES ('%s', '%s', '%s', '%s')",
tableName, tableName,
tsColumn, keyName, valName, tsColumn, filterColumn, keyName, valName,
updateTs, key, val updateTs, filter, key, val
); );
} }
Assert.assertEquals(1, handle.createStatement(query).setQueryTimeout(1).execute()); Assert.assertEquals(1, handle.createStatement(query).setQueryTimeout(1).execute());
@ -352,7 +365,7 @@ public class JdbcExtractionNamespaceTest
} }
@Test(timeout = 10_000L) @Test(timeout = 10_000L)
public void testMapping() public void testMappingWithoutFilter()
throws ClassNotFoundException, NoSuchFieldException, IllegalAccessException, ExecutionException, throws ClassNotFoundException, NoSuchFieldException, IllegalAccessException, ExecutionException,
InterruptedException, TimeoutException InterruptedException, TimeoutException
{ {
@ -362,21 +375,56 @@ public class JdbcExtractionNamespaceTest
keyName, keyName,
valName, valName,
tsColumn, tsColumn,
null,
new Period(0) new Period(0)
); );
try (CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace)) { try (CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace)) {
CacheSchedulerTest.waitFor(entry); CacheSchedulerTest.waitFor(entry);
final Map<String, String> map = entry.getCache(); final Map<String, String> map = entry.getCache();
for (Map.Entry<String, String> e : renames.entrySet()) { for (Map.Entry<String, String[]> e : renames.entrySet()) {
String key = e.getKey(); String key = e.getKey();
String val = e.getValue(); String[] val = e.getValue();
Assert.assertEquals("non-null check", Strings.emptyToNull(val), Strings.emptyToNull(map.get(key))); String field = val[0];
Assert.assertEquals("non-null check", Strings.emptyToNull(field), Strings.emptyToNull(map.get(key)));
} }
Assert.assertEquals("null check", null, map.get("baz")); Assert.assertEquals("null check", null, map.get("baz"));
} }
} }
@Test(timeout = 20_000L)
public void testMappingWithFilter()
throws ClassNotFoundException, NoSuchFieldException, IllegalAccessException, ExecutionException,
InterruptedException, TimeoutException
{
final JdbcExtractionNamespace extractionNamespace = new JdbcExtractionNamespace(
derbyConnectorRule.getMetadataConnectorConfig(),
tableName,
keyName,
valName,
tsColumn,
filterColumn + "='1'",
new Period(0)
);
try (CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace)) {
CacheSchedulerTest.waitFor(entry);
final Map<String, String> map = entry.getCache();
for (Map.Entry<String, String[]> e : renames.entrySet()) {
String key = e.getKey();
String[] val = e.getValue();
String field = val[0];
String filterVal = val[1];
if (filterVal.equals("1")) {
Assert.assertEquals("non-null check", Strings.emptyToNull(field), Strings.emptyToNull(map.get(key)));
} else {
Assert.assertEquals("non-null check", null, Strings.emptyToNull(map.get(key)));
}
}
}
}
@Test(timeout = 10_000L) @Test(timeout = 10_000L)
public void testSkipOld() public void testSkipOld()
throws NoSuchFieldException, IllegalAccessException, ExecutionException, InterruptedException throws NoSuchFieldException, IllegalAccessException, ExecutionException, InterruptedException
@ -384,7 +432,7 @@ public class JdbcExtractionNamespaceTest
try (final CacheScheduler.Entry entry = ensureEntry()) { try (final CacheScheduler.Entry entry = ensureEntry()) {
assertUpdated(entry, "foo", "bar"); assertUpdated(entry, "foo", "bar");
if (tsColumn != null) { if (tsColumn != null) {
insertValues(handleRef, "foo", "baz", "1900-01-01 00:00:00"); insertValues(handleRef, "foo", "baz", null, "1900-01-01 00:00:00");
} }
assertUpdated(entry, "foo", "bar"); assertUpdated(entry, "foo", "bar");
} }
@ -396,7 +444,7 @@ public class JdbcExtractionNamespaceTest
{ {
try (final CacheScheduler.Entry entry = ensureEntry()) { try (final CacheScheduler.Entry entry = ensureEntry()) {
assertUpdated(entry, "foo", "bar"); assertUpdated(entry, "foo", "bar");
insertValues(handleRef, "foo", "baz", "2900-01-01 00:00:00"); insertValues(handleRef, "foo", "baz", null, "2900-01-01 00:00:00");
assertUpdated(entry, "foo", "baz"); assertUpdated(entry, "foo", "baz");
} }
} }
@ -410,6 +458,7 @@ public class JdbcExtractionNamespaceTest
keyName, keyName,
valName, valName,
tsColumn, tsColumn,
null,
new Period(10) new Period(10)
); );
CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace); CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace);