sql firehose and firehose doc adjustments (#8067)

* firehose doc adjustments

* fix typo

* additional information on parser types in ingestion docs

* clarify ingest segment firehose docs, add sql firehose examples to sql extension pages

* fixit

* make sql firehose more forgiving by always constructing a MapInputRowParser from the parseSpec of whatever actual InputRowParser impl is provided, remove doc references to map-based parsers

* transforms

* fix tests
Clint Wylie 2019-07-30 15:28:10 -07:00 committed by Gian Merlino
parent 4e60afc86e
commit 653b558134
15 changed files with 341 additions and 95 deletions

View File

@ -29,7 +29,7 @@ import java.util.List;
/**
* A class managing cached files used by {@link PrefetchableTextFilesFirehoseFactory}.
*/
class CacheManager<T>
public class CacheManager<T>
{
private static final Logger LOG = new Logger(CacheManager.class);
@ -44,17 +44,17 @@ class CacheManager<T>
private long totalCachedBytes;
CacheManager(long maxCacheCapacityBytes)
public CacheManager(long maxCacheCapacityBytes)
{
this.maxCacheCapacityBytes = maxCacheCapacityBytes;
}
boolean isEnabled()
public boolean isEnabled()
{
return maxCacheCapacityBytes > 0;
}
boolean cacheable()
public boolean cacheable()
{
// maxCacheCapacityBytes is a rough limit, so if totalCachedBytes is larger than it, no more caching is
// allowed.
@ -90,7 +90,7 @@ class CacheManager<T>
return totalCachedBytes;
}
long getMaxCacheCapacityBytes()
public long getMaxCacheCapacityBytes()
{
return maxCacheCapacityBytes;
}

View File

@ -41,7 +41,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
/**
* A file fetcher used by {@link PrefetchableTextFilesFirehoseFactory} and {@link PrefetchSqlFirehoseFactory}.
* A file fetcher used by {@link PrefetchableTextFilesFirehoseFactory} and PrefetchSqlFirehoseFactory (in druid-server).
* See the javadoc of {@link PrefetchableTextFilesFirehoseFactory} for more details.
*/
public abstract class Fetcher<T> implements Iterator<OpenedObject<T>>
@ -72,7 +72,7 @@ public abstract class Fetcher<T> implements Iterator<OpenedObject<T>>
private int numRemainingObjects;
Fetcher(
public Fetcher(
CacheManager<T> cacheManager,
List<T> objects,
ExecutorService fetchExecutor,

View File

@ -23,7 +23,7 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
interface ObjectOpenFunction<T>
public interface ObjectOpenFunction<T>
{
InputStream open(T object) throws IOException;

View File

@ -33,7 +33,7 @@ import java.io.InputStream;
* {@link PrefetchableTextFilesFirehoseFactory.ResourceCloseableLineIterator} consumes the objectStream and closes
* it with the resourceCloser.
*/
class OpenedObject<T>
public class OpenedObject<T>
{
// Original object
private final T object;
@ -42,29 +42,29 @@ class OpenedObject<T>
// Closer which is called when the file is not needed anymore. Usually this deletes the file except for cached files.
private final Closeable resourceCloser;
OpenedObject(FetchedFile<T> fetchedFile) throws IOException
public OpenedObject(FetchedFile<T> fetchedFile) throws IOException
{
this(fetchedFile.getObject(), FileUtils.openInputStream(fetchedFile.getFile()), fetchedFile.getResourceCloser());
}
OpenedObject(T object, InputStream objectStream, Closeable resourceCloser)
public OpenedObject(T object, InputStream objectStream, Closeable resourceCloser)
{
this.object = object;
this.objectStream = objectStream;
this.resourceCloser = resourceCloser;
}
T getObject()
public T getObject()
{
return object;
}
InputStream getObjectStream()
public InputStream getObjectStream()
{
return objectStream;
}
Closeable getResourceCloser()
public Closeable getResourceCloser()
{
return resourceCloser;
}

View File

@ -107,3 +107,71 @@ Copy or symlink this file to `extensions/mysql-metadata-storage` under the distr
|`druid.metadata.mysql.ssl.trustCertificateKeyStorePassword`|The [Password Provider](../../operations/password-provider.html) or String password for the trust store.|none|yes if `verifyServerCertificate` is set to true and password is not null|
|`druid.metadata.mysql.ssl.enabledSSLCipherSuites`|Overrides the existing cipher suites with these cipher suites.|none|no|
|`druid.metadata.mysql.ssl.enabledTLSProtocols`|Overrides the TLS protocols with these protocols.|none|no|
### MySQL Firehose
The MySQL extension provides an implementation of an [SqlFirehose](../../ingestion/firehose.html#SqlFirehose) which can be used to ingest data into Druid from a MySQL database.
```json
{
"type": "index",
"spec": {
"dataSchema": {
"dataSource": "some_datasource",
"parser": {
"parseSpec": {
"format": "timeAndDims",
"dimensionsSpec": {
"dimensionExclusions": [],
"dimensions": [
"dim1",
"dim2",
"dim3"
]
},
"timestampSpec": {
"format": "auto",
"column": "ts"
}
}
},
"metricsSpec": [],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": {
"type": "none"
},
"rollup": false,
"intervals": null
},
"transformSpec": {
"filter": null,
"transforms": []
}
},
"ioConfig": {
"type": "index",
"firehose": {
"type": "sql",
"database": {
"type": "mysql",
"connectorConfig": {
"connectURI": "jdbc:mysql://some-rds-host.us-west-1.rds.amazonaws.com:3306/druid",
"user": "admin",
"password": "secret"
}
},
"sqls": [
"SELECT * FROM some_table"
]
}
},
"tuningconfig": {
"type": "index",
"maxNumSubTasks": 1
}
}
}
```

View File

@ -69,6 +69,7 @@ To use this Apache Druid (incubating) extension, make sure to [include](../../op
```
## Configuration
In most cases, the configuration options map directly to the [postgres jdbc connection options](https://jdbc.postgresql.org/documentation/head/connect.html).
|Property|Description|Default|Required|
@ -85,3 +86,69 @@ In most cases, the configuration options map directly to the [postgres jdbc conn
| `druid.metadata.postgres.ssl.sslPasswordCallback` | The classname of the SSL password provider. | none | no |
| `druid.metadata.postgres.dbTableSchema` | druid meta table schema | `public` | no |
### PostgreSQL Firehose
The PostgreSQL extension provides an implementation of an [SqlFirehose](../../ingestion/firehose.html#SqlFirehose) which can be used to ingest data into Druid from a PostgreSQL database.
```json
{
"type": "index",
"spec": {
"dataSchema": {
"dataSource": "some_datasource",
"parser": {
"parseSpec": {
"format": "timeAndDims",
"dimensionsSpec": {
"dimensionExclusions": [],
"dimensions": [
"dim1",
"dim2",
"dim3"
]
},
"timestampSpec": {
"format": "auto",
"column": "ts"
}
}
},
"metricsSpec": [],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": {
"type": "none"
},
"rollup": false,
"intervals": null
},
"transformSpec": {
"filter": null,
"transforms": []
}
},
"ioConfig": {
"type": "index",
"firehose": {
"type": "sql",
"database": {
"type": "postgresql",
"connectorConfig": {
"connectURI": "jdbc:postgresql://some-rds-host.us-west-1.rds.amazonaws.com:5432/druid",
"user": "admin",
"password": "secret"
}
},
"sqls": [
"SELECT * FROM some_table"
]
}
},
"tuningconfig": {
"type": "index",
"maxNumSubTasks": 1
}
}
}
```

View File

@ -40,8 +40,7 @@ For additional Firehoses, please see our [extensions list](../development/extens
### LocalFirehose
This Firehose can be used to read the data from files on local disk.
It can be used for POCs to ingest data on disk.
This Firehose can be used to read data from files on local disk. It is mainly intended for proof-of-concept testing, and works with `string` typed parsers.
This Firehose is _splittable_ and can be used by [native parallel index tasks](./native_tasks.html#parallel-index-task).
Since each split represents a file in this Firehose, each worker task of `index_parallel` will read a file.
A sample local Firehose spec is shown below:
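A minimal sketch of such a spec, with illustrative `baseDir` and `filter` values, might look like:

```json
{
  "type": "local",
  "filter": "*.csv",
  "baseDir": "/data/directory"
}
```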
@ -62,7 +61,7 @@ A sample local Firehose spec is shown below:
### HttpFirehose
This Firehose can be used to read the data from remote sites via HTTP.
This Firehose can be used to read data from remote sites via HTTP, and works with `string` typed parsers.
This Firehose is _splittable_ and can be used by [native parallel index tasks](./native_tasks.html#parallel-index-task).
Since each split represents a file in this Firehose, each worker task of `index_parallel` will read a file.
A sample HTTP Firehose spec is shown below:
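A minimal sketch of such a spec, with placeholder URIs, might look like:

```json
{
  "type": "http",
  "uris": ["http://example.com/uri1", "http://example2.com/uri2"]
}
```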
@ -107,7 +106,7 @@ You can also use the other existing Druid PasswordProviders. Here is an example
}
```
The below configurations can be optionally used for tuning the Firehose performance.
The configurations below can optionally be used to tune Firehose performance.
|property|description|default|
|--------|-----------|-------|
@ -119,10 +118,10 @@ The below configurations can be optionally used for tuning the Firehose performa
### IngestSegmentFirehose
This Firehose can be used to read the data from existing druid segments.
It can be used to ingest existing druid segments using a new schema and change the name, dimensions, metrics, rollup, etc. of the segment.
This Firehose can be used to read data from existing Druid segments, potentially using a new schema and changing the name, dimensions, metrics, rollup, etc. of the segment.
This Firehose is _splittable_ and can be used by [native parallel index tasks](./native_tasks.html#parallel-index-task).
A sample ingest Firehose spec is shown below:
This Firehose will accept any type of parser, but will only use the list of dimensions and the timestamp specification.
A sample ingest Firehose spec is shown below:
```json
{
@ -144,12 +143,16 @@ A sample ingest Firehose spec is shown below:
### SqlFirehose
This Firehose can be used to ingest events residing in RDBMS. The database connection information is provided as part of the ingestion spec. For each query, the results are fetched locally and indexed. If there are multiple queries from which data needs to be indexed, queries are prefetched in the background upto `maxFetchCapacityBytes` bytes.
This Firehose can be used to ingest events residing in an RDBMS. The database connection information is provided as part of the ingestion spec.
For each query, the results are fetched locally and indexed.
If there are multiple queries from which data needs to be indexed, queries are prefetched in the background, up to `maxFetchCapacityBytes` bytes.
This Firehose will accept any type of parser, but will only use the list of dimensions and the timestamp specification. See the extension documentation for more detailed ingestion examples.
Requires one of the following extensions:
* [MySQL Metadata Store](../development/extensions-core/mysql.html).
* [PostgreSQL Metadata Store](../development/extensions-core/postgresql.html).
```json
{
"type": "sql",
@ -176,17 +179,18 @@ Requires one of the following extensions:
|foldCase|Toggle case folding of database column names. This may be enabled in cases where the database returns case insensitive column names in query results.|false|No|
|sqls|List of SQL queries where each SQL query would retrieve the data to be indexed.||Yes|
### Database
#### Database
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|The type of database to query. Valid values are `mysql` and `postgresql`.||Yes|
|connectorConfig|Specify the database connection properties via `connectURI`, `user` and `password`||Yes|
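Putting the firehose and database properties together, a minimal `sql` firehose object might look like the sketch below (the connector details mirror the MySQL example in the extension docs; host and credentials are placeholders):

```json
{
  "type": "sql",
  "database": {
    "type": "mysql",
    "connectorConfig": {
      "connectURI": "jdbc:mysql://some-host:3306/druid",
      "user": "admin",
      "password": "secret"
    }
  },
  "sqls": ["SELECT * FROM some_table"]
}
```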
### InlineFirehose
This Firehose can be used to read the data inlined in its own spec.
It can be used for demos or for quickly testing out parsing and schema.
It can be used for demos or for quickly testing out parsing and schema, and works with `string` typed parsers.
A sample inline Firehose spec is shown below:
```json
@ -204,7 +208,6 @@ A sample inline Firehose spec is shown below:
### CombiningFirehose
This Firehose can be used to combine and merge data from a list of different Firehoses.
This can be used to merge data from more than one Firehose.
```json
{
@ -221,11 +224,12 @@ This can be used to merge data from more than one Firehose.
### Streaming Firehoses
The EventReceiverFirehose is used in tasks automatically generated by [Tranquility stream push](../ingestion/stream-push.html). These Firehoses are not suitable for batch ingestion.
The EventReceiverFirehose is used in tasks automatically generated by
[Tranquility stream push](../ingestion/stream-push.html). These Firehoses are not suitable for batch ingestion.
#### EventReceiverFirehose
This Firehose can be used to ingest events using an HTTP endpoint.
This Firehose can be used to ingest events using an HTTP endpoint, and works with `string` typed parsers.
```json
{

View File

@ -122,7 +122,7 @@ An example dataSchema is shown below:
## Parser
If `type` is not included, the parser defaults to `string`. For additional data formats, please see our [extensions list](../development/extensions.html).
The default parser type is `string`, though a handful of extensions provide additional parser types. `string` typed parsers operate on text-based inputs that can be split into individual records by newlines. For additional data formats, please see our [extensions list](../development/extensions.html).
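For example, a minimal `string` parser with a JSON parseSpec might look like the following sketch (the column and dimension names are illustrative):

```json
{
  "type": "string",
  "parseSpec": {
    "format": "json",
    "timestampSpec": {
      "column": "timestamp",
      "format": "auto"
    },
    "dimensionsSpec": {
      "dimensions": ["dim1", "dim2"]
    }
  }
}
```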
### String Parser

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.apache.druid.data.input.impl.prefetch;
package org.apache.druid.segment.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
@ -29,7 +29,12 @@ import org.apache.commons.io.LineIterator;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.SqlFirehose;
import org.apache.druid.data.input.impl.prefetch.CacheManager;
import org.apache.druid.data.input.impl.prefetch.Fetcher;
import org.apache.druid.data.input.impl.prefetch.JsonIterator;
import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
import org.apache.druid.data.input.impl.prefetch.OpenedObject;
import org.apache.druid.data.input.impl.prefetch.PrefetchConfig;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;

View File

@ -17,7 +17,13 @@
* under the License.
*/
package org.apache.druid.data.input.impl.prefetch;
package org.apache.druid.segment.realtime.firehose;
import org.apache.druid.data.input.impl.prefetch.CacheManager;
import org.apache.druid.data.input.impl.prefetch.Fetcher;
import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
import org.apache.druid.data.input.impl.prefetch.OpenedObject;
import org.apache.druid.data.input.impl.prefetch.PrefetchConfig;
import javax.annotation.Nullable;
import java.io.File;
@ -31,7 +37,6 @@ import java.util.concurrent.ExecutorService;
* See the javadoc of {@link PrefetchSqlFirehoseFactory} for more details.
*/
public class SqlFetcher<T> extends Fetcher<T>
{
private static final String FETCH_FILE_PREFIX = "sqlfetch-";

View File

@ -17,13 +17,16 @@
* under the License.
*/
package org.apache.druid.data.input.impl;
package org.apache.druid.segment.realtime.firehose;
import com.google.common.collect.Iterators;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.prefetch.JsonIterator;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.segment.transform.Transformer;
import org.apache.druid.utils.Runnables;
import javax.annotation.Nullable;
@ -35,18 +38,22 @@ import java.util.Map;
public class SqlFirehose implements Firehose
{
private final Iterator<JsonIterator<Map<String, Object>>> resultIterator;
private final InputRowParser parser;
private final MapInputRowParser parser;
private final Closeable closer;
@Nullable
private JsonIterator<Map<String, Object>> lineIterator = null;
private final Transformer transformer;
public SqlFirehose(
Iterator lineIterators,
InputRowParser<Map<String, Object>> parser,
Iterator<JsonIterator<Map<String, Object>>> lineIterators,
InputRowParser<?> parser,
Closeable closer
)
{
this.resultIterator = lineIterators;
this.parser = parser;
this.parser = new MapInputRowParser(parser.getParseSpec());
// transformer is created from the original decorated parser (which should always be decorated)
this.transformer = TransformSpec.fromInputRowParser(parser).toTransformer();
this.closer = closer;
}
@ -64,18 +71,18 @@ public class SqlFirehose implements Firehose
@Override
public InputRow nextRow()
{
Map<String, Object> mapToParse = lineIterator.next();
return (InputRow) Iterators.getOnlyElement(parser.parseBatch(mapToParse).iterator());
assert lineIterator != null;
final Map<String, Object> mapToParse = lineIterator.next();
return transformer.transform(Iterators.getOnlyElement(parser.parseBatch(mapToParse).iterator()));
}
private JsonIterator getNextLineIterator()
private JsonIterator<Map<String, Object>> getNextLineIterator()
{
if (lineIterator != null) {
lineIterator = null;
}
final JsonIterator iterator = resultIterator.next();
return iterator;
return resultIterator.next();
}
@Override
@ -87,11 +94,9 @@ public class SqlFirehose implements Firehose
@Override
public void close() throws IOException
{
Closer firehoseCloser = Closer.create();
if (lineIterator != null) {
firehoseCloser.register(lineIterator);
lineIterator.close();
}
firehoseCloser.register(closer);
firehoseCloser.close();
closer.close();
}
}

View File

@ -25,7 +25,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.impl.prefetch.PrefetchSqlFirehoseFactory;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;

View File

@ -32,6 +32,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.lang.reflect.Modifier;
import java.util.Collection;
import java.util.Set;
import java.util.function.Predicate;
@ -39,7 +40,8 @@ import java.util.stream.Collectors;
public class FirehoseModuleTest
{
private static final Predicate<Class> IS_FIREHOSE_FACTORY = FirehoseFactory.class::isAssignableFrom;
private static final Predicate<Class> IS_FIREHOSE_FACTORY =
c -> FirehoseFactory.class.isAssignableFrom(c) && !Modifier.isAbstract(c.getModifiers());
@Test
public void testAllFirehoseFactorySubtypesRegistered() throws IOException

View File

@ -26,6 +26,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.Row;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
@ -34,6 +35,7 @@ import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.metadata.SQLFirehoseDatabaseConnector;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.transform.TransformSpec;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@ -41,13 +43,13 @@ import org.junit.Rule;
import org.junit.Test;
import org.skife.jdbi.v2.Batch;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
public class SqlFirehoseFactoryTest
@ -67,14 +69,16 @@ public class SqlFirehoseFactoryTest
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
private final ObjectMapper mapper = TestHelper.makeSmileMapper();
private final MapInputRowParser parser = new MapInputRowParser(
new TimeAndDimsParseSpec(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList("timestamp", "a", "b")),
new ArrayList<>(),
new ArrayList<>()
)
private final InputRowParser parser = TransformSpec.NONE.decorate(
new MapInputRowParser(
new TimeAndDimsParseSpec(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList("timestamp", "a", "b")),
new ArrayList<>(),
new ArrayList<>()
)
)
)
);
private TestDerbyConnector derbyConnector;
@ -100,18 +104,9 @@ public class SqlFirehoseFactoryTest
private void assertResult(List<Row> rows, List<String> sqls)
{
Assert.assertEquals(10 * sqls.size(), rows.size());
rows.sort((r1, r2) -> {
int c = r1.getTimestamp().compareTo(r2.getTimestamp());
if (c != 0) {
return c;
}
c = Integer.valueOf(r1.getDimension("a").get(0)).compareTo(Integer.valueOf(r2.getDimension("a").get(0)));
if (c != 0) {
return c;
}
return Integer.valueOf(r1.getDimension("b").get(0)).compareTo(Integer.valueOf(r2.getDimension("b").get(0)));
});
rows.sort(Comparator.comparing(Row::getTimestamp)
.thenComparingInt(r -> Integer.valueOf(r.getDimension("a").get(0)))
.thenComparingInt(r -> Integer.valueOf(r.getDimension("b").get(0))));
int rowCount = 0;
for (int i = 0; i < 10; i++) {
for (int j = 0; j < sqls.size(); j++) {
@ -147,15 +142,10 @@ public class SqlFirehoseFactoryTest
private void dropTable(final String tableName)
{
derbyConnector.getDBI().withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle)
{
handle.createStatement(StringUtils.format("DROP TABLE %s", tableName))
.execute();
return null;
}
(HandleCallback<Void>) handle -> {
handle.createStatement(StringUtils.format("DROP TABLE %s", tableName))
.execute();
return null;
}
);
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.apache.druid.data.input.impl;
package org.apache.druid.segment.realtime.firehose;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.type.TypeReference;
@ -27,7 +27,17 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.io.FileUtils;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.data.input.impl.prefetch.JsonIterator;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.segment.transform.TransformingStringInputRowParser;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -38,6 +48,7 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -45,12 +56,15 @@ import java.util.stream.Collectors;
public class SqlFirehoseTest
{
private static final TypeReference<Map<String, Object>> TYPE_REF = new TypeReference<Map<String, Object>>()
{
};
private static File TEST_DIR;
private List<Map<String, Object>> inputs;
private List<FileInputStream> fileList;
private MapInputRowParser parser = null;
private InputRowParser parser;
private ObjectMapper objectMapper;
private static File TEST_DIR;
@Before
public void setup() throws IOException
@ -79,10 +93,12 @@ public class SqlFirehoseTest
}
this.fileList = testFile;
parser = new MapInputRowParser(
new TimeAndDimsParseSpec(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("x")), null, null)
parser = TransformSpec.NONE.decorate(
new MapInputRowParser(
new TimeAndDimsParseSpec(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("x")), null, null)
)
)
);
@ -97,11 +113,10 @@ public class SqlFirehoseTest
for (Map<String, Object> map : inputs) {
expectedResults.add(map.get("x"));
}
final List<JsonIterator> lineIterators = fileList.stream()
.map(s -> new JsonIterator(new TypeReference<Map<String, Object>>()
{
}, s, closeable, objectMapper))
.collect(Collectors.toList());
final List<JsonIterator<Map<String, Object>>> lineIterators =
fileList.stream()
.map(s -> new JsonIterator<Map<String, Object>>(TYPE_REF, s, closeable, objectMapper))
.collect(Collectors.toList());
try (final SqlFirehose firehose = new SqlFirehose(lineIterators.iterator(), parser, closeable)) {
final List<Object> results = new ArrayList<>();
@ -119,6 +134,89 @@ public class SqlFirehoseTest
}
}
@Test
public void testFirehoseStringParser() throws Exception
{
final TestCloseable closeable = new TestCloseable();
List<Object> expectedResults = new ArrayList<>();
for (Map<String, Object> map : inputs) {
expectedResults.add(map.get("x"));
}
final List<JsonIterator<Map<String, Object>>> lineIterators =
fileList.stream()
.map(s -> new JsonIterator<Map<String, Object>>(TYPE_REF, s, closeable, objectMapper))
.collect(Collectors.toList());
final InputRowParser stringParser = TransformSpec.NONE.decorate(
new StringInputRowParser(
new TimeAndDimsParseSpec(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("x")), null, null)
),
Charset.defaultCharset().name()
)
);
try (final SqlFirehose firehose = new SqlFirehose(lineIterators.iterator(), stringParser, closeable)) {
final List<Object> results = new ArrayList<>();
while (firehose.hasMore()) {
final InputRow inputRow = firehose.nextRow();
if (inputRow == null) {
results.add(null);
} else {
results.add(inputRow.getDimension("x").get(0));
}
}
Assert.assertEquals(expectedResults, results);
}
}
@Test
public void testFirehoseTransformingParser() throws Exception
{
final TestCloseable closeable = new TestCloseable();
List<Object> expectedResults = new ArrayList<>();
for (Map<String, Object> map : inputs) {
expectedResults.add(map.get("x") + "foo");
}
final List<JsonIterator<Map<String, Object>>> lineIterators =
fileList.stream()
.map(s -> new JsonIterator<Map<String, Object>>(TYPE_REF, s, closeable, objectMapper))
.collect(Collectors.toList());
final InputRowParser stringParser = new TransformingStringInputRowParser(
new TimeAndDimsParseSpec(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("x")), null, null)
),
Charset.defaultCharset().name(),
new TransformSpec(
null,
ImmutableList.of(
new ExpressionTransform("xfoo", "concat(x,'foo')", ExprMacroTable.nil())
)
)
);
try (final SqlFirehose firehose = new SqlFirehose(lineIterators.iterator(), stringParser, closeable)) {
final List<Object> results = new ArrayList<>();
while (firehose.hasMore()) {
final InputRow inputRow = firehose.nextRow();
if (inputRow == null) {
results.add(null);
} else {
results.add(inputRow.getDimension("xfoo").get(0));
}
}
Assert.assertEquals(expectedResults, results);
}
}
@Test
public void testClose() throws IOException
{
@ -131,9 +229,12 @@ public class SqlFirehoseTest
jg.close();
}
final JsonIterator<Map<String, Object>> jsonIterator = new JsonIterator(new TypeReference<Map<String, Object>>()
{
}, new FileInputStream(file), closeable, objectMapper);
final JsonIterator<Map<String, Object>> jsonIterator = new JsonIterator<>(
TYPE_REF,
new FileInputStream(file),
closeable,
objectMapper
);
final SqlFirehose firehose = new SqlFirehose(
ImmutableList.of(jsonIterator).iterator(),