mirror of https://github.com/apache/druid.git
Improve missing JDBC driver error for lookups (#8872)
If the JDBC drivers are missing from the lookup extensions, throw an exception that directs the user how to resolve the issue. This change is a follow up to #8825.
This commit is contained in:
parent
ea8e4066f6
commit
d60978343a
|
@ -33,6 +33,12 @@ This module can be used side to side with other lookup module like the global ca
|
||||||
|
|
||||||
To use this extension please make sure to [include](../../development/extensions.md#loading-extensions) `druid-lookups-cached-single` as an extension.
|
To use this extension please make sure to [include](../../development/extensions.md#loading-extensions) `druid-lookups-cached-single` as an extension.
|
||||||
|
|
||||||
|
> If using JDBC, you will need to add your database's client JAR files to the extension's directory.
|
||||||
|
> For MySQL, you can get it from https://dev.mysql.com/downloads/connector/j/, and for Postgres, from
|
||||||
|
> https://jdbc.postgresql.org/download.html or from `extensions/postgresql-metadata-storage/`.
|
||||||
|
> Copy or symlink the downloaded file to
|
||||||
|
> `extensions/druid-lookups-cached-single` under the distribution root directory.
|
||||||
|
|
||||||
## Architecture
|
## Architecture
|
||||||
Generally speaking this module can be divided into two main component, namely, the data fetcher layer and caching layer.
|
Generally speaking this module can be divided into two main component, namely, the data fetcher layer and caching layer.
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
package org.apache.druid.server.lookup.namespace;
|
package org.apache.druid.server.lookup.namespace;
|
||||||
|
|
||||||
import com.google.common.base.Strings;
|
import com.google.common.base.Strings;
|
||||||
|
import org.apache.druid.java.util.common.ISE;
|
||||||
import org.apache.druid.java.util.common.JodaUtils;
|
import org.apache.druid.java.util.common.JodaUtils;
|
||||||
import org.apache.druid.java.util.common.Pair;
|
import org.apache.druid.java.util.common.Pair;
|
||||||
import org.apache.druid.java.util.common.StringUtils;
|
import org.apache.druid.java.util.common.StringUtils;
|
||||||
|
@ -28,15 +29,10 @@ import org.apache.druid.query.lookup.namespace.CacheGenerator;
|
||||||
import org.apache.druid.query.lookup.namespace.JdbcExtractionNamespace;
|
import org.apache.druid.query.lookup.namespace.JdbcExtractionNamespace;
|
||||||
import org.apache.druid.server.lookup.namespace.cache.CacheScheduler;
|
import org.apache.druid.server.lookup.namespace.cache.CacheScheduler;
|
||||||
import org.skife.jdbi.v2.DBI;
|
import org.skife.jdbi.v2.DBI;
|
||||||
import org.skife.jdbi.v2.Handle;
|
import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
|
||||||
import org.skife.jdbi.v2.StatementContext;
|
|
||||||
import org.skife.jdbi.v2.tweak.HandleCallback;
|
|
||||||
import org.skife.jdbi.v2.tweak.ResultSetMapper;
|
|
||||||
import org.skife.jdbi.v2.util.TimestampMapper;
|
import org.skife.jdbi.v2.util.TimestampMapper;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
import java.sql.ResultSet;
|
|
||||||
import java.sql.SQLException;
|
|
||||||
import java.sql.Timestamp;
|
import java.sql.Timestamp;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -62,44 +58,31 @@ public final class JdbcCacheGenerator implements CacheGenerator<JdbcExtractionNa
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
final long lastCheck = lastVersion == null ? JodaUtils.MIN_INSTANT : Long.parseLong(lastVersion);
|
final long lastCheck = lastVersion == null ? JodaUtils.MIN_INSTANT : Long.parseLong(lastVersion);
|
||||||
final Long lastDBUpdate = lastUpdates(entryId, namespace);
|
final Long lastDBUpdate;
|
||||||
if (lastDBUpdate != null && lastDBUpdate <= lastCheck) {
|
final List<Pair<String, String>> pairs;
|
||||||
return null;
|
final long dbQueryStart;
|
||||||
}
|
|
||||||
final long dbQueryStart = System.currentTimeMillis();
|
try {
|
||||||
final DBI dbi = ensureDBI(entryId, namespace);
|
lastDBUpdate = lastUpdates(entryId, namespace);
|
||||||
final String table = namespace.getTable();
|
if (lastDBUpdate != null && lastDBUpdate <= lastCheck) {
|
||||||
final String filter = namespace.getFilter();
|
return null;
|
||||||
final String valueColumn = namespace.getValueColumn();
|
}
|
||||||
final String keyColumn = namespace.getKeyColumn();
|
dbQueryStart = System.currentTimeMillis();
|
||||||
|
|
||||||
|
LOG.debug("Updating %s", entryId);
|
||||||
|
pairs = getLookupPairs(entryId, namespace);
|
||||||
|
}
|
||||||
|
catch (UnableToObtainConnectionException e) {
|
||||||
|
if (e.getMessage().contains("No suitable driver found")) {
|
||||||
|
throw new ISE(
|
||||||
|
e,
|
||||||
|
"JDBC driver JAR files missing from extensions/druid-lookups-cached-global directory"
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
LOG.debug("Updating %s", entryId);
|
|
||||||
final List<Pair<String, String>> pairs = dbi.withHandle(
|
|
||||||
new HandleCallback<List<Pair<String, String>>>()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public List<Pair<String, String>> withHandle(Handle handle)
|
|
||||||
{
|
|
||||||
return handle
|
|
||||||
.createQuery(
|
|
||||||
buildLookupQuery(table, filter, keyColumn, valueColumn)
|
|
||||||
).map(
|
|
||||||
new ResultSetMapper<Pair<String, String>>()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public Pair<String, String> map(
|
|
||||||
final int index,
|
|
||||||
final ResultSet r,
|
|
||||||
final StatementContext ctx
|
|
||||||
) throws SQLException
|
|
||||||
{
|
|
||||||
return new Pair<>(r.getString(keyColumn), r.getString(valueColumn));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
).list();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
);
|
|
||||||
final String newVersion;
|
final String newVersion;
|
||||||
if (lastDBUpdate != null) {
|
if (lastDBUpdate != null) {
|
||||||
newVersion = lastDBUpdate.toString();
|
newVersion = lastDBUpdate.toString();
|
||||||
|
@ -126,7 +109,26 @@ public final class JdbcCacheGenerator implements CacheGenerator<JdbcExtractionNa
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private String buildLookupQuery(String table, String filter, String keyColumn, String valueColumn)
|
private List<Pair<String, String>> getLookupPairs(
|
||||||
|
final CacheScheduler.EntryImpl<JdbcExtractionNamespace> key,
|
||||||
|
final JdbcExtractionNamespace namespace
|
||||||
|
)
|
||||||
|
{
|
||||||
|
final DBI dbi = ensureDBI(key, namespace);
|
||||||
|
final String table = namespace.getTable();
|
||||||
|
final String filter = namespace.getFilter();
|
||||||
|
final String valueColumn = namespace.getValueColumn();
|
||||||
|
final String keyColumn = namespace.getKeyColumn();
|
||||||
|
|
||||||
|
return dbi.withHandle(
|
||||||
|
handle -> handle
|
||||||
|
.createQuery(buildLookupQuery(table, filter, keyColumn, valueColumn))
|
||||||
|
.map((index, r, ctx) -> new Pair<>(r.getString(keyColumn), r.getString(valueColumn)))
|
||||||
|
.list()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String buildLookupQuery(String table, String filter, String keyColumn, String valueColumn)
|
||||||
{
|
{
|
||||||
if (Strings.isNullOrEmpty(filter)) {
|
if (Strings.isNullOrEmpty(filter)) {
|
||||||
return StringUtils.format(
|
return StringUtils.format(
|
||||||
|
@ -148,9 +150,8 @@ public final class JdbcCacheGenerator implements CacheGenerator<JdbcExtractionNa
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
private DBI ensureDBI(CacheScheduler.EntryImpl<JdbcExtractionNamespace> id, JdbcExtractionNamespace namespace)
|
private DBI ensureDBI(CacheScheduler.EntryImpl<JdbcExtractionNamespace> key, JdbcExtractionNamespace namespace)
|
||||||
{
|
{
|
||||||
final CacheScheduler.EntryImpl<JdbcExtractionNamespace> key = id;
|
|
||||||
DBI dbi = null;
|
DBI dbi = null;
|
||||||
if (dbiCache.containsKey(key)) {
|
if (dbiCache.containsKey(key)) {
|
||||||
dbi = dbiCache.get(key);
|
dbi = dbiCache.get(key);
|
||||||
|
@ -167,33 +168,27 @@ public final class JdbcCacheGenerator implements CacheGenerator<JdbcExtractionNa
|
||||||
return dbi;
|
return dbi;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Long lastUpdates(CacheScheduler.EntryImpl<JdbcExtractionNamespace> id, JdbcExtractionNamespace namespace)
|
@Nullable
|
||||||
|
private Long lastUpdates(CacheScheduler.EntryImpl<JdbcExtractionNamespace> key, JdbcExtractionNamespace namespace)
|
||||||
{
|
{
|
||||||
final DBI dbi = ensureDBI(id, namespace);
|
final DBI dbi = ensureDBI(key, namespace);
|
||||||
final String table = namespace.getTable();
|
final String table = namespace.getTable();
|
||||||
final String tsColumn = namespace.getTsColumn();
|
final String tsColumn = namespace.getTsColumn();
|
||||||
if (tsColumn == null) {
|
if (tsColumn == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
final Timestamp update = dbi.withHandle(
|
final Timestamp update = dbi.withHandle(
|
||||||
new HandleCallback<Timestamp>()
|
handle -> {
|
||||||
{
|
final String query = StringUtils.format(
|
||||||
|
"SELECT MAX(%s) FROM %s",
|
||||||
@Override
|
tsColumn, table
|
||||||
public Timestamp withHandle(Handle handle)
|
);
|
||||||
{
|
return handle
|
||||||
final String query = StringUtils.format(
|
.createQuery(query)
|
||||||
"SELECT MAX(%s) FROM %s",
|
.map(TimestampMapper.FIRST)
|
||||||
tsColumn, table
|
.first();
|
||||||
);
|
|
||||||
return handle
|
|
||||||
.createQuery(query)
|
|
||||||
.map(TimestampMapper.FIRST)
|
|
||||||
.first();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
return update.getTime();
|
return update.getTime();
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,133 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.druid.server.lookup.namespace;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
|
||||||
|
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
|
||||||
|
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
|
||||||
|
import org.apache.druid.query.lookup.namespace.JdbcExtractionNamespace;
|
||||||
|
import org.apache.druid.server.lookup.namespace.cache.CacheScheduler;
|
||||||
|
import org.apache.druid.server.lookup.namespace.cache.OnHeapNamespaceExtractionCacheManager;
|
||||||
|
import org.apache.druid.server.metrics.NoopServiceEmitter;
|
||||||
|
import org.easymock.EasyMock;
|
||||||
|
import org.joda.time.Period;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.ExpectedException;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.UncheckedIOException;
|
||||||
|
import java.util.Collections;
|
||||||
|
|
||||||
|
public class JdbcCacheGeneratorTest
|
||||||
|
{
|
||||||
|
private static final MetadataStorageConnectorConfig MISSING_METADATA_STORAGE_CONNECTOR_CONFIG =
|
||||||
|
createMetadataStorageConnectorConfig("postgresql");
|
||||||
|
|
||||||
|
private static final CacheScheduler.EntryImpl<JdbcExtractionNamespace> KEY =
|
||||||
|
EasyMock.mock(CacheScheduler.EntryImpl.class);
|
||||||
|
|
||||||
|
private static final ServiceEmitter SERVICE_EMITTER = new NoopServiceEmitter();
|
||||||
|
|
||||||
|
private static final CacheScheduler SCHEDULER = new CacheScheduler(
|
||||||
|
SERVICE_EMITTER,
|
||||||
|
Collections.emptyMap(),
|
||||||
|
new OnHeapNamespaceExtractionCacheManager(new Lifecycle(), SERVICE_EMITTER, new NamespaceExtractionConfig())
|
||||||
|
);
|
||||||
|
|
||||||
|
private static final String LAST_VERSION = "1";
|
||||||
|
|
||||||
|
private static final String MISSING_JDB_DRIVER_JAR_MSG =
|
||||||
|
"JDBC driver JAR files missing from extensions/druid-lookups-cached-global directory";
|
||||||
|
|
||||||
|
private JdbcCacheGenerator target;
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public ExpectedException exception = ExpectedException.none();
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup()
|
||||||
|
{
|
||||||
|
target = new JdbcCacheGenerator();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void indicatesMissingJdbcJarsWithTsColumn()
|
||||||
|
{
|
||||||
|
String tsColumn = "tsColumn";
|
||||||
|
JdbcExtractionNamespace missingJarNamespace = createJdbcExtractionNamespace(
|
||||||
|
MISSING_METADATA_STORAGE_CONNECTOR_CONFIG,
|
||||||
|
tsColumn
|
||||||
|
);
|
||||||
|
|
||||||
|
exception.expect(IllegalStateException.class);
|
||||||
|
exception.expectMessage(MISSING_JDB_DRIVER_JAR_MSG);
|
||||||
|
|
||||||
|
target.generateCache(missingJarNamespace, KEY, LAST_VERSION, SCHEDULER);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void indicatesMissingJdbcJarsWithoutTsColumn()
|
||||||
|
{
|
||||||
|
String missingTsColumn = null;
|
||||||
|
@SuppressWarnings("ConstantConditions") // for missingTsColumn
|
||||||
|
JdbcExtractionNamespace missingJarNamespace = createJdbcExtractionNamespace(
|
||||||
|
MISSING_METADATA_STORAGE_CONNECTOR_CONFIG,
|
||||||
|
missingTsColumn
|
||||||
|
);
|
||||||
|
|
||||||
|
exception.expect(IllegalStateException.class);
|
||||||
|
exception.expectMessage(MISSING_JDB_DRIVER_JAR_MSG);
|
||||||
|
|
||||||
|
target.generateCache(missingJarNamespace, KEY, LAST_VERSION, SCHEDULER);
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("SameParameterValue")
|
||||||
|
private static MetadataStorageConnectorConfig createMetadataStorageConnectorConfig(String type)
|
||||||
|
{
|
||||||
|
String json = "{\"connectURI\":\"jdbc:" + type + "://localhost:5432\"}";
|
||||||
|
try {
|
||||||
|
return new ObjectMapper().readValue(json, MetadataStorageConnectorConfig.class);
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
throw new UncheckedIOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("SameParameterValue")
|
||||||
|
private static JdbcExtractionNamespace createJdbcExtractionNamespace(
|
||||||
|
MetadataStorageConnectorConfig metadataStorageConnectorConfig,
|
||||||
|
@Nullable String tsColumn
|
||||||
|
)
|
||||||
|
{
|
||||||
|
return new JdbcExtractionNamespace(
|
||||||
|
metadataStorageConnectorConfig,
|
||||||
|
"table",
|
||||||
|
"keyColumn",
|
||||||
|
"valueColumn",
|
||||||
|
tsColumn,
|
||||||
|
"filter",
|
||||||
|
Period.ZERO
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
|
@ -23,22 +23,23 @@ import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import org.apache.druid.common.config.NullHandling;
|
import org.apache.druid.common.config.NullHandling;
|
||||||
|
import org.apache.druid.java.util.common.ISE;
|
||||||
import org.apache.druid.java.util.common.StringUtils;
|
import org.apache.druid.java.util.common.StringUtils;
|
||||||
import org.apache.druid.java.util.common.logger.Logger;
|
import org.apache.druid.java.util.common.logger.Logger;
|
||||||
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
|
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
|
||||||
import org.apache.druid.server.lookup.DataFetcher;
|
import org.apache.druid.server.lookup.DataFetcher;
|
||||||
import org.skife.jdbi.v2.DBI;
|
import org.skife.jdbi.v2.DBI;
|
||||||
import org.skife.jdbi.v2.Handle;
|
|
||||||
import org.skife.jdbi.v2.TransactionCallback;
|
import org.skife.jdbi.v2.TransactionCallback;
|
||||||
import org.skife.jdbi.v2.TransactionStatus;
|
import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
|
||||||
import org.skife.jdbi.v2.tweak.HandleCallback;
|
|
||||||
import org.skife.jdbi.v2.util.StringMapper;
|
import org.skife.jdbi.v2.util.StringMapper;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
import java.sql.Connection;
|
import java.sql.Connection;
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
public class JdbcDataFetcher implements DataFetcher<String, String>
|
public class JdbcDataFetcher implements DataFetcher<String, String>
|
||||||
{
|
{
|
||||||
|
@ -61,12 +62,12 @@ public class JdbcDataFetcher implements DataFetcher<String, String>
|
||||||
private final String reverseFetchQuery;
|
private final String reverseFetchQuery;
|
||||||
private final DBI dbi;
|
private final DBI dbi;
|
||||||
|
|
||||||
public JdbcDataFetcher(
|
JdbcDataFetcher(
|
||||||
@JsonProperty("connectorConfig") MetadataStorageConnectorConfig connectorConfig,
|
@JsonProperty("connectorConfig") MetadataStorageConnectorConfig connectorConfig,
|
||||||
@JsonProperty("table") String table,
|
@JsonProperty("table") String table,
|
||||||
@JsonProperty("keyColumn") String keyColumn,
|
@JsonProperty("keyColumn") String keyColumn,
|
||||||
@JsonProperty("valueColumn") String valueColumn,
|
@JsonProperty("valueColumn") String valueColumn,
|
||||||
@JsonProperty("streamingFetchSize") Integer streamingFetchSize
|
@JsonProperty("streamingFetchSize") @Nullable Integer streamingFetchSize
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.connectorConfig = Preconditions.checkNotNull(connectorConfig, "connectorConfig");
|
this.connectorConfig = Preconditions.checkNotNull(connectorConfig, "connectorConfig");
|
||||||
|
@ -105,29 +106,20 @@ public class JdbcDataFetcher implements DataFetcher<String, String>
|
||||||
@Override
|
@Override
|
||||||
public Iterable<Map.Entry<String, String>> fetchAll()
|
public Iterable<Map.Entry<String, String>> fetchAll()
|
||||||
{
|
{
|
||||||
return inReadOnlyTransaction((handle, status) -> {
|
return inReadOnlyTransaction((handle, status) -> handle.createQuery(fetchAllQuery)
|
||||||
return handle.createQuery(fetchAllQuery)
|
.setFetchSize(streamingFetchSize)
|
||||||
.setFetchSize(streamingFetchSize)
|
.map(new KeyValueResultSetMapper(keyColumn, valueColumn))
|
||||||
.map(new KeyValueResultSetMapper(keyColumn, valueColumn))
|
.list());
|
||||||
.list();
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String fetch(final String key)
|
public String fetch(final String key)
|
||||||
{
|
{
|
||||||
List<String> pairs = inReadOnlyTransaction(
|
List<String> pairs = inReadOnlyTransaction(
|
||||||
new TransactionCallback<List<String>>()
|
(handle, status) -> handle.createQuery(fetchQuery)
|
||||||
{
|
.bind("val", key)
|
||||||
@Override
|
.map(StringMapper.FIRST)
|
||||||
public List<String> inTransaction(Handle handle, TransactionStatus status)
|
.list()
|
||||||
{
|
|
||||||
return handle.createQuery(fetchQuery)
|
|
||||||
.bind("val", key)
|
|
||||||
.map(StringMapper.FIRST)
|
|
||||||
.list();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
);
|
);
|
||||||
if (pairs.isEmpty()) {
|
if (pairs.isEmpty()) {
|
||||||
return null;
|
return null;
|
||||||
|
@ -138,25 +130,21 @@ public class JdbcDataFetcher implements DataFetcher<String, String>
|
||||||
@Override
|
@Override
|
||||||
public Iterable<Map.Entry<String, String>> fetch(final Iterable<String> keys)
|
public Iterable<Map.Entry<String, String>> fetch(final Iterable<String> keys)
|
||||||
{
|
{
|
||||||
QueryKeys queryKeys = dbi.onDemand(QueryKeys.class);
|
return runWithMissingJdbcJarHandler(
|
||||||
return queryKeys.findNamesForIds(Lists.newArrayList(keys), table, keyColumn, valueColumn);
|
() -> {
|
||||||
|
QueryKeys queryKeys = dbi.onDemand(QueryKeys.class);
|
||||||
|
return queryKeys.findNamesForIds(Lists.newArrayList(keys), table, keyColumn, valueColumn);
|
||||||
|
}
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<String> reverseFetchKeys(final String value)
|
public List<String> reverseFetchKeys(final String value)
|
||||||
{
|
{
|
||||||
List<String> results = inReadOnlyTransaction(new TransactionCallback<List<String>>()
|
return inReadOnlyTransaction((handle, status) -> handle.createQuery(reverseFetchQuery)
|
||||||
{
|
.bind("val", value)
|
||||||
@Override
|
.map(StringMapper.FIRST)
|
||||||
public List<String> inTransaction(Handle handle, TransactionStatus status)
|
.list());
|
||||||
{
|
|
||||||
return handle.createQuery(reverseFetchQuery)
|
|
||||||
.bind("val", value)
|
|
||||||
.map(StringMapper.FIRST)
|
|
||||||
.list();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
return results;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -207,30 +195,44 @@ public class JdbcDataFetcher implements DataFetcher<String, String>
|
||||||
|
|
||||||
private <T> T inReadOnlyTransaction(final TransactionCallback<T> callback)
|
private <T> T inReadOnlyTransaction(final TransactionCallback<T> callback)
|
||||||
{
|
{
|
||||||
return getDbi().withHandle(
|
return runWithMissingJdbcJarHandler(
|
||||||
new HandleCallback<T>()
|
() ->
|
||||||
{
|
getDbi().withHandle(
|
||||||
@Override
|
handle -> {
|
||||||
public T withHandle(Handle handle) throws Exception
|
final Connection connection = handle.getConnection();
|
||||||
{
|
final boolean readOnly = connection.isReadOnly();
|
||||||
final Connection connection = handle.getConnection();
|
connection.setReadOnly(true);
|
||||||
final boolean readOnly = connection.isReadOnly();
|
try {
|
||||||
connection.setReadOnly(true);
|
return handle.inTransaction(callback);
|
||||||
try {
|
}
|
||||||
return handle.inTransaction(callback);
|
finally {
|
||||||
}
|
try {
|
||||||
finally {
|
connection.setReadOnly(readOnly);
|
||||||
try {
|
}
|
||||||
connection.setReadOnly(readOnly);
|
catch (SQLException e) {
|
||||||
}
|
// at least try to log it so we don't swallow exceptions
|
||||||
catch (SQLException e) {
|
LOGGER.error(e, "Unable to reset connection read-only state");
|
||||||
// at least try to log it so we don't swallow exceptions
|
}
|
||||||
LOGGER.error(e, "Unable to reset connection read-only state");
|
}
|
||||||
}
|
}
|
||||||
}
|
)
|
||||||
}
|
|
||||||
}
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private <T> T runWithMissingJdbcJarHandler(Supplier<T> supplier)
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
return supplier.get();
|
||||||
|
}
|
||||||
|
catch (UnableToObtainConnectionException e) {
|
||||||
|
if (e.getMessage().contains("No suitable driver found")) {
|
||||||
|
throw new ISE(
|
||||||
|
e,
|
||||||
|
"JDBC driver JAR files missing from extensions/druid-lookups-cached-single directory"
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package org.apache.druid.server.lookup.jdbc;
|
package org.apache.druid.server.lookup.jdbc;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||||
|
@ -31,145 +32,233 @@ import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.runners.Enclosed;
|
||||||
|
import org.junit.rules.ExpectedException;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
import org.skife.jdbi.v2.Handle;
|
import org.skife.jdbi.v2.Handle;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.UncheckedIOException;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
@RunWith(Enclosed.class)
|
||||||
public class JdbcDataFetcherTest
|
public class JdbcDataFetcherTest
|
||||||
{
|
{
|
||||||
@Rule
|
private static final String TABLE_NAME = "tableName";
|
||||||
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
|
private static final String KEY_COLUMN = "keyColumn";
|
||||||
Handle handle;
|
private static final String VALUE_COLUMN = "valueColumn";
|
||||||
|
|
||||||
private JdbcDataFetcher jdbcDataFetcher;
|
|
||||||
private final String tableName = "tableName";
|
|
||||||
private final String keyColumn = "keyColumn";
|
|
||||||
private final String valueColumn = "valueColumn";
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
private static final Map<String, String> LOOKUP_MAP = ImmutableMap.of(
|
public static class FetchTest
|
||||||
"foo", "bar",
|
|
||||||
"bad", "bar",
|
|
||||||
"how about that", "foo",
|
|
||||||
"empty string", ""
|
|
||||||
);
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setUp()
|
|
||||||
{
|
{
|
||||||
jdbcDataFetcher = new JdbcDataFetcher(derbyConnectorRule.getMetadataConnectorConfig(), "tableName", "keyColumn", "valueColumn",
|
@Rule
|
||||||
100);
|
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
|
||||||
|
|
||||||
handle = derbyConnectorRule.getConnector().getDBI().open();
|
private Handle handle;
|
||||||
Assert.assertEquals(
|
|
||||||
0,
|
private JdbcDataFetcher jdbcDataFetcher;
|
||||||
handle.createStatement(
|
|
||||||
StringUtils.format(
|
private static final Map<String, String> LOOKUP_MAP = ImmutableMap.of(
|
||||||
"CREATE TABLE %s (%s VARCHAR(64), %s VARCHAR(64))",
|
"foo", "bar",
|
||||||
tableName,
|
"bad", "bar",
|
||||||
keyColumn,
|
"how about that", "foo",
|
||||||
valueColumn
|
"empty string", ""
|
||||||
)
|
|
||||||
).setQueryTimeout(1).execute()
|
|
||||||
);
|
);
|
||||||
handle.createStatement(StringUtils.format("TRUNCATE TABLE %s", tableName)).setQueryTimeout(1).execute();
|
|
||||||
|
|
||||||
for (Map.Entry<String, String> entry : LOOKUP_MAP.entrySet()) {
|
@Before
|
||||||
insertValues(entry.getKey(), entry.getValue(), handle);
|
public void setUp()
|
||||||
|
{
|
||||||
|
jdbcDataFetcher = new JdbcDataFetcher(
|
||||||
|
derbyConnectorRule.getMetadataConnectorConfig(),
|
||||||
|
"tableName",
|
||||||
|
"keyColumn",
|
||||||
|
"valueColumn",
|
||||||
|
100
|
||||||
|
);
|
||||||
|
|
||||||
|
handle = derbyConnectorRule.getConnector().getDBI().open();
|
||||||
|
Assert.assertEquals(
|
||||||
|
0,
|
||||||
|
handle.createStatement(
|
||||||
|
StringUtils.format(
|
||||||
|
"CREATE TABLE %s (%s VARCHAR(64), %s VARCHAR(64))",
|
||||||
|
TABLE_NAME,
|
||||||
|
KEY_COLUMN,
|
||||||
|
VALUE_COLUMN
|
||||||
|
)
|
||||||
|
).setQueryTimeout(1).execute()
|
||||||
|
);
|
||||||
|
handle.createStatement(StringUtils.format("TRUNCATE TABLE %s", TABLE_NAME)).setQueryTimeout(1).execute();
|
||||||
|
|
||||||
|
for (Map.Entry<String, String> entry : LOOKUP_MAP.entrySet()) {
|
||||||
|
insertValues(entry.getKey(), entry.getValue(), handle);
|
||||||
|
}
|
||||||
|
handle.commit();
|
||||||
}
|
}
|
||||||
handle.commit();
|
|
||||||
}
|
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void tearDown()
|
public void tearDown()
|
||||||
{
|
{
|
||||||
handle.createStatement("DROP TABLE " + tableName).setQueryTimeout(1).execute();
|
handle.createStatement("DROP TABLE " + TABLE_NAME).setQueryTimeout(1).execute();
|
||||||
handle.close();
|
handle.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFetch()
|
public void testFetch()
|
||||||
{
|
{
|
||||||
Assert.assertEquals("null check", null, jdbcDataFetcher.fetch("baz"));
|
Assert.assertEquals("null check", null, jdbcDataFetcher.fetch("baz"));
|
||||||
assertMapLookup(LOOKUP_MAP, jdbcDataFetcher);
|
assertMapLookup(LOOKUP_MAP, jdbcDataFetcher);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFetchAll()
|
public void testFetchAll()
|
||||||
{
|
{
|
||||||
ImmutableMap.Builder<String, String> mapBuilder = ImmutableMap.builder();
|
ImmutableMap.Builder<String, String> mapBuilder = ImmutableMap.builder();
|
||||||
jdbcDataFetcher.fetchAll().forEach(mapBuilder::put);
|
jdbcDataFetcher.fetchAll().forEach(mapBuilder::put);
|
||||||
Assert.assertEquals("maps should match", LOOKUP_MAP, mapBuilder.build());
|
Assert.assertEquals("maps should match", LOOKUP_MAP, mapBuilder.build());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFetchKeys()
|
public void testFetchKeys()
|
||||||
{
|
{
|
||||||
ImmutableMap.Builder<String, String> mapBuilder = ImmutableMap.builder();
|
ImmutableMap.Builder<String, String> mapBuilder = ImmutableMap.builder();
|
||||||
jdbcDataFetcher.fetch(LOOKUP_MAP.keySet()).forEach(mapBuilder::put);
|
jdbcDataFetcher.fetch(LOOKUP_MAP.keySet()).forEach(mapBuilder::put);
|
||||||
Assert.assertEquals(LOOKUP_MAP, mapBuilder.build());
|
Assert.assertEquals(LOOKUP_MAP, mapBuilder.build());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testReverseFetch()
|
public void testReverseFetch()
|
||||||
{
|
{
|
||||||
Assert.assertEquals(
|
Assert.assertEquals(
|
||||||
"reverse lookup should match",
|
"reverse lookup should match",
|
||||||
Sets.newHashSet("foo", "bad"),
|
Sets.newHashSet("foo", "bad"),
|
||||||
Sets.newHashSet(jdbcDataFetcher.reverseFetchKeys("bar"))
|
Sets.newHashSet(jdbcDataFetcher.reverseFetchKeys("bar"))
|
||||||
);
|
);
|
||||||
Assert.assertEquals(
|
Assert.assertEquals(
|
||||||
"reverse lookup should match",
|
"reverse lookup should match",
|
||||||
Sets.newHashSet("how about that"),
|
Sets.newHashSet("how about that"),
|
||||||
Sets.newHashSet(jdbcDataFetcher.reverseFetchKeys("foo"))
|
Sets.newHashSet(jdbcDataFetcher.reverseFetchKeys("foo"))
|
||||||
);
|
);
|
||||||
Assert.assertEquals(
|
Assert.assertEquals(
|
||||||
"reverse lookup should match",
|
"reverse lookup should match",
|
||||||
Sets.newHashSet("empty string"),
|
Sets.newHashSet("empty string"),
|
||||||
Sets.newHashSet(jdbcDataFetcher.reverseFetchKeys(""))
|
Sets.newHashSet(jdbcDataFetcher.reverseFetchKeys(""))
|
||||||
);
|
);
|
||||||
Assert.assertEquals(
|
Assert.assertEquals(
|
||||||
"reverse lookup of none existing value should be empty list",
|
"reverse lookup of none existing value should be empty list",
|
||||||
Collections.EMPTY_LIST,
|
Collections.EMPTY_LIST,
|
||||||
jdbcDataFetcher.reverseFetchKeys("does't exist")
|
jdbcDataFetcher.reverseFetchKeys("does't exist")
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSerDesr() throws IOException
|
public void testSerDesr() throws IOException
|
||||||
{
|
{
|
||||||
JdbcDataFetcher jdbcDataFetcher = new JdbcDataFetcher(new MetadataStorageConnectorConfig(), "table", "keyColumn", "ValueColumn",
|
JdbcDataFetcher jdbcDataFetcher = new JdbcDataFetcher(
|
||||||
100);
|
new MetadataStorageConnectorConfig(),
|
||||||
DefaultObjectMapper mapper = new DefaultObjectMapper();
|
"table",
|
||||||
String jdbcDataFetcherSer = mapper.writeValueAsString(jdbcDataFetcher);
|
"keyColumn",
|
||||||
Assert.assertEquals(jdbcDataFetcher, mapper.readerFor(DataFetcher.class).readValue(jdbcDataFetcherSer));
|
"ValueColumn",
|
||||||
}
|
100
|
||||||
|
);
|
||||||
|
DefaultObjectMapper mapper = new DefaultObjectMapper();
|
||||||
|
String jdbcDataFetcherSer = mapper.writeValueAsString(jdbcDataFetcher);
|
||||||
|
Assert.assertEquals(jdbcDataFetcher, mapper.readerFor(DataFetcher.class).readValue(jdbcDataFetcherSer));
|
||||||
|
}
|
||||||
|
|
||||||
private void assertMapLookup(Map<String, String> map, DataFetcher dataFetcher)
|
@SuppressWarnings("SameParameterValue")
|
||||||
{
|
private void assertMapLookup(Map<String, String> map, DataFetcher dataFetcher)
|
||||||
for (Map.Entry<String, String> entry : map.entrySet()) {
|
{
|
||||||
String key = entry.getKey();
|
for (Map.Entry<String, String> entry : map.entrySet()) {
|
||||||
String val = entry.getValue();
|
String key = entry.getKey();
|
||||||
Assert.assertEquals("non-null check", val, dataFetcher.fetch(key));
|
String val = entry.getValue();
|
||||||
|
Assert.assertEquals("non-null check", val, dataFetcher.fetch(key));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void insertValues(final String key, final String val, Handle handle)
|
||||||
|
{
|
||||||
|
final String query;
|
||||||
|
handle.createStatement(
|
||||||
|
StringUtils.format("DELETE FROM %s WHERE %s='%s'", TABLE_NAME, KEY_COLUMN, key)
|
||||||
|
).setQueryTimeout(1).execute();
|
||||||
|
query = StringUtils.format(
|
||||||
|
"INSERT INTO %s (%s, %s) VALUES ('%s', '%s')",
|
||||||
|
TABLE_NAME,
|
||||||
|
KEY_COLUMN, VALUE_COLUMN,
|
||||||
|
key, val
|
||||||
|
);
|
||||||
|
Assert.assertEquals(1, handle.createStatement(query).setQueryTimeout(1).execute());
|
||||||
|
handle.commit();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void insertValues(final String key, final String val, Handle handle)
|
public static class MissingJdbcJarTest
|
||||||
{
|
{
|
||||||
final String query;
|
private static final MetadataStorageConnectorConfig MISSING_METADATA_STORAGE_CONNECTOR_CONFIG =
|
||||||
handle.createStatement(
|
createMissingMetadataStorageConnectorConfig();
|
||||||
StringUtils.format("DELETE FROM %s WHERE %s='%s'", tableName, keyColumn, key)
|
private static final String KEY = "key";
|
||||||
).setQueryTimeout(1).execute();
|
|
||||||
query = StringUtils.format(
|
private JdbcDataFetcher target;
|
||||||
"INSERT INTO %s (%s, %s) VALUES ('%s', '%s')",
|
|
||||||
tableName,
|
@Rule
|
||||||
keyColumn, valueColumn,
|
public ExpectedException exception = ExpectedException.none();
|
||||||
key, val
|
|
||||||
);
|
@Before
|
||||||
Assert.assertEquals(1, handle.createStatement(query).setQueryTimeout(1).execute());
|
public void setUp()
|
||||||
handle.commit();
|
{
|
||||||
|
target = new JdbcDataFetcher(
|
||||||
|
MISSING_METADATA_STORAGE_CONNECTOR_CONFIG,
|
||||||
|
TABLE_NAME,
|
||||||
|
KEY_COLUMN,
|
||||||
|
VALUE_COLUMN,
|
||||||
|
null
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFetchAll()
|
||||||
|
{
|
||||||
|
test(() -> target.fetchAll());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFetch()
|
||||||
|
{
|
||||||
|
test(() -> target.fetch(Collections.singleton(KEY)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFetchKeys()
|
||||||
|
{
|
||||||
|
test(() -> target.fetch(Collections.singleton(KEY)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReverseFetch()
|
||||||
|
{
|
||||||
|
test(() -> target.reverseFetchKeys(KEY));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void test(Runnable runnable)
|
||||||
|
{
|
||||||
|
exception.expect(IllegalStateException.class);
|
||||||
|
exception.expectMessage("JDBC driver JAR files missing from extensions/druid-lookups-cached-single directory");
|
||||||
|
|
||||||
|
runnable.run();
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("SameParameterValue")
|
||||||
|
private static MetadataStorageConnectorConfig createMissingMetadataStorageConnectorConfig()
|
||||||
|
{
|
||||||
|
String type = "postgresql";
|
||||||
|
String json = "{\"connectURI\":\"jdbc:" + type + "://localhost:5432\"}";
|
||||||
|
try {
|
||||||
|
return new ObjectMapper().readValue(json, MetadataStorageConnectorConfig.class);
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
throw new UncheckedIOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue