Warn if cache size of lookup is beyond max size (#11863)

Enhanced the ExtractionNamespace interface in lookups-cached-global core extension with the ability to set a maxHeapPercentage for the cache of the respective namespace. The reason for adding this functionality, is make it easier to detect when a lookup table grows to a size that the underlying service cannot handle, because it does not have enough memory. The default value of maxHeap for the interface is -1, which indicates that no maxHeapPercentage has been set. For the JdbcExtractionNamespace and UriExtractionNamespace implementations, the default value is null, which will cause the respective service that the lookup is loaded in, to warn when its cache is beyond mxHeapPercentage of the service's configured max heap size. If a positive non-null value is set for the namespace's maxHeapPercentage config, this value will be honored for all services that the respective lookup is loaded onto, and consequently log warning messages when the cache of the respective lookup grows beyond this respective percentage of the services configured max heap size. Warnings are logged every time that either Uri based or Jdbc based lookups are regenerated, if the maxHeapPercentage constraint is violated. No other implementations will log warnings at this time. No error is thrown when the size exceeds the maxHeapPercentage at this time, as doing so could break functionality for existing users. Previously the JdbcCacheGenerator generated its cache by materializing all rows of the underling table in memory at once; this made it difficult to log warning messages in the case that the results from the jdbc query were very large and caused the service to run out of memory. To help with this, this pr makes it so that the jdbc query results are instead streamed through an iterator.
This commit is contained in:
zachjsh 2021-11-03 21:32:22 -04:00 committed by GitHub
parent 652e1491e0
commit 1d6df48145
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 569 additions and 34 deletions

View File

@ -204,7 +204,8 @@ The remapping values for each globally cached lookup can be specified by a JSON
"\"value\"]" "\"value\"]"
] ]
}, },
"pollPeriod":"PT5M" "pollPeriod":"PT5M",
"maxHeapPercentage": 10
} }
``` ```
@ -215,6 +216,7 @@ The remapping values for each globally cached lookup can be specified by a JSON
|`uriPrefix`|A URI that specifies a directory (or other searchable resource) in which to search for files|No|Use `uri`| |`uriPrefix`|A URI that specifies a directory (or other searchable resource) in which to search for files|No|Use `uri`|
|`fileRegex`|Optional regex for matching the file name under `uriPrefix`. Only used if `uriPrefix` is used|No|`".*"`| |`fileRegex`|Optional regex for matching the file name under `uriPrefix`. Only used if `uriPrefix` is used|No|`".*"`|
|`namespaceParseSpec`|How to interpret the data at the URI|Yes|| |`namespaceParseSpec`|How to interpret the data at the URI|Yes||
|`maxHeapPercentage`|The maximum percentage of heap size that the lookup should consume. If the lookup grows beyond this size, warning messages will be logged in the respective service logs.|No|10% of JVM heap size|
One of either `uri` or `uriPrefix` must be specified, as either a local file system (file://), HDFS (hdfs://), S3 (s3://) or GCS (gs://) location. HTTP location is not currently supported. One of either `uri` or `uriPrefix` must be specified, as either a local file system (file://), HDFS (hdfs://), S3 (s3://) or GCS (gs://) location. HTTP location is not currently supported.
@ -351,6 +353,7 @@ The JDBC lookups will poll a database to populate its local cache. If the `tsCol
|`filter`|The filter to use when selecting lookups, this is used to create a where clause on lookup population|No|No Filter| |`filter`|The filter to use when selecting lookups, this is used to create a where clause on lookup population|No|No Filter|
|`tsColumn`| The column in `table` which contains when the key was updated|No|Not used| |`tsColumn`| The column in `table` which contains when the key was updated|No|Not used|
|`pollPeriod`|How often to poll the DB|No|0 (only once)| |`pollPeriod`|How often to poll the DB|No|0 (only once)|
|`maxHeapPercentage`|The maximum percentage of heap size that the lookup should consume. If the lookup grows beyond this size, warning messages will be logged in the respective service logs.|No|10% of JVM heap size|
```json ```json
{ {
@ -364,7 +367,8 @@ The JDBC lookups will poll a database to populate its local cache. If the `tsCol
"keyColumn":"the_old_dim_value", "keyColumn":"the_old_dim_value",
"valueColumn":"the_new_dim_value", "valueColumn":"the_new_dim_value",
"tsColumn":"timestamp_column", "tsColumn":"timestamp_column",
"pollPeriod":600000 "pollPeriod":600000,
"maxHeapPercentage": 10
} }
``` ```

View File

@ -19,13 +19,18 @@
package org.apache.druid.data.input; package org.apache.druid.data.input;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.ByteSource; import com.google.common.io.ByteSource;
import com.google.common.io.LineProcessor; import com.google.common.io.LineProcessor;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.Parser; import org.apache.druid.java.util.common.parsers.Parser;
import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Map; import java.util.Map;
/** /**
@ -36,6 +41,12 @@ import java.util.Map;
*/ */
public class MapPopulator<K, V> public class MapPopulator<K, V>
{ {
private static final Logger LOG = new Logger(MapPopulator.class);
private static final String DOUBLE_CLASS_NAME = Double.class.getName();
private static final String FLOAT_CLASS_NAME = Float.class.getName();
private static final String INTEGER_CLASS_NAME = Integer.class.getName();
private static final String LONG_CLASS_NAME = Long.class.getName();
private static final String STRING_CLASS_NAME = String.class.getName();
private final Parser<K, V> parser; private final Parser<K, V> parser;
public MapPopulator( public MapPopulator(
@ -49,11 +60,13 @@ public class MapPopulator<K, V>
{ {
private final int lines; private final int lines;
private final int entries; private final int entries;
private final long bytes;
public PopulateResult(int lines, int entries) public PopulateResult(int lines, int entries, long bytes)
{ {
this.lines = lines; this.lines = lines;
this.entries = entries; this.entries = entries;
this.bytes = bytes;
} }
public int getLines() public int getLines()
@ -65,6 +78,11 @@ public class MapPopulator<K, V>
{ {
return entries; return entries;
} }
public long getBytes()
{
return bytes;
}
} }
/** /**
@ -78,12 +96,41 @@ public class MapPopulator<K, V>
* @throws IOException * @throws IOException
*/ */
public PopulateResult populate(final ByteSource source, final Map<K, V> map) throws IOException public PopulateResult populate(final ByteSource source, final Map<K, V> map) throws IOException
{
return populateAndWarnAtByteLimit(source, map, -1L, null);
}
/**
* Read through the `source` line by line and populate `map` with the data returned from the `parser`. Warning
* messages will be logged if the `byteLimit` > 0, and the number of bytes read into the map exceed the byte limit.
* Note: in order to compute the byte length properly, the key and value types of map must both be instances of
* String, otherwise no byte length is computed.
*
* @param source The ByteSource to read lines from
* @param map The map to populate
* @param byteLimit The limit of number of bytes after which a warning should be shown in the log. if < 0, indicates
* no limit.
* @param name The name of the map that is being populated. Used to identify the map in log messages written.
*
* @return number of lines read and entries parsed
*
* @throws IOException
*/
public PopulateResult populateAndWarnAtByteLimit(
final ByteSource source,
final Map<K, V> map,
final long byteLimit,
final String name
) throws IOException
{ {
return source.asCharSource(StandardCharsets.UTF_8).readLines( return source.asCharSource(StandardCharsets.UTF_8).readLines(
new LineProcessor<PopulateResult>() new LineProcessor<PopulateResult>()
{ {
private int lines = 0; private int lines = 0;
private int entries = 0; private int entries = 0;
private long bytes = 0L;
private long byteLimitMultiple = 1L;
private boolean keyAndValueByteSizesCanBeDetermined = true;
@Override @Override
public boolean processLine(String line) public boolean processLine(String line)
@ -92,18 +139,148 @@ public class MapPopulator<K, V>
throw new ISE("Cannot read more than %,d lines", Integer.MAX_VALUE); throw new ISE("Cannot read more than %,d lines", Integer.MAX_VALUE);
} }
final Map<K, V> kvMap = parser.parseToMap(line); final Map<K, V> kvMap = parser.parseToMap(line);
if (kvMap == null) {
return true;
}
map.putAll(kvMap); map.putAll(kvMap);
lines++; lines++;
entries += kvMap.size(); entries += kvMap.size();
// this top level check so that we dont keep logging inability to determine
// byte length for all (key, value) pairs.
if (0 < byteLimit && keyAndValueByteSizesCanBeDetermined) {
for (Map.Entry<K, V> e : kvMap.entrySet()) {
keyAndValueByteSizesCanBeDetermined = canKeyAndValueTypesByteSizesBeDetermined(
e.getKey(),
e.getValue()
);
if (keyAndValueByteSizesCanBeDetermined) {
bytes += getByteLengthOfObject(e.getKey());
bytes += getByteLengthOfObject(e.getValue());
if (bytes > byteLimit * byteLimitMultiple) {
LOG.warn(
"[%s] exceeded the byteLimit of [%,d]. Current bytes [%,d]",
name,
byteLimit,
bytes
);
byteLimitMultiple++;
}
}
}
}
return true; return true;
} }
@Override @Override
public PopulateResult getResult() public PopulateResult getResult()
{ {
return new PopulateResult(lines, entries); return new PopulateResult(lines, entries, bytes);
} }
} }
); );
} }
/**
* Read through the `iterator` and populate `map` with the data iterated over. Warning
* messages will be logged if the `byteLimit` > 0, and the number of bytes read into the map exceed the byte limit.
* Note: in order to compute the byte length properly, the key and value types of map must both be instances of
* String, otherwise no byte length is computed.
*
* @param iterator The iterator to iterate over
* @param map The map to populate
* @param byteLimit The limit of number of bytes after which a warning should be shown in the log. if < 0, indicates
* no limit.
* @param name The name of the map that is being populated. Used to identify the map in log messages written.
*
* @return number of entries parsed and bytes stored in the map.
*/
public static <K, V> PopulateResult populateAndWarnAtByteLimit(
final Iterator<Pair<K, V>> iterator,
final Map<K, V> map,
final long byteLimit,
final String name
)
{
int lines = 0;
int entries = 0;
long bytes = 0L;
long byteLimitMultiple = 1L;
boolean keyAndValueByteSizesCanBeDetermined = true;
while (iterator.hasNext()) {
Pair<K, V> pair = iterator.next();
K lhs = null != pair ? pair.lhs : null;
V rhs = null != pair ? pair.rhs : null;
map.put(lhs, rhs);
entries++;
// this top level check so that we dont keep logging inability to determine
// byte length for all pairs.
if (0 < byteLimit && keyAndValueByteSizesCanBeDetermined) {
keyAndValueByteSizesCanBeDetermined = canKeyAndValueTypesByteSizesBeDetermined(lhs, rhs);
if (keyAndValueByteSizesCanBeDetermined) {
bytes += getByteLengthOfObject(lhs);
bytes += getByteLengthOfObject(rhs);
if (bytes > byteLimit * byteLimitMultiple) {
LOG.warn(
"[%s] exceeded the byteLimit of [%,d]. Current bytes [%,d]",
name,
byteLimit,
bytes
);
byteLimitMultiple++;
}
}
}
}
return new PopulateResult(lines, entries, bytes);
}
/**
* only works for objects of type String, Double, Float, Integer, or Long.
* @param o the object to get the number of bytes of.
* @return the number of bytes of the object.
*/
@VisibleForTesting
static long getByteLengthOfObject(@Nullable Object o)
{
if (null != o) {
if (o.getClass().getName().equals(STRING_CLASS_NAME)) {
return ((String) (o)).length();
} else if (o.getClass().getName().equals(DOUBLE_CLASS_NAME)) {
return 8;
} else if (o.getClass().getName().equals(FLOAT_CLASS_NAME)) {
return 4;
} else if (o.getClass().getName().equals(INTEGER_CLASS_NAME)) {
return 4;
} else if (o.getClass().getName().equals(LONG_CLASS_NAME)) {
return 8;
}
}
return 0;
}
@VisibleForTesting
static <K, V> boolean canKeyAndValueTypesByteSizesBeDetermined(@Nullable K key, @Nullable V value)
{
boolean canBeDetermined = (null == key
|| key.getClass().getName().equals(STRING_CLASS_NAME)
|| key.getClass().getName().equals(DOUBLE_CLASS_NAME)
|| key.getClass().getName().equals(FLOAT_CLASS_NAME)
|| key.getClass().getName().equals(INTEGER_CLASS_NAME)
|| key.getClass().getName().equals(LONG_CLASS_NAME))
&& (null == value
|| value.getClass().getName().equals(STRING_CLASS_NAME)
|| value.getClass().getName().equals(DOUBLE_CLASS_NAME)
|| value.getClass().getName().equals(FLOAT_CLASS_NAME)
|| value.getClass().getName().equals(INTEGER_CLASS_NAME)
|| value.getClass().getName().equals(LONG_CLASS_NAME));
if (!canBeDetermined) {
LOG.warn(
"cannot compute number of bytes when populating map because key and value classes are neither "
+ "Double, Float, Integer, Long, or String. Key class: [%s], Value class: [%s]",
null != key ? key.getClass().getName() : null,
null != value ? value.getClass().getName() : null);
}
return canBeDetermined;
}
} }

View File

@ -36,4 +36,9 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
public interface ExtractionNamespace public interface ExtractionNamespace
{ {
long getPollMs(); long getPollMs();
default long getMaxHeapPercentage()
{
return -1L;
}
} }

View File

@ -43,6 +43,8 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
{ {
private static final Logger LOG = new Logger(JdbcExtractionNamespace.class); private static final Logger LOG = new Logger(JdbcExtractionNamespace.class);
long DEFAULT_MAX_HEAP_PERCENTAGE = 10L;
@JsonProperty @JsonProperty
private final MetadataStorageConnectorConfig connectorConfig; private final MetadataStorageConnectorConfig connectorConfig;
@JsonProperty @JsonProperty
@ -57,6 +59,8 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
private final String filter; private final String filter;
@JsonProperty @JsonProperty
private final Period pollPeriod; private final Period pollPeriod;
@JsonProperty
private final long maxHeapPercentage;
@JsonCreator @JsonCreator
public JdbcExtractionNamespace( public JdbcExtractionNamespace(
@ -68,6 +72,7 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
@JsonProperty(value = "tsColumn") @Nullable final String tsColumn, @JsonProperty(value = "tsColumn") @Nullable final String tsColumn,
@JsonProperty(value = "filter") @Nullable final String filter, @JsonProperty(value = "filter") @Nullable final String filter,
@Min(0) @JsonProperty(value = "pollPeriod") @Nullable final Period pollPeriod, @Min(0) @JsonProperty(value = "pollPeriod") @Nullable final Period pollPeriod,
@JsonProperty(value = "maxHeapPercentage") @Nullable final Long maxHeapPercentage,
@JacksonInject JdbcAccessSecurityConfig securityConfig @JacksonInject JdbcAccessSecurityConfig securityConfig
) )
{ {
@ -90,6 +95,7 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
} else { } else {
this.pollPeriod = pollPeriod; this.pollPeriod = pollPeriod;
} }
this.maxHeapPercentage = maxHeapPercentage == null ? DEFAULT_MAX_HEAP_PERCENTAGE : maxHeapPercentage;
} }
/** /**
@ -150,6 +156,12 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
return pollPeriod.toStandardDuration().getMillis(); return pollPeriod.toStandardDuration().getMillis();
} }
@Override
public long getMaxHeapPercentage()
{
return maxHeapPercentage;
}
@Override @Override
public String toString() public String toString()
{ {
@ -161,6 +173,7 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
", tsColumn='" + tsColumn + '\'' + ", tsColumn='" + tsColumn + '\'' +
", filter='" + filter + '\'' + ", filter='" + filter + '\'' +
", pollPeriod=" + pollPeriod + ", pollPeriod=" + pollPeriod +
", maxHeapPercentage=" + maxHeapPercentage +
'}'; '}';
} }
@ -182,7 +195,8 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
Objects.equals(keyColumn, that.keyColumn) && Objects.equals(keyColumn, that.keyColumn) &&
Objects.equals(valueColumn, that.valueColumn) && Objects.equals(valueColumn, that.valueColumn) &&
Objects.equals(tsColumn, that.tsColumn) && Objects.equals(tsColumn, that.tsColumn) &&
Objects.equals(pollPeriod, that.pollPeriod); Objects.equals(pollPeriod, that.pollPeriod) &&
Objects.equals(maxHeapPercentage, that.maxHeapPercentage);
} }
@Override @Override
@ -195,7 +209,8 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
keyColumn, keyColumn,
valueColumn, valueColumn,
tsColumn, tsColumn,
pollPeriod pollPeriod,
maxHeapPercentage
); );
} }
} }

View File

@ -67,6 +67,8 @@ public class UriExtractionNamespace implements ExtractionNamespace
{ {
private static final Logger LOG = new Logger(UriExtractionNamespace.class); private static final Logger LOG = new Logger(UriExtractionNamespace.class);
long DEFAULT_MAX_HEAP_PERCENTAGE = 10;
@JsonProperty @JsonProperty
private final URI uri; private final URI uri;
@JsonProperty @JsonProperty
@ -77,6 +79,8 @@ public class UriExtractionNamespace implements ExtractionNamespace
private final String fileRegex; private final String fileRegex;
@JsonProperty @JsonProperty
private final Period pollPeriod; private final Period pollPeriod;
@JsonProperty
private final Long maxHeapPercentage;
@JsonCreator @JsonCreator
public UriExtractionNamespace( public UriExtractionNamespace(
@ -92,7 +96,9 @@ public class UriExtractionNamespace implements ExtractionNamespace
Period pollPeriod, Period pollPeriod,
@Deprecated @Deprecated
@JsonProperty(value = "versionRegex", required = false) @JsonProperty(value = "versionRegex", required = false)
String versionRegex String versionRegex,
@JsonProperty(value = "maxHeapPercentage") @Nullable
Long maxHeapPercentage
) )
{ {
this.uri = uri; this.uri = uri;
@ -127,6 +133,7 @@ public class UriExtractionNamespace implements ExtractionNamespace
throw new IAE(ex, "Could not parse `fileRegex` [%s]", this.fileRegex); throw new IAE(ex, "Could not parse `fileRegex` [%s]", this.fileRegex);
} }
} }
this.maxHeapPercentage = maxHeapPercentage == null ? DEFAULT_MAX_HEAP_PERCENTAGE : maxHeapPercentage;
} }
public String getFileRegex() public String getFileRegex()
@ -155,6 +162,12 @@ public class UriExtractionNamespace implements ExtractionNamespace
return pollPeriod.toStandardDuration().getMillis(); return pollPeriod.toStandardDuration().getMillis();
} }
@Override
public long getMaxHeapPercentage()
{
return maxHeapPercentage;
}
@Override @Override
public String toString() public String toString()
{ {
@ -164,6 +177,7 @@ public class UriExtractionNamespace implements ExtractionNamespace
", namespaceParseSpec=" + namespaceParseSpec + ", namespaceParseSpec=" + namespaceParseSpec +
", fileRegex='" + fileRegex + '\'' + ", fileRegex='" + fileRegex + '\'' +
", pollPeriod=" + pollPeriod + ", pollPeriod=" + pollPeriod +
", maxHeapPercentage=" + maxHeapPercentage +
'}'; '}';
} }
@ -191,7 +205,8 @@ public class UriExtractionNamespace implements ExtractionNamespace
if (getFileRegex() != null ? !getFileRegex().equals(that.getFileRegex()) : that.getFileRegex() != null) { if (getFileRegex() != null ? !getFileRegex().equals(that.getFileRegex()) : that.getFileRegex() != null) {
return false; return false;
} }
return pollPeriod.equals(that.pollPeriod); return pollPeriod.equals(that.pollPeriod) &&
Objects.equals(maxHeapPercentage, that.maxHeapPercentage);
} }
@ -203,6 +218,7 @@ public class UriExtractionNamespace implements ExtractionNamespace
result = 31 * result + getNamespaceParseSpec().hashCode(); result = 31 * result + getNamespaceParseSpec().hashCode();
result = 31 * result + (getFileRegex() != null ? getFileRegex().hashCode() : 0); result = 31 * result + (getFileRegex() != null ? getFileRegex().hashCode() : 0);
result = 31 * result + pollPeriod.hashCode(); result = 31 * result + pollPeriod.hashCode();
result = 31 * result * Objects.hashCode(maxHeapPercentage);
return result; return result;
} }

View File

@ -20,6 +20,7 @@
package org.apache.druid.server.lookup.namespace; package org.apache.druid.server.lookup.namespace;
import com.google.common.base.Strings; import com.google.common.base.Strings;
import org.apache.druid.data.input.MapPopulator;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.Pair;
@ -28,13 +29,15 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.lookup.namespace.CacheGenerator; import org.apache.druid.query.lookup.namespace.CacheGenerator;
import org.apache.druid.query.lookup.namespace.JdbcExtractionNamespace; import org.apache.druid.query.lookup.namespace.JdbcExtractionNamespace;
import org.apache.druid.server.lookup.namespace.cache.CacheScheduler; import org.apache.druid.server.lookup.namespace.cache.CacheScheduler;
import org.apache.druid.utils.JvmUtils;
import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.ResultIterator;
import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException; import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
import org.skife.jdbi.v2.util.TimestampMapper; import org.skife.jdbi.v2.util.TimestampMapper;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.sql.Timestamp; import java.sql.Timestamp;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -45,6 +48,10 @@ import java.util.concurrent.ConcurrentMap;
public final class JdbcCacheGenerator implements CacheGenerator<JdbcExtractionNamespace> public final class JdbcCacheGenerator implements CacheGenerator<JdbcExtractionNamespace>
{ {
private static final Logger LOG = new Logger(JdbcCacheGenerator.class); private static final Logger LOG = new Logger(JdbcCacheGenerator.class);
private static final String NO_SUITABLE_DRIVER_FOUND_ERROR = "No suitable driver found";
private static final String JDBC_DRIVER_JAR_FILES_MISSING_ERROR =
"JDBC driver JAR files missing from extensions/druid-lookups-cached-global directory";
private static final long MAX_MEMORY = JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes();
private final ConcurrentMap<CacheScheduler.EntryImpl<JdbcExtractionNamespace>, DBI> dbiCache = private final ConcurrentMap<CacheScheduler.EntryImpl<JdbcExtractionNamespace>, DBI> dbiCache =
new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
@ -59,7 +66,6 @@ public final class JdbcCacheGenerator implements CacheGenerator<JdbcExtractionNa
{ {
final long lastCheck = lastVersion == null ? JodaUtils.MIN_INSTANT : Long.parseLong(lastVersion); final long lastCheck = lastVersion == null ? JodaUtils.MIN_INSTANT : Long.parseLong(lastVersion);
final Long lastDBUpdate; final Long lastDBUpdate;
final List<Pair<String, String>> pairs;
final long dbQueryStart; final long dbQueryStart;
try { try {
@ -67,21 +73,20 @@ public final class JdbcCacheGenerator implements CacheGenerator<JdbcExtractionNa
if (lastDBUpdate != null && lastDBUpdate <= lastCheck) { if (lastDBUpdate != null && lastDBUpdate <= lastCheck) {
return null; return null;
} }
dbQueryStart = System.currentTimeMillis();
LOG.debug("Updating %s", entryId);
pairs = getLookupPairs(entryId, namespace);
} }
catch (UnableToObtainConnectionException e) { catch (UnableToObtainConnectionException e) {
if (e.getMessage().contains("No suitable driver found")) { if (e.getMessage().contains(NO_SUITABLE_DRIVER_FOUND_ERROR)) {
throw new ISE( throw new ISE(
e, e,
"JDBC driver JAR files missing from extensions/druid-lookups-cached-global directory" JDBC_DRIVER_JAR_FILES_MISSING_ERROR
); );
} else { } else {
throw e; throw e;
} }
} }
dbQueryStart = System.currentTimeMillis();
LOG.debug("Updating %s", entryId);
final String newVersion; final String newVersion;
if (lastDBUpdate != null) { if (lastDBUpdate != null) {
@ -90,14 +95,38 @@ public final class JdbcCacheGenerator implements CacheGenerator<JdbcExtractionNa
newVersion = StringUtils.format("%d", dbQueryStart); newVersion = StringUtils.format("%d", dbQueryStart);
} }
final CacheScheduler.VersionedCache versionedCache = scheduler.createVersionedCache(entryId, newVersion); final CacheScheduler.VersionedCache versionedCache = scheduler.createVersionedCache(entryId, newVersion);
try {
final long startNs = System.nanoTime();
try (
Handle handle = getHandle(entryId, namespace);
ResultIterator<Pair<String, String>> pairs = getLookupPairs(handle, namespace)) {
final Map<String, String> cache = versionedCache.getCache(); final Map<String, String> cache = versionedCache.getCache();
for (Pair<String, String> pair : pairs) { final MapPopulator.PopulateResult populateResult = MapPopulator.populateAndWarnAtByteLimit(
cache.put(pair.lhs, pair.rhs); pairs,
} cache,
LOG.info("Finished loading %d values for %s", cache.size(), entryId); (long) (MAX_MEMORY * namespace.getMaxHeapPercentage() / 100.0),
null == entryId ? null : entryId.toString()
);
final long duration = System.nanoTime() - startNs;
LOG.info(
"Finished loading %,d values (%d bytes) for [%s] in %,d ns",
populateResult.getEntries(),
populateResult.getBytes(),
entryId,
duration
);
return versionedCache; return versionedCache;
} }
catch (UnableToObtainConnectionException e) {
if (e.getMessage().contains(NO_SUITABLE_DRIVER_FOUND_ERROR)) {
throw new ISE(
e,
JDBC_DRIVER_JAR_FILES_MISSING_ERROR
);
} else {
throw e;
}
}
catch (Throwable t) { catch (Throwable t) {
try { try {
versionedCache.close(); versionedCache.close();
@ -109,23 +138,28 @@ public final class JdbcCacheGenerator implements CacheGenerator<JdbcExtractionNa
} }
} }
private List<Pair<String, String>> getLookupPairs( private Handle getHandle(
final CacheScheduler.EntryImpl<JdbcExtractionNamespace> key, final CacheScheduler.EntryImpl<JdbcExtractionNamespace> key,
final JdbcExtractionNamespace namespace final JdbcExtractionNamespace namespace
) )
{ {
final DBI dbi = ensureDBI(key, namespace); final DBI dbi = ensureDBI(key, namespace);
return dbi.open();
}
private ResultIterator<Pair<String, String>> getLookupPairs(
final Handle handle,
final JdbcExtractionNamespace namespace
)
{
final String table = namespace.getTable(); final String table = namespace.getTable();
final String filter = namespace.getFilter(); final String filter = namespace.getFilter();
final String valueColumn = namespace.getValueColumn(); final String valueColumn = namespace.getValueColumn();
final String keyColumn = namespace.getKeyColumn(); final String keyColumn = namespace.getKeyColumn();
return dbi.withHandle( return handle.createQuery(buildLookupQuery(table, filter, keyColumn, valueColumn))
handle -> handle .map((index1, r1, ctx1) -> new Pair<>(r1.getString(keyColumn), r1.getString(valueColumn)))
.createQuery(buildLookupQuery(table, filter, keyColumn, valueColumn)) .iterator();
.map((index, r, ctx) -> new Pair<>(r.getString(keyColumn), r.getString(valueColumn)))
.list()
);
} }
private static String buildLookupQuery(String table, String filter, String keyColumn, String valueColumn) private static String buildLookupQuery(String table, String filter, String keyColumn, String valueColumn)

View File

@ -32,6 +32,7 @@ import org.apache.druid.query.lookup.namespace.UriExtractionNamespace;
import org.apache.druid.segment.loading.URIDataPuller; import org.apache.druid.segment.loading.URIDataPuller;
import org.apache.druid.server.lookup.namespace.cache.CacheScheduler; import org.apache.druid.server.lookup.namespace.cache.CacheScheduler;
import org.apache.druid.utils.CompressionUtils; import org.apache.druid.utils.CompressionUtils;
import org.apache.druid.utils.JvmUtils;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
@ -48,6 +49,7 @@ public final class UriCacheGenerator implements CacheGenerator<UriExtractionName
{ {
private static final int DEFAULT_NUM_RETRIES = 3; private static final int DEFAULT_NUM_RETRIES = 3;
private static final Logger log = new Logger(UriCacheGenerator.class); private static final Logger log = new Logger(UriCacheGenerator.class);
private static final long MAX_MEMORY = JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes();
private final Map<String, SearchableVersionedDataFinder> pullers; private final Map<String, SearchableVersionedDataFinder> pullers;
@Inject @Inject
@ -146,11 +148,17 @@ public final class UriCacheGenerator implements CacheGenerator<UriExtractionName
final long startNs = System.nanoTime(); final long startNs = System.nanoTime();
final MapPopulator.PopulateResult populateResult = new MapPopulator<>( final MapPopulator.PopulateResult populateResult = new MapPopulator<>(
extractionNamespace.getNamespaceParseSpec().getParser() extractionNamespace.getNamespaceParseSpec().getParser()
).populate(source, versionedCache.getCache()); ).populateAndWarnAtByteLimit(
source,
versionedCache.getCache(),
(long) (MAX_MEMORY * extractionNamespace.getMaxHeapPercentage() / 100.0),
null == entryId ? null : entryId.toString()
);
final long duration = System.nanoTime() - startNs; final long duration = System.nanoTime() - startNs;
log.info( log.info(
"Finished loading %,d values from %,d lines for [%s] in %,d ns", "Finished loading %,d values (%d bytes) from %,d lines for [%s] in %,d ns",
populateResult.getEntries(), populateResult.getEntries(),
populateResult.getBytes(),
populateResult.getLines(), populateResult.getLines(),
entryId, entryId,
duration duration

View File

@ -0,0 +1,207 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.Pair;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public class MapPopulatorTest
{
private static long EIGHT_BYTES = 8L;
private static long FOUR_BYTES = 4L;
@Test
public void test_getByteLengthOfObject_string_stringLength()
{
String o = "string";
Assert.assertEquals(o.length(), MapPopulator.getByteLengthOfObject(o));
}
@Test
public void test_getByteLengthOfObject_double_8()
{
Assert.assertEquals(EIGHT_BYTES, MapPopulator.getByteLengthOfObject(12.0));
}
@Test
public void test_getByteLengthOfObject_float_4()
{
Assert.assertEquals(FOUR_BYTES, MapPopulator.getByteLengthOfObject(12.0F));
}
@Test
public void test_getByteLengthOfObject_int_4()
{
Assert.assertEquals(FOUR_BYTES, MapPopulator.getByteLengthOfObject(12));
}
@Test
public void test_getByteLengthOfObject_long_8()
{
Assert.assertEquals(EIGHT_BYTES, MapPopulator.getByteLengthOfObject(12L));
}
@Test
public void test_getByteLengthOfObject_null_0()
{
Assert.assertEquals(0, MapPopulator.getByteLengthOfObject(null));
}
@Test
public void test_getByteLengthOfObject_map_0()
{
Assert.assertEquals(0, MapPopulator.getByteLengthOfObject(ImmutableMap.of()));
}
@Test
public void test_canKeyAndValueTypesByteSizesBeDetermined_stringKeyAndStringValue_true()
{
Assert.assertTrue(MapPopulator.canKeyAndValueTypesByteSizesBeDetermined("key", "value"));
}
@Test
public void test_canKeyAndValueTypesByteSizesBeDetermined_doubleKeyAndDoubleValue_true()
{
Assert.assertTrue(MapPopulator.canKeyAndValueTypesByteSizesBeDetermined(1.0, 2.0));
}
@Test
public void test_canKeyAndValueTypesByteSizesBeDetermined_floatKeyAndFloatValue_true()
{
Assert.assertTrue(MapPopulator.canKeyAndValueTypesByteSizesBeDetermined(1.0F, 2.0F));
}
@Test
public void test_canKeyAndValueTypesByteSizesBeDetermined_intKeyAndIntValue_true()
{
Assert.assertTrue(MapPopulator.canKeyAndValueTypesByteSizesBeDetermined(1, 2));
}
@Test
public void test_canKeyAndValueTypesByteSizesBeDetermined_longKeyAndLongValue_true()
{
Assert.assertTrue(MapPopulator.canKeyAndValueTypesByteSizesBeDetermined(1L, 2L));
}
@Test
public void test_canKeyAndValueTypesByteSizesBeDetermined_nullKeyAndNullValue_true()
{
Assert.assertTrue(MapPopulator.canKeyAndValueTypesByteSizesBeDetermined(null, null));
}
@Test
public void test_canKeyAndValueTypesByteSizesBeDetermined_mapKeyAndmapValue_false()
{
Assert.assertFalse(MapPopulator.canKeyAndValueTypesByteSizesBeDetermined(ImmutableMap.of(), ImmutableMap.of()));
}
@Test
public void test_canKeyAndValueTypesByteSizesBeDetermined_nullKeyAndmapValue_false()
{
Assert.assertFalse(MapPopulator.canKeyAndValueTypesByteSizesBeDetermined(null, ImmutableMap.of()));
}
@Test
public void test_canKeyAndValueTypesByteSizesBeDetermined_mapKeyAndNullValue_false()
{
Assert.assertFalse(MapPopulator.canKeyAndValueTypesByteSizesBeDetermined(ImmutableMap.of(), null));
}
@Test
public void test_populateAndWarnAtByteLimit_empty_succeeds()
{
Set<Pair<String, String>> pairs = ImmutableSet.of();
Map<String, String> map = new HashMap<>();
MapPopulator.PopulateResult result =
MapPopulator.populateAndWarnAtByteLimit(pairs.iterator(), map, -1, "");
Assert.assertTrue(map.isEmpty());
Assert.assertEquals(0, result.getEntries());
Assert.assertEquals(0, result.getLines());
Assert.assertEquals(0L, result.getBytes());
}
@Test
public void test_populateAndWarnAtByteLimit_pairTypesBytesComputableAndNoByteLimit_succeeds()
{
Set<Pair<Object, Object>> pairs = new HashSet<>();
pairs.add(new Pair<>("key1", "val1"));
pairs.add(new Pair<>("key2", "val2"));
pairs.add(new Pair<>(null, null));
pairs.add(null);
Map<Object, Object> expectedMap = new HashMap<>();
expectedMap.put("key1", "val1");
expectedMap.put("key2", "val2");
expectedMap.put(null, null);
Map<Object, Object> map = new HashMap<>();
MapPopulator.PopulateResult result =
MapPopulator.populateAndWarnAtByteLimit(pairs.iterator(), map, -1, null);
Assert.assertEquals(expectedMap, map);
Assert.assertEquals(4, result.getEntries());
Assert.assertEquals(0, result.getLines());
Assert.assertEquals(0L, result.getBytes());
}
@Test
public void test_populateAndWarnAtByteLimit_pairTypesBytesComputableAndByteLimit_succeeds()
{
Set<Pair<Object, Object>> pairs = new HashSet<>();
pairs.add(new Pair<>("key1", "val1"));
pairs.add(new Pair<>("key2", "val2"));
pairs.add(new Pair<>(null, null));
pairs.add(null);
Map<Object, Object> expectedMap = new HashMap<>();
expectedMap.put("key1", "val1");
expectedMap.put("key2", "val2");
expectedMap.put(null, null);
Map<Object, Object> map = new HashMap<>();
MapPopulator.PopulateResult result =
MapPopulator.populateAndWarnAtByteLimit(pairs.iterator(), map, 10, null);
Assert.assertEquals(expectedMap, map);
Assert.assertEquals(4, result.getEntries());
Assert.assertEquals(0, result.getLines());
Assert.assertEquals(16L, result.getBytes());
}
@Test
public void test_populateAndWarnAtByteLimit_pairTypesBytesNotComputableAndByteLimit_succeeds()
{
Set<Pair<Object, Object>> pairs = new HashSet<>();
pairs.add(new Pair<>(ImmutableSet.of(1), ImmutableSet.of(2)));
pairs.add(new Pair<>(ImmutableSet.of(3), ImmutableSet.of(4)));
Map<Object, Object> expectedMap = new HashMap<>();
expectedMap.put(ImmutableSet.of(1), ImmutableSet.of(2));
expectedMap.put(ImmutableSet.of(3), ImmutableSet.of(4));
Map<Object, Object> map = new HashMap<>();
MapPopulator.PopulateResult result =
MapPopulator.populateAndWarnAtByteLimit(pairs.iterator(), map, 10, null);
Assert.assertEquals(expectedMap, map);
Assert.assertEquals(2, result.getEntries());
Assert.assertEquals(0, result.getLines());
Assert.assertEquals(0L, result.getBytes());
}
}

View File

@ -125,6 +125,7 @@ public class NamespaceLookupExtractorFactoryTest
new UriExtractionNamespace.ObjectMapperFlatDataParser(mapper), new UriExtractionNamespace.ObjectMapperFlatDataParser(mapper),
Period.millis(0), Period.millis(0),
null,
null null
); );
final NamespaceLookupExtractorFactory namespaceLookupExtractorFactory = new NamespaceLookupExtractorFactory( final NamespaceLookupExtractorFactory namespaceLookupExtractorFactory = new NamespaceLookupExtractorFactory(
@ -457,7 +458,7 @@ public class NamespaceLookupExtractorFactoryTest
final Injector injector = makeInjector(); final Injector injector = makeInjector();
final ObjectMapper mapper = injector.getInstance(Key.get(ObjectMapper.class, Json.class)); final ObjectMapper mapper = injector.getInstance(Key.get(ObjectMapper.class, Json.class));
mapper.registerSubtypes(NamespaceLookupExtractorFactory.class); mapper.registerSubtypes(NamespaceLookupExtractorFactory.class);
final String str = "{ \"type\": \"cachedNamespace\", \"extractionNamespace\": { \"type\": \"uri\", \"uriPrefix\": \"s3://bucket/prefix/\", \"fileRegex\": \"foo.*\\\\.gz\", \"namespaceParseSpec\": { \"format\": \"customJson\", \"keyFieldName\": \"someKey\", \"valueFieldName\": \"someVal\" }, \"pollPeriod\": \"PT5M\" } } }"; final String str = "{ \"type\": \"cachedNamespace\", \"extractionNamespace\": { \"type\": \"uri\", \"uriPrefix\": \"s3://bucket/prefix/\", \"fileRegex\": \"foo.*\\\\.gz\", \"namespaceParseSpec\": { \"format\": \"customJson\", \"keyFieldName\": \"someKey\", \"valueFieldName\": \"someVal\" }, \"pollPeriod\": \"PT5M\", \"maxHeapPercentage\": 10 } } }";
final LookupExtractorFactory factory = mapper.readValue(str, LookupExtractorFactory.class); final LookupExtractorFactory factory = mapper.readValue(str, LookupExtractorFactory.class);
Assert.assertTrue(factory instanceof NamespaceLookupExtractorFactory); Assert.assertTrue(factory instanceof NamespaceLookupExtractorFactory);
final NamespaceLookupExtractorFactory namespaceLookupExtractorFactory = (NamespaceLookupExtractorFactory) factory; final NamespaceLookupExtractorFactory namespaceLookupExtractorFactory = (NamespaceLookupExtractorFactory) factory;

View File

@ -101,6 +101,42 @@ public class JSONFlatDataParserTest
Assert.assertEquals(VAL2, map.get(KEY2)); Assert.assertEquals(VAL2, map.get(KEY2));
} }
@Test
public void testSimpleParseWithByteLimit_limitNotReached() throws Exception
{
final UriExtractionNamespace.JSONFlatDataParser parser = new UriExtractionNamespace.JSONFlatDataParser(
MAPPER,
"key",
"val"
);
final Map<String, String> map = new HashMap<>();
new MapPopulator<>(parser.getParser()).populateAndWarnAtByteLimit(
Files.asByteSource(tmpFile),
map,
100_000L,
"namespace");
Assert.assertEquals(VAL1, map.get(KEY1));
Assert.assertEquals(VAL2, map.get(KEY2));
}
@Test
public void testSimpleParseWithByteLimit_limitReached() throws Exception
{
final UriExtractionNamespace.JSONFlatDataParser parser = new UriExtractionNamespace.JSONFlatDataParser(
MAPPER,
"key",
"val"
);
final Map<String, String> map = new HashMap<>();
new MapPopulator<>(parser.getParser()).populateAndWarnAtByteLimit(
Files.asByteSource(tmpFile),
map,
1L,
"namespace");
Assert.assertEquals(VAL1, map.get(KEY1));
Assert.assertEquals(VAL2, map.get(KEY2));
}
@Test @Test
public void testParseWithNullValues() throws Exception public void testParseWithNullValues() throws Exception
{ {

View File

@ -62,6 +62,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
TS_COLUMN, TS_COLUMN,
"some filter", "some filter",
new Period(10), new Period(10),
null,
new JdbcAccessSecurityConfig() new JdbcAccessSecurityConfig()
{ {
@Override @Override
@ -99,6 +100,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
TS_COLUMN, TS_COLUMN,
"some filter", "some filter",
new Period(10), new Period(10),
null,
new JdbcAccessSecurityConfig() new JdbcAccessSecurityConfig()
{ {
@Override @Override
@ -134,6 +136,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
TS_COLUMN, TS_COLUMN,
"some filter", "some filter",
new Period(10), new Period(10),
null,
new JdbcAccessSecurityConfig() new JdbcAccessSecurityConfig()
{ {
@Override @Override
@ -171,6 +174,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
TS_COLUMN, TS_COLUMN,
"some filter", "some filter",
new Period(10), new Period(10),
null,
new JdbcAccessSecurityConfig() new JdbcAccessSecurityConfig()
{ {
@Override @Override
@ -212,6 +216,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
TS_COLUMN, TS_COLUMN,
"some filter", "some filter",
new Period(10), new Period(10),
null,
new JdbcAccessSecurityConfig() new JdbcAccessSecurityConfig()
{ {
@Override @Override
@ -249,6 +254,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
TS_COLUMN, TS_COLUMN,
"some filter", "some filter",
new Period(10), new Period(10),
10L,
new JdbcAccessSecurityConfig() new JdbcAccessSecurityConfig()
{ {
@Override @Override
@ -284,6 +290,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
TS_COLUMN, TS_COLUMN,
"some filter", "some filter",
new Period(10), new Period(10),
null,
new JdbcAccessSecurityConfig() new JdbcAccessSecurityConfig()
{ {
@Override @Override
@ -321,6 +328,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
TS_COLUMN, TS_COLUMN,
"some filter", "some filter",
new Period(10), new Period(10),
null,
new JdbcAccessSecurityConfig() new JdbcAccessSecurityConfig()
{ {
@Override @Override
@ -364,6 +372,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
TS_COLUMN, TS_COLUMN,
"some filter", "some filter",
new Period(10), new Period(10),
null,
new JdbcAccessSecurityConfig() new JdbcAccessSecurityConfig()
{ {
@Override @Override
@ -405,6 +414,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
TS_COLUMN, TS_COLUMN,
"some filter", "some filter",
new Period(10), new Period(10),
null,
new JdbcAccessSecurityConfig() new JdbcAccessSecurityConfig()
{ {
@Override @Override

View File

@ -129,6 +129,7 @@ public class JdbcCacheGeneratorTest
tsColumn, tsColumn,
"filter", "filter",
Period.ZERO, Period.ZERO,
null,
new JdbcAccessSecurityConfig() new JdbcAccessSecurityConfig()
); );
} }

View File

@ -105,6 +105,7 @@ public class NamespacedExtractorModuleTest
UriExtractionNamespaceTest.registerTypes(new DefaultObjectMapper()) UriExtractionNamespaceTest.registerTypes(new DefaultObjectMapper())
), ),
new Period(0), new Period(0),
null,
null null
); );
CacheScheduler.VersionedCache versionedCache = factory.generateCache(namespace, null, null, scheduler); CacheScheduler.VersionedCache versionedCache = factory.generateCache(namespace, null, null, scheduler);
@ -126,6 +127,7 @@ public class NamespacedExtractorModuleTest
null, null, null, null,
new UriExtractionNamespace.ObjectMapperFlatDataParser(UriExtractionNamespaceTest.registerTypes(new DefaultObjectMapper())), new UriExtractionNamespace.ObjectMapperFlatDataParser(UriExtractionNamespaceTest.registerTypes(new DefaultObjectMapper())),
new Period(0), new Period(0),
null,
null null
); );
try (CacheScheduler.Entry entry = scheduler.scheduleAndWait(namespace, 1_000)) { try (CacheScheduler.Entry entry = scheduler.scheduleAndWait(namespace, 1_000)) {
@ -149,6 +151,7 @@ public class NamespacedExtractorModuleTest
UriExtractionNamespaceTest.registerTypes(new DefaultObjectMapper()) UriExtractionNamespaceTest.registerTypes(new DefaultObjectMapper())
), ),
new Period(0), new Period(0),
null,
null null
); );
try (CacheScheduler.Entry entry = scheduler.scheduleAndWait(namespace, 1_000)) { try (CacheScheduler.Entry entry = scheduler.scheduleAndWait(namespace, 1_000)) {
@ -170,6 +173,7 @@ public class NamespacedExtractorModuleTest
UriExtractionNamespaceTest.registerTypes(new DefaultObjectMapper()) UriExtractionNamespaceTest.registerTypes(new DefaultObjectMapper())
), ),
new Period(0), new Period(0),
null,
null null
); );
Assert.assertEquals(0, scheduler.getActiveEntries()); Assert.assertEquals(0, scheduler.getActiveEntries());

View File

@ -294,6 +294,7 @@ public class UriCacheGeneratorTest
UriExtractionNamespaceTest.registerTypes(new ObjectMapper()) UriExtractionNamespaceTest.registerTypes(new ObjectMapper())
), ),
new Period(0), new Period(0),
null,
null null
); );
} }
@ -324,6 +325,7 @@ public class UriCacheGeneratorTest
Pattern.quote(Paths.get(this.namespace.getUri()).getFileName().toString()), Pattern.quote(Paths.get(this.namespace.getUri()).getFileName().toString()),
this.namespace.getNamespaceParseSpec(), this.namespace.getNamespaceParseSpec(),
Period.millis((int) this.namespace.getPollMs()), Period.millis((int) this.namespace.getPollMs()),
null,
null null
); );
CacheScheduler.Entry entry = scheduler.schedule(namespace); CacheScheduler.Entry entry = scheduler.schedule(namespace);
@ -347,6 +349,7 @@ public class UriCacheGeneratorTest
UriExtractionNamespaceTest.registerTypes(new ObjectMapper()) UriExtractionNamespaceTest.registerTypes(new ObjectMapper())
), ),
new Period(0), new Period(0),
null,
null null
); );
@ -388,6 +391,7 @@ public class UriCacheGeneratorTest
null, null, null, null,
namespace.getNamespaceParseSpec(), namespace.getNamespaceParseSpec(),
Period.millis((int) namespace.getPollMs()), Period.millis((int) namespace.getPollMs()),
null,
null null
); );
Assert.assertTrue(new File(namespace.getUri()).delete()); Assert.assertTrue(new File(namespace.getUri()).delete());
@ -403,6 +407,7 @@ public class UriCacheGeneratorTest
Pattern.quote(Paths.get(namespace.getUri()).getFileName().toString()), Pattern.quote(Paths.get(namespace.getUri()).getFileName().toString()),
namespace.getNamespaceParseSpec(), namespace.getNamespaceParseSpec(),
Period.millis((int) namespace.getPollMs()), Period.millis((int) namespace.getPollMs()),
null,
null null
); );
Assert.assertTrue(new File(namespace.getUri()).delete()); Assert.assertTrue(new File(namespace.getUri()).delete());
@ -418,6 +423,7 @@ public class UriCacheGeneratorTest
null, null,
namespace.getNamespaceParseSpec(), namespace.getNamespaceParseSpec(),
Period.millis((int) namespace.getPollMs()), Period.millis((int) namespace.getPollMs()),
null,
null null
); );
} }
@ -431,6 +437,7 @@ public class UriCacheGeneratorTest
"", "",
namespace.getNamespaceParseSpec(), namespace.getNamespaceParseSpec(),
Period.millis((int) namespace.getPollMs()), Period.millis((int) namespace.getPollMs()),
null,
null null
); );
} }
@ -444,7 +451,8 @@ public class UriCacheGeneratorTest
null, null,
namespace.getNamespaceParseSpec(), namespace.getNamespaceParseSpec(),
Period.millis((int) namespace.getPollMs()), Period.millis((int) namespace.getPollMs()),
"" "",
null
); );
} }
@ -457,11 +465,11 @@ public class UriCacheGeneratorTest
"", "",
namespace.getNamespaceParseSpec(), namespace.getNamespaceParseSpec(),
Period.millis((int) namespace.getPollMs()), Period.millis((int) namespace.getPollMs()),
"" "",
null
); );
} }
@Test(expected = IAE.class) @Test(expected = IAE.class)
public void testBadPattern() public void testBadPattern()
{ {
@ -471,6 +479,7 @@ public class UriCacheGeneratorTest
"[", "[",
namespace.getNamespaceParseSpec(), namespace.getNamespaceParseSpec(),
Period.millis((int) namespace.getPollMs()), Period.millis((int) namespace.getPollMs()),
null,
null null
); );
} }
@ -492,6 +501,7 @@ public class UriCacheGeneratorTest
null, null,
namespace.getNamespaceParseSpec(), namespace.getNamespaceParseSpec(),
Period.millis((int) namespace.getPollMs()), Period.millis((int) namespace.getPollMs()),
null,
null null
); );
Assert.assertNotNull(generator.generateCache(extractionNamespace, null, null, scheduler)); Assert.assertNotNull(generator.generateCache(extractionNamespace, null, null, scheduler));
@ -511,6 +521,7 @@ public class UriCacheGeneratorTest
"val" "val"
), ),
Period.millis(10000), Period.millis(10000),
null,
null null
), ),
500 500

View File

@ -188,6 +188,7 @@ public class CacheSchedulerTest
UriExtractionNamespaceTest.registerTypes(new ObjectMapper()) UriExtractionNamespaceTest.registerTypes(new ObjectMapper())
), ),
new Period(0), new Period(0),
null,
null null
); );
CacheScheduler.Entry entry = scheduler.schedule(namespace); CacheScheduler.Entry entry = scheduler.schedule(namespace);
@ -349,6 +350,7 @@ public class CacheSchedulerTest
UriExtractionNamespaceTest.registerTypes(new ObjectMapper()) UriExtractionNamespaceTest.registerTypes(new ObjectMapper())
), ),
new Period(period), new Period(period),
null,
null null
); );
} }

View File

@ -381,6 +381,7 @@ public class JdbcExtractionNamespaceTest
tsColumn, tsColumn,
null, null,
new Period(0), new Period(0),
null,
new JdbcAccessSecurityConfig() new JdbcAccessSecurityConfig()
); );
try (CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace)) { try (CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace)) {
@ -413,6 +414,7 @@ public class JdbcExtractionNamespaceTest
tsColumn, tsColumn,
FILTER_COLUMN + "='1'", FILTER_COLUMN + "='1'",
new Period(0), new Period(0),
null,
new JdbcAccessSecurityConfig() new JdbcAccessSecurityConfig()
); );
try (CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace)) { try (CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace)) {
@ -487,6 +489,7 @@ public class JdbcExtractionNamespaceTest
tsColumn, tsColumn,
"some filter", "some filter",
new Period(10), new Period(10),
null,
securityConfig securityConfig
); );
final ObjectMapper mapper = new DefaultObjectMapper(); final ObjectMapper mapper = new DefaultObjectMapper();
@ -511,6 +514,7 @@ public class JdbcExtractionNamespaceTest
tsColumn, tsColumn,
null, null,
new Period(10), new Period(10),
null,
new JdbcAccessSecurityConfig() new JdbcAccessSecurityConfig()
); );
CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace); CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace);