Local indexing from RDBMS (#5441)

* Local indexing from RDBMS

*  Fix content

* Remove pom changes

* Remove extraneous space

* Add tests and update documentation

* Fix comments

* Fix docs

*  Fix build related issue

*  Handle invalid strings

* Make target database independent of metadata storage

* Add firehose connector

* Fix accessibility

* Add docs

* Remove unused def

* Remove lazy instantiation of jsoniterator

* Move unused changes

* Move unused changes

* Fix build

* Make Sqlfirehose method private
This commit is contained in:
Atul Mohan 2018-05-21 22:33:01 -05:00 committed by Jihoon Son
parent c537ea56f6
commit 1b9611a60e
19 changed files with 1786 additions and 135 deletions

View File

@ -0,0 +1,96 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.impl;
import com.google.common.collect.Iterators;
import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.prefetch.JsonIterator;
import io.druid.java.util.common.io.Closer;
import io.druid.utils.Runnables;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
public class SqlFirehose implements Firehose
{
private final Iterator<JsonIterator<Map<String, Object>>> resultIterator;
private final InputRowParser parser;
private final Closeable closer;
private JsonIterator<Map<String, Object>> lineIterator = null;
public SqlFirehose(
Iterator lineIterators,
InputRowParser<Map<String, Object>> parser,
Closeable closer
)
{
this.resultIterator = lineIterators;
this.parser = parser;
this.closer = closer;
}
@Override
public boolean hasMore()
{
while ((lineIterator == null || !lineIterator.hasNext()) && resultIterator.hasNext()) {
lineIterator = getNextLineIterator();
}
return lineIterator != null && lineIterator.hasNext();
}
@Nullable
@Override
public InputRow nextRow()
{
Map<String, Object> mapToParse = lineIterator.next();
return (InputRow) Iterators.getOnlyElement(parser.parseBatch(mapToParse).iterator());
}
private JsonIterator getNextLineIterator()
{
if (lineIterator != null) {
lineIterator = null;
}
final JsonIterator iterator = resultIterator.next();
return iterator;
}
@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}
@Override
public void close() throws IOException
{
Closer firehoseCloser = Closer.create();
if (lineIterator != null) {
firehoseCloser.register(lineIterator);
}
firehoseCloser.register(closer);
firehoseCloser.close();
}
}

View File

@ -20,21 +20,14 @@
package io.druid.data.input.impl.prefetch; package io.druid.data.input.impl.prefetch;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import io.druid.java.util.common.ISE; import io.druid.java.util.common.ISE;
import io.druid.java.util.common.RetryUtils;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.logger.Logger; import io.druid.java.util.common.logger.Logger;
import org.apache.commons.io.IOUtils;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.Closeable; import java.io.Closeable;
import java.io.File; import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
@ -47,15 +40,13 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
/** /**
* A file fetcher used by {@link PrefetchableTextFilesFirehoseFactory}. * A file fetcher used by {@link PrefetchableTextFilesFirehoseFactory} and {@link PrefetchSqlFirehoseFactory}.
* See the javadoc of {@link PrefetchableTextFilesFirehoseFactory} for more details. * See the javadoc of {@link PrefetchableTextFilesFirehoseFactory} for more details.
*/ */
public class Fetcher<T> implements Iterator<OpenedObject<T>> public abstract class Fetcher<T> implements Iterator<OpenedObject<T>>
{ {
private static final Logger LOG = new Logger(Fetcher.class); private static final Logger LOG = new Logger(Fetcher.class);
private static final String FETCH_FILE_PREFIX = "fetch-"; private static final String FETCH_FILE_PREFIX = "fetch-";
private static final int BUFFER_SIZE = 1024 * 4;
private final CacheManager<T> cacheManager; private final CacheManager<T> cacheManager;
private final List<T> objects; private final List<T> objects;
private final ExecutorService fetchExecutor; private final ExecutorService fetchExecutor;
@ -63,34 +54,16 @@ public class Fetcher<T> implements Iterator<OpenedObject<T>>
@Nullable @Nullable
private final File temporaryDirectory; private final File temporaryDirectory;
// A roughly max size of total fetched objects, but the actual fetched size can be bigger. The reason is our current
// client implementations for cloud storages like s3 don't support range scan yet, so we must download the whole file
// at once. It's still possible for the size of cached/fetched data to not exceed these variables by estimating the
// after-fetch size, but it makes us consider the case when any files cannot be fetched due to their large size, which
// makes the implementation complicated.
private final long maxFetchCapacityBytes;
private final boolean prefetchEnabled; private final boolean prefetchEnabled;
private final long prefetchTriggerBytes;
// timeout for fetching an object from the remote site
private final long fetchTimeout;
// maximum retry for fetching an object from the remote site
private final int maxFetchRetry;
private final LinkedBlockingQueue<FetchedFile<T>> fetchedFiles = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue<FetchedFile<T>> fetchedFiles = new LinkedBlockingQueue<>();
// Number of bytes of current fetched files. // Number of bytes of current fetched files.
// This is updated when a file is successfully fetched, a fetched file is deleted, or a fetched file is // This is updated when a file is successfully fetched, a fetched file is deleted, or a fetched file is
// cached. // cached.
private final AtomicLong fetchedBytes = new AtomicLong(0); private final AtomicLong fetchedBytes = new AtomicLong(0);
private final ObjectOpenFunction<T> openObjectFunction;
private final Predicate<Throwable> retryCondition;
private final byte[] buffer;
private Future<Void> fetchFuture; private Future<Void> fetchFuture;
private PrefetchConfig prefetchConfig;
// nextFetchIndex indicates which object should be downloaded when fetch is triggered. // nextFetchIndex indicates which object should be downloaded when fetch is triggered.
// This variable is always read by the same thread regardless of prefetch is enabled or not. // This variable is always read by the same thread regardless of prefetch is enabled or not.
@ -103,49 +76,35 @@ public class Fetcher<T> implements Iterator<OpenedObject<T>>
List<T> objects, List<T> objects,
ExecutorService fetchExecutor, ExecutorService fetchExecutor,
@Nullable File temporaryDirectory, @Nullable File temporaryDirectory,
long maxFetchCapacityBytes, PrefetchConfig prefetchConfig
long prefetchTriggerBytes,
long fetchTimeout,
int maxFetchRetry,
ObjectOpenFunction<T> openObjectFunction,
Predicate<Throwable> retryCondition
) )
{ {
this.cacheManager = cacheManager; this.cacheManager = cacheManager;
this.objects = objects; this.objects = objects;
this.fetchExecutor = fetchExecutor; this.fetchExecutor = fetchExecutor;
this.temporaryDirectory = temporaryDirectory; this.temporaryDirectory = temporaryDirectory;
this.maxFetchCapacityBytes = maxFetchCapacityBytes; this.prefetchConfig = prefetchConfig;
this.prefetchTriggerBytes = prefetchTriggerBytes; this.prefetchEnabled = prefetchConfig.getMaxFetchCapacityBytes() > 0;
this.fetchTimeout = fetchTimeout;
this.maxFetchRetry = maxFetchRetry;
this.openObjectFunction = openObjectFunction;
this.retryCondition = retryCondition;
this.buffer = new byte[BUFFER_SIZE];
this.prefetchEnabled = maxFetchCapacityBytes > 0;
this.numRemainingObjects = objects.size(); this.numRemainingObjects = objects.size();
// (*) If cache is initialized, put all cached files to the queue. // (*) If cache is initialized, put all cached files to the queue.
this.fetchedFiles.addAll(cacheManager.getFiles()); this.fetchedFiles.addAll(cacheManager.getFiles());
this.nextFetchIndex = fetchedFiles.size(); this.nextFetchIndex = fetchedFiles.size();
if (cacheManager.isEnabled() || prefetchEnabled) { if (cacheManager.isEnabled() || prefetchEnabled) {
Preconditions.checkNotNull(temporaryDirectory, "temporaryDirectory"); Preconditions.checkNotNull(temporaryDirectory, "temporaryDirectory");
} }
if (prefetchEnabled) { if (prefetchEnabled) {
fetchIfNeeded(0L); fetchIfNeeded(0L);
} }
} }
/** /**
* Submit a fetch task if remainingBytes is smaller than {@link #prefetchTriggerBytes}. * Submit a fetch task if remainingBytes is smaller than prefetchTriggerBytes.
*/ */
private void fetchIfNeeded(long remainingBytes) private void fetchIfNeeded(long remainingBytes)
{ {
if ((fetchFuture == null || fetchFuture.isDone()) if ((fetchFuture == null || fetchFuture.isDone())
&& remainingBytes <= prefetchTriggerBytes) { && remainingBytes <= prefetchConfig.getPrefetchTriggerBytes()) {
fetchFuture = fetchExecutor.submit(() -> { fetchFuture = fetchExecutor.submit(() -> {
fetch(); fetch();
return null; return null;
@ -154,18 +113,19 @@ public class Fetcher<T> implements Iterator<OpenedObject<T>>
} }
/** /**
* Fetch objects to a local disk up to {@link PrefetchableTextFilesFirehoseFactory#maxFetchCapacityBytes}. * Fetch objects to a local disk up to {@link PrefetchConfig#maxFetchCapacityBytes}.
* This method is not thread safe and must be called by a single thread. Note that even * This method is not thread safe and must be called by a single thread. Note that even
* {@link PrefetchableTextFilesFirehoseFactory#maxFetchCapacityBytes} is 0, at least 1 file is always fetched. * {@link PrefetchConfig#maxFetchCapacityBytes} is 0, at least 1 file is always fetched.
* This is for simplifying design, and should be improved when our client implementations for cloud storages * This is for simplifying design, and should be improved when our client implementations for cloud storages
* like S3 support range scan. * like S3 support range scan.
* * <p>
* This method is called by {@link #fetchExecutor} if prefetch is enabled. Otherwise, it is called by the same * This method is called by {@link #fetchExecutor} if prefetch is enabled. Otherwise, it is called by the same
* thread. * thread.
*/ */
private void fetch() throws Exception private void fetch() throws Exception
{ {
for (; nextFetchIndex < objects.size() && fetchedBytes.get() <= maxFetchCapacityBytes; nextFetchIndex++) { for (; nextFetchIndex < objects.size()
&& fetchedBytes.get() <= prefetchConfig.getMaxFetchCapacityBytes(); nextFetchIndex++) {
final T object = objects.get(nextFetchIndex); final T object = objects.get(nextFetchIndex);
LOG.info("Fetching [%d]th object[%s], fetchedBytes[%d]", nextFetchIndex, object, fetchedBytes.get()); LOG.info("Fetching [%d]th object[%s], fetchedBytes[%d]", nextFetchIndex, object, fetchedBytes.get());
final File outFile = File.createTempFile(FETCH_FILE_PREFIX, null, temporaryDirectory); final File outFile = File.createTempFile(FETCH_FILE_PREFIX, null, temporaryDirectory);
@ -175,34 +135,20 @@ public class Fetcher<T> implements Iterator<OpenedObject<T>>
} }
/** /**
* Downloads an object. It retries downloading {@link PrefetchableTextFilesFirehoseFactory#maxFetchRetry} * Downloads an object into a file. The download process could be retried depending on the object source.
* times and throws an exception.
* *
* @param object an object to be downloaded * @param object an object to be downloaded
* @param outFile a file which the object data is stored * @param outFile a file which the object data is stored
* *
* @return number of downloaded bytes * @return number of downloaded bytes
*/ */
private long download(T object, File outFile) throws IOException protected abstract long download(T object, File outFile) throws IOException;
{
try { /**
return RetryUtils.retry( * Generates an instance of {@link OpenedObject} for the given object.
() -> { */
try (final InputStream is = openObjectFunction.open(object); protected abstract OpenedObject<T> generateOpenObject(T object) throws IOException;
final OutputStream os = new FileOutputStream(outFile)) {
return IOUtils.copyLarge(is, os, buffer);
}
},
retryCondition,
outFile::delete,
maxFetchRetry + 1,
StringUtils.format("Failed to download object[%s]", object)
);
}
catch (Exception e) {
throw new IOException(e);
}
}
@Override @Override
public boolean hasNext() public boolean hasNext()
@ -235,7 +181,7 @@ public class Fetcher<T> implements Iterator<OpenedObject<T>>
{ {
try { try {
if (wait) { if (wait) {
fetchFuture.get(fetchTimeout, TimeUnit.MILLISECONDS); fetchFuture.get(prefetchConfig.getFetchTimeout(), TimeUnit.MILLISECONDS);
fetchFuture = null; fetchFuture = null;
} else if (fetchFuture != null && fetchFuture.isDone()) { } else if (fetchFuture != null && fetchFuture.isDone()) {
fetchFuture.get(); fetchFuture.get();
@ -246,7 +192,7 @@ public class Fetcher<T> implements Iterator<OpenedObject<T>>
throw new RuntimeException(e); throw new RuntimeException(e);
} }
catch (TimeoutException e) { catch (TimeoutException e) {
throw new ISE(e, "Failed to fetch, but cannot check the reason in [%d] ms", fetchTimeout); throw new ISE(e, "Failed to fetch, but cannot check the reason in [%d] ms", prefetchConfig.getFetchTimeout());
} }
} }
@ -261,7 +207,7 @@ public class Fetcher<T> implements Iterator<OpenedObject<T>>
// Otherwise, wait for fetching // Otherwise, wait for fetching
try { try {
fetchIfNeeded(fetchedBytes.get()); fetchIfNeeded(fetchedBytes.get());
fetchedFile = fetchedFiles.poll(fetchTimeout, TimeUnit.MILLISECONDS); fetchedFile = fetchedFiles.poll(prefetchConfig.getFetchTimeout(), TimeUnit.MILLISECONDS);
if (fetchedFile == null) { if (fetchedFile == null) {
// Check the latest fetch is failed // Check the latest fetch is failed
checkFetchException(true); checkFetchException(true);
@ -304,11 +250,7 @@ public class Fetcher<T> implements Iterator<OpenedObject<T>>
final T object = objects.get(nextFetchIndex); final T object = objects.get(nextFetchIndex);
LOG.info("Reading [%d]th object[%s]", nextFetchIndex, object); LOG.info("Reading [%d]th object[%s]", nextFetchIndex, object);
nextFetchIndex++; nextFetchIndex++;
return new OpenedObject<>( return generateOpenObject(object);
object,
new RetryingInputStream<>(object, openObjectFunction, retryCondition, maxFetchRetry),
getNoopCloser()
);
} }
} }
@ -325,11 +267,6 @@ public class Fetcher<T> implements Iterator<OpenedObject<T>>
} }
} }
private static Closeable getNoopCloser()
{
return () -> {};
}
private static Closeable getFileCloser( private static Closeable getFileCloser(
final File file, final File file,
final AtomicLong fetchedBytes final AtomicLong fetchedBytes

View File

@ -0,0 +1,134 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.impl.prefetch;
import com.google.common.base.Predicate;
import io.druid.java.util.common.RetryUtils;
import io.druid.java.util.common.StringUtils;
import org.apache.commons.io.IOUtils;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.concurrent.ExecutorService;
/**
* A file fetcher used by {@link PrefetchableTextFilesFirehoseFactory}.
* See the javadoc of {@link PrefetchableTextFilesFirehoseFactory} for more details.
*/
public class FileFetcher<T> extends Fetcher<T>
{
private static final int BUFFER_SIZE = 1024 * 4;
private final ObjectOpenFunction<T> openObjectFunction;
private final Predicate<Throwable> retryCondition;
private final byte[] buffer;
// maximum retry for fetching an object from the remote site
private final int maxFetchRetry;
public int getMaxFetchRetry()
{
return maxFetchRetry;
}
FileFetcher(
CacheManager<T> cacheManager,
List<T> objects,
ExecutorService fetchExecutor,
@Nullable File temporaryDirectory,
PrefetchConfig prefetchConfig,
ObjectOpenFunction<T> openObjectFunction,
Predicate<Throwable> retryCondition,
Integer maxFetchRetries
)
{
super(
cacheManager,
objects,
fetchExecutor,
temporaryDirectory,
prefetchConfig
);
this.openObjectFunction = openObjectFunction;
this.retryCondition = retryCondition;
this.buffer = new byte[BUFFER_SIZE];
this.maxFetchRetry = maxFetchRetries;
}
/**
* Downloads an object. It retries downloading {@link #maxFetchRetry}
* times and throws an exception.
*
* @param object an object to be downloaded
* @param outFile a file which the object data is stored
*
* @return number of downloaded bytes
*/
@Override
protected long download(T object, File outFile) throws IOException
{
try {
return RetryUtils.retry(
() -> {
try (final InputStream is = openObjectFunction.open(object);
final OutputStream os = new FileOutputStream(outFile)) {
return IOUtils.copyLarge(is, os, buffer);
}
},
retryCondition,
outFile::delete,
maxFetchRetry + 1,
StringUtils.format("Failed to download object[%s]", object)
);
}
catch (Exception e) {
throw new IOException(e);
}
}
/**
* Generates an instance of {@link OpenedObject} for which the underlying stream may be re-opened and retried
* based on the exception and retry condition.
*/
@Override
protected OpenedObject<T> generateOpenObject(T object) throws IOException
{
return new OpenedObject<>(
object,
new RetryingInputStream<>(object, openObjectFunction, retryCondition, getMaxFetchRetry()),
getNoopCloser()
);
}
private static Closeable getNoopCloser()
{
return () -> {
};
}
}

View File

@ -0,0 +1,140 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.impl.prefetch;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.ObjectCodec;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.io.Closer;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.NoSuchElementException;
/**
* An iterator over an array of JSON objects. Uses {@link ObjectCodec} to deserialize regular Java objects.
*
* @param <T> the type of object returned by this iterator
*/
public class JsonIterator<T> implements Iterator<T>, Closeable
{
private JsonParser jp;
private ObjectCodec objectCodec;
private final TypeReference typeRef;
private final InputStream inputStream;
private final Closeable resourceCloser;
private final ObjectMapper objectMapper;
/**
* @param typeRef the object type that the JSON object should be deserialized into
* @param inputStream stream containing an array of JSON objects
* @param resourceCloser a {@code Closeable} implementation to release resources that the object is holding
* @param objectMapper object mapper, used for deserialization
*/
public JsonIterator(
TypeReference typeRef,
InputStream inputStream,
Closeable resourceCloser,
ObjectMapper objectMapper
)
{
this.typeRef = typeRef;
this.inputStream = inputStream;
this.resourceCloser = resourceCloser;
this.objectMapper = objectMapper;
init();
}
/**
* Returns {@code true} if there are more objects to be read.
*
* @return {@code true} if there are more objects to be read, else return {@code false}
*/
@Override
public boolean hasNext()
{
if (jp.isClosed()) {
return false;
}
if (jp.getCurrentToken() == JsonToken.END_ARRAY) {
CloseQuietly.close(jp);
return false;
}
return true;
}
/**
* Retrieves the next deserialized object from the stream of JSON objects.
*
* @return the next deserialized object from the stream of JSON ovbjects
*/
@Override
public T next()
{
if (!hasNext()) {
throw new NoSuchElementException("No more objects to read!");
}
try {
final T retVal = objectCodec.readValue(jp, typeRef);
jp.nextToken();
return retVal;
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
private void init()
{
try {
if (inputStream == null) {
throw new UnsupportedOperationException();
} else {
jp = objectMapper.getFactory().createParser(inputStream);
}
final JsonToken nextToken = jp.nextToken();
if (nextToken != JsonToken.START_ARRAY) {
throw new IAE("First token should be START_ARRAY", jp.getCurrentToken());
} else {
jp.nextToken();
objectCodec = jp.getCodec();
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void close() throws IOException
{
Closer closer = Closer.create();
if (jp != null) {
closer.register(jp);
}
closer.register(resourceCloser);
closer.close();
}
}

View File

@ -19,6 +19,7 @@
package io.druid.data.input.impl.prefetch; package io.druid.data.input.impl.prefetch;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -26,5 +27,14 @@ interface ObjectOpenFunction<T>
{ {
InputStream open(T object) throws IOException; InputStream open(T object) throws IOException;
InputStream open(T object, long start) throws IOException; default InputStream open(T object, long start) throws IOException
{
return open(object);
}
default InputStream open(T object, File outFile) throws IOException
{
return open(object);
}
} }

View File

@ -0,0 +1,87 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.impl.prefetch;
import java.util.concurrent.TimeUnit;
/**
* Holds the essential configuration required by {@link Fetcher} for prefetching purposes.
*/
public class PrefetchConfig
{
public static final long DEFAULT_MAX_CACHE_CAPACITY_BYTES = 1024 * 1024 * 1024; // 1GB
public static final long DEFAULT_MAX_FETCH_CAPACITY_BYTES = 1024 * 1024 * 1024; // 1GB
public static final long DEFAULT_FETCH_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(60);
public static final int DEFAULT_MAX_FETCH_RETRY = 3;
// A roughly max size of total fetched objects, but the actual fetched size can be bigger. The reason is our current
// client implementations for cloud storages like s3 don't support range scan yet, so we must download the whole file
// at once. It's still possible for the size of cached/fetched data to not exceed these variables by estimating the
// after-fetch size, but it makes us consider the case when any files cannot be fetched due to their large size, which
// makes the implementation complicated.
private final long maxFetchCapacityBytes;
private final long maxCacheCapacityBytes;
private final long prefetchTriggerBytes;
// timeout for fetching an object from the remote site
private final long fetchTimeout;
public PrefetchConfig(
Long maxCacheCapacityBytes,
Long maxFetchCapacityBytes,
Long prefetchTriggerBytes,
Long fetchTimeout
)
{
this.maxCacheCapacityBytes = maxCacheCapacityBytes == null
? DEFAULT_MAX_CACHE_CAPACITY_BYTES
: maxCacheCapacityBytes;
this.maxFetchCapacityBytes = maxFetchCapacityBytes == null
? DEFAULT_MAX_FETCH_CAPACITY_BYTES
: maxFetchCapacityBytes;
this.prefetchTriggerBytes = prefetchTriggerBytes == null
? this.maxFetchCapacityBytes / 2
: prefetchTriggerBytes;
this.fetchTimeout = fetchTimeout == null ? DEFAULT_FETCH_TIMEOUT_MS : fetchTimeout;
}
public long getMaxCacheCapacityBytes()
{
return maxCacheCapacityBytes;
}
public long getMaxFetchCapacityBytes()
{
return maxFetchCapacityBytes;
}
public long getPrefetchTriggerBytes()
{
return prefetchTriggerBytes;
}
public long getFetchTimeout()
{
return fetchTimeout;
}
}

View File

@ -0,0 +1,253 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.impl.prefetch;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.SqlFirehose;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.logger.Logger;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.LineIterator;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
* PrefetchSqlFirehoseFactory is an abstract firehose factory for reading prefetched sql resultset data. Regardless
* of whether prefetching is enabled or not, for each sql object the entire result set is fetched into a file in the local disk.
* This class defines prefetching as caching the resultsets into local disk in case multiple sql queries are present.
* When prefetching is enabled, the following functionalities are provided:
* <p/>
* <p>
* - Caching: for the first call of {@link #connect(InputRowParser, File)}, it caches objects in a local disk
* up to maxCacheCapacityBytes. These caches are NOT deleted until the process terminates, and thus can be used for
* future reads.
* <br/>
* - Fetching: when it reads all cached data, it fetches remaining objects into a local disk and reads data from
* them. For the performance reason, prefetch technique is used, that is, when the size of remaining fetched data is
* smaller than {@link PrefetchConfig#prefetchTriggerBytes}, a background prefetch thread automatically starts to fetch remaining
* objects.
* <br/>
* <p/>
* <p>
* This implementation aims to avoid maintaining a persistent connection to the database by prefetching the resultset into disk.
* <br/>
* Prefetching can be turned on/off by setting maxFetchCapacityBytes. Depending on prefetching is enabled or
* disabled, the behavior of the firehose is different like below.
* <p/>
* <p>
* 1. If prefetch is enabled this firehose can fetch input objects in background.
* <br/>
* 2. When next() is called, it first checks that there are already fetched files in local storage.
* <br/>
* 2.1 If exists, it simply chooses a fetched file and returns a {@link LineIterator} reading that file.
* <br/>
* 2.2 If there is no fetched files in local storage but some objects are still remained to be read, the firehose
* fetches one of input objects in background immediately. Finally, the firehose returns an iterator of {@link JsonIterator}
* for deserializing the saved resultset.
* <br/>
* 3. If prefetch is disabled, the firehose saves the resultset to file and returns an iterator of {@link JsonIterator}
* which directly reads the stream opened by {@link #openObjectStream}. If there is an IOException, it will throw it
* and the read will fail.
*/
public abstract class PrefetchSqlFirehoseFactory<T>
implements FirehoseFactory<InputRowParser<Map<String, Object>>>
{
private static final Logger LOG = new Logger(PrefetchSqlFirehoseFactory.class);
private final PrefetchConfig prefetchConfig;
private final CacheManager<T> cacheManager;
private List<T> objects;
private ObjectMapper objectMapper;
public PrefetchSqlFirehoseFactory(
Long maxCacheCapacityBytes,
Long maxFetchCapacityBytes,
Long prefetchTriggerBytes,
Long fetchTimeout,
ObjectMapper objectMapper
)
{
this.prefetchConfig = new PrefetchConfig(
maxCacheCapacityBytes,
maxFetchCapacityBytes,
prefetchTriggerBytes,
fetchTimeout
);
this.cacheManager = new CacheManager<>(
prefetchConfig.getMaxCacheCapacityBytes()
);
this.objectMapper = objectMapper;
}
@JsonProperty
public long getMaxCacheCapacityBytes()
{
return cacheManager.getMaxCacheCapacityBytes();
}
@JsonProperty
public long getMaxFetchCapacityBytes()
{
return prefetchConfig.getMaxFetchCapacityBytes();
}
@JsonProperty
public long getPrefetchTriggerBytes()
{
return prefetchConfig.getPrefetchTriggerBytes();
}
@JsonProperty
public long getFetchTimeout()
{
return prefetchConfig.getFetchTimeout();
}
@VisibleForTesting
CacheManager<T> getCacheManager()
{
return cacheManager;
}
@Override
public Firehose connect(InputRowParser<Map<String, Object>> firehoseParser, @Nullable File temporaryDirectory)
throws IOException
{
if (objects == null) {
objects = ImmutableList.copyOf(Preconditions.checkNotNull(initObjects(), "objects"));
}
if (cacheManager.isEnabled() || prefetchConfig.getMaxFetchCapacityBytes() > 0) {
Preconditions.checkNotNull(temporaryDirectory, "temporaryDirectory");
Preconditions.checkArgument(
temporaryDirectory.exists(),
"temporaryDirectory[%s] does not exist",
temporaryDirectory
);
Preconditions.checkArgument(
temporaryDirectory.isDirectory(),
"temporaryDirectory[%s] is not a directory",
temporaryDirectory
);
}
LOG.info("Create a new firehose for [%d] queries", objects.size());
// fetchExecutor is responsible for background data fetching
final ExecutorService fetchExecutor = Execs.singleThreaded("firehose_fetch_%d");
final Fetcher<T> fetcher = new SqlFetcher<>(
cacheManager,
objects,
fetchExecutor,
temporaryDirectory,
prefetchConfig,
new ObjectOpenFunction<T>()
{
@Override
public InputStream open(T object, File outFile) throws IOException
{
return openObjectStream(object, outFile);
}
@Override
public InputStream open(T object) throws IOException
{
final File outFile = File.createTempFile("sqlresults_", null, temporaryDirectory);
return openObjectStream(object, outFile);
}
}
);
return new SqlFirehose(
new Iterator<JsonIterator<Map<String, Object>>>()
{
@Override
public boolean hasNext()
{
return fetcher.hasNext();
}
@Override
public JsonIterator<Map<String, Object>> next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}
try {
TypeReference<Map<String, Object>> type = new TypeReference<Map<String, Object>>()
{
};
final OpenedObject<T> openedObject = fetcher.next();
final InputStream stream = openedObject.getObjectStream();
return new JsonIterator<>(type, stream, openedObject.getResourceCloser(), objectMapper);
}
catch (Exception ioe) {
throw new RuntimeException(ioe);
}
}
},
firehoseParser,
() -> {
fetchExecutor.shutdownNow();
try {
Preconditions.checkState(fetchExecutor.awaitTermination(
prefetchConfig.getFetchTimeout(),
TimeUnit.MILLISECONDS
));
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ISE("Failed to shutdown fetch executor during close");
}
}
);
}
/**
* Open an input stream from the given object. The object is fetched into the file and an input
* stream to the file is provided.
*
* @param object an object to be read
* @param filename file to which the object is fetched into
*
* @return an input stream to the file
*/
protected abstract InputStream openObjectStream(T object, File filename) throws IOException;
protected abstract Collection<T> initObjects();
}

View File

@ -51,19 +51,19 @@ import java.util.concurrent.TimeUnit;
* PrefetchableTextFilesFirehoseFactory is an abstract firehose factory for reading text files. The firehose returned * PrefetchableTextFilesFirehoseFactory is an abstract firehose factory for reading text files. The firehose returned
* by this class provides three key functionalities. * by this class provides three key functionalities.
* <p/> * <p/>
* * <p>
* - Caching: for the first call of {@link #connect(StringInputRowParser, File)}, it caches objects in a local disk * - Caching: for the first call of {@link #connect(StringInputRowParser, File)}, it caches objects in a local disk
* up to maxCacheCapacityBytes. These caches are NOT deleted until the process terminates, and thus can be used for * up to maxCacheCapacityBytes. These caches are NOT deleted until the process terminates, and thus can be used for
* future reads. * future reads.
* <br/> * <br/>
* - Fetching: when it reads all cached data, it fetches remaining objects into a local disk and reads data from * - Fetching: when it reads all cached data, it fetches remaining objects into a local disk and reads data from
* them. For the performance reason, prefetch technique is used, that is, when the size of remaining fetched data is * them. For the performance reason, prefetch technique is used, that is, when the size of remaining fetched data is
* smaller than {@link #prefetchTriggerBytes}, a background prefetch thread automatically starts to fetch remaining * smaller than {@link PrefetchConfig#prefetchTriggerBytes}, a background prefetch thread automatically starts to fetch remaining
* objects. * objects.
* <br/> * <br/>
* - Retry: if an exception occurs while downloading an object, it retries again up to {@link #maxFetchRetry}. * - Retry: if an exception occurs while downloading an object, it retries again up to {@link #maxFetchRetry}.
* <p/> * <p/>
* * <p>
* This implementation can be useful when the cost for reading input objects is large as reading from AWS S3 because * This implementation can be useful when the cost for reading input objects is large as reading from AWS S3 because
* batch tasks like IndexTask or HadoopIndexTask can read the whole data twice for determining partition specs and * batch tasks like IndexTask or HadoopIndexTask can read the whole data twice for determining partition specs and
* generating segments if the intervals of GranularitySpec is not specified. * generating segments if the intervals of GranularitySpec is not specified.
@ -71,18 +71,18 @@ import java.util.concurrent.TimeUnit;
* Prefetching can be turned on/off by setting maxFetchCapacityBytes. Depending on prefetching is enabled or * Prefetching can be turned on/off by setting maxFetchCapacityBytes. Depending on prefetching is enabled or
* disabled, the behavior of the firehose is different like below. * disabled, the behavior of the firehose is different like below.
* <p/> * <p/>
* * <p>
* 1. If prefetch is enabled, this firehose can fetch input objects in background. * 1. If prefetch is enabled, this firehose can fetch input objects in background.
* <br/> * <br/>
* 2. When next() is called, it first checks that there are already fetched files in local storage. * 2. When next() is called, it first checks that there are already fetched files in local storage.
* <br/> * <br/>
* 2.1 If exists, it simply chooses a fetched file and returns a {@link LineIterator} reading that file. * 2.1 If exists, it simply chooses a fetched file and returns a {@link LineIterator} reading that file.
* <br/> * <br/>
* 2.2 If there is no fetched files in local storage but some objects are still remained to be read, the firehose * 2.2 If there is no fetched files in local storage but some objects are still remained to be read, the firehose
* fetches one of input objects in background immediately. If an IOException occurs while downloading the object, * fetches one of input objects in background immediately. If an IOException occurs while downloading the object,
* it retries up to the maximum retry count. Finally, the firehose returns a {@link LineIterator} only when the * it retries up to the maximum retry count. Finally, the firehose returns a {@link LineIterator} only when the
* download operation is successfully finished. * download operation is successfully finished.
* <br/> * <br/>
* 3. If prefetch is disabled, the firehose returns a {@link LineIterator} which directly reads the stream opened by * 3. If prefetch is disabled, the firehose returns a {@link LineIterator} which directly reads the stream opened by
* {@link #openObjectStream}. If there is an IOException, it will throw it and the read will fail. * {@link #openObjectStream}. If there is an IOException, it will throw it and the read will fail.
*/ */
@ -90,18 +90,12 @@ public abstract class PrefetchableTextFilesFirehoseFactory<T>
extends AbstractTextFilesFirehoseFactory<T> extends AbstractTextFilesFirehoseFactory<T>
{ {
private static final Logger LOG = new Logger(PrefetchableTextFilesFirehoseFactory.class); private static final Logger LOG = new Logger(PrefetchableTextFilesFirehoseFactory.class);
private static final long DEFAULT_MAX_CACHE_CAPACITY_BYTES = 1024 * 1024 * 1024; // 1GB
private static final long DEFAULT_MAX_FETCH_CAPACITY_BYTES = 1024 * 1024 * 1024; // 1GB
private static final long DEFAULT_FETCH_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(60);
private static final int DEFAULT_MAX_FETCH_RETRY = 3;
private final CacheManager<T> cacheManager; private final CacheManager<T> cacheManager;
private final long maxFetchCapacityBytes; private final PrefetchConfig prefetchConfig;
private final long prefetchTriggerBytes;
private final long fetchTimeout;
private final int maxFetchRetry;
private List<T> objects; private List<T> objects;
private final int maxFetchRetry;
public PrefetchableTextFilesFirehoseFactory( public PrefetchableTextFilesFirehoseFactory(
Long maxCacheCapacityBytes, Long maxCacheCapacityBytes,
@ -111,17 +105,16 @@ public abstract class PrefetchableTextFilesFirehoseFactory<T>
Integer maxFetchRetry Integer maxFetchRetry
) )
{ {
this.cacheManager = new CacheManager<>( this.prefetchConfig = new PrefetchConfig(
maxCacheCapacityBytes == null ? DEFAULT_MAX_CACHE_CAPACITY_BYTES : maxCacheCapacityBytes maxCacheCapacityBytes,
maxFetchCapacityBytes,
prefetchTriggerBytes,
fetchTimeout
); );
this.maxFetchCapacityBytes = maxFetchCapacityBytes == null this.cacheManager = new CacheManager<>(
? DEFAULT_MAX_FETCH_CAPACITY_BYTES prefetchConfig.getMaxCacheCapacityBytes()
: maxFetchCapacityBytes; );
this.prefetchTriggerBytes = prefetchTriggerBytes == null this.maxFetchRetry = maxFetchRetry;
? this.maxFetchCapacityBytes / 2
: prefetchTriggerBytes;
this.fetchTimeout = fetchTimeout == null ? DEFAULT_FETCH_TIMEOUT_MS : fetchTimeout;
this.maxFetchRetry = maxFetchRetry == null ? DEFAULT_MAX_FETCH_RETRY : maxFetchRetry;
} }
@JsonProperty @JsonProperty
@ -133,19 +126,19 @@ public abstract class PrefetchableTextFilesFirehoseFactory<T>
@JsonProperty @JsonProperty
public long getMaxFetchCapacityBytes() public long getMaxFetchCapacityBytes()
{ {
return maxFetchCapacityBytes; return prefetchConfig.getMaxFetchCapacityBytes();
} }
@JsonProperty @JsonProperty
public long getPrefetchTriggerBytes() public long getPrefetchTriggerBytes()
{ {
return prefetchTriggerBytes; return prefetchConfig.getPrefetchTriggerBytes();
} }
@JsonProperty @JsonProperty
public long getFetchTimeout() public long getFetchTimeout()
{ {
return fetchTimeout; return prefetchConfig.getFetchTimeout();
} }
@JsonProperty @JsonProperty
@ -167,7 +160,7 @@ public abstract class PrefetchableTextFilesFirehoseFactory<T>
objects = ImmutableList.copyOf(Preconditions.checkNotNull(initObjects(), "objects")); objects = ImmutableList.copyOf(Preconditions.checkNotNull(initObjects(), "objects"));
} }
if (cacheManager.isEnabled() || maxFetchCapacityBytes > 0) { if (cacheManager.isEnabled() || prefetchConfig.getMaxFetchCapacityBytes() > 0) {
Preconditions.checkNotNull(temporaryDirectory, "temporaryDirectory"); Preconditions.checkNotNull(temporaryDirectory, "temporaryDirectory");
Preconditions.checkArgument( Preconditions.checkArgument(
temporaryDirectory.exists(), temporaryDirectory.exists(),
@ -185,15 +178,12 @@ public abstract class PrefetchableTextFilesFirehoseFactory<T>
// fetchExecutor is responsible for background data fetching // fetchExecutor is responsible for background data fetching
final ExecutorService fetchExecutor = Execs.singleThreaded("firehose_fetch_%d"); final ExecutorService fetchExecutor = Execs.singleThreaded("firehose_fetch_%d");
final Fetcher<T> fetcher = new Fetcher<>( final FileFetcher<T> fetcher = new FileFetcher<T>(
cacheManager, cacheManager,
objects, objects,
fetchExecutor, fetchExecutor,
temporaryDirectory, temporaryDirectory,
maxFetchCapacityBytes, prefetchConfig,
prefetchTriggerBytes,
fetchTimeout,
maxFetchRetry,
new ObjectOpenFunction<T>() new ObjectOpenFunction<T>()
{ {
@Override @Override
@ -208,7 +198,8 @@ public abstract class PrefetchableTextFilesFirehoseFactory<T>
return openObjectStream(object, start); return openObjectStream(object, start);
} }
}, },
getRetryCondition() getRetryCondition(),
getMaxFetchRetry()
); );
return new FileIteratingFirehose( return new FileIteratingFirehose(
@ -249,7 +240,10 @@ public abstract class PrefetchableTextFilesFirehoseFactory<T>
() -> { () -> {
fetchExecutor.shutdownNow(); fetchExecutor.shutdownNow();
try { try {
Preconditions.checkState(fetchExecutor.awaitTermination(fetchTimeout, TimeUnit.MILLISECONDS)); Preconditions.checkState(fetchExecutor.awaitTermination(
prefetchConfig.getFetchTimeout(),
TimeUnit.MILLISECONDS
));
} }
catch (InterruptedException e) { catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();

View File

@ -0,0 +1,96 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.impl.prefetch;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
/**
* A file fetcher used by {@link PrefetchSqlFirehoseFactory}.
* See the javadoc of {@link PrefetchSqlFirehoseFactory} for more details.
*/
public class SqlFetcher<T> extends Fetcher<T>
{
private static final String FETCH_FILE_PREFIX = "sqlfetch-";
@Nullable
private final File temporaryDirectory;
private final ObjectOpenFunction<T> openObjectFunction;
SqlFetcher(
CacheManager<T> cacheManager,
List<T> objects,
ExecutorService fetchExecutor,
@Nullable File temporaryDirectory,
PrefetchConfig prefetchConfig,
ObjectOpenFunction<T> openObjectFunction
)
{
super(
cacheManager,
objects,
fetchExecutor,
temporaryDirectory,
prefetchConfig
);
this.temporaryDirectory = temporaryDirectory;
this.openObjectFunction = openObjectFunction;
}
/**
* Downloads the entire resultset object into a file. This avoids maintaining a
* persistent connection to the database. The retry is performed at the query execution layer.
*
* @param object sql query for which the resultset is to be downloaded
* @param outFile a file which the object data is stored
*
* @return size of downloaded resultset
*/
@Override
protected long download(T object, File outFile) throws IOException
{
openObjectFunction.open(object, outFile);
return outFile.length();
}
/**
* Generates an instance of {@link OpenedObject} for the given object. This is usually called
* when prefetching is disabled. The retry is performed at the query execution layer.
*/
@Override
protected OpenedObject<T> generateOpenObject(T object) throws IOException
{
final File outFile = File.createTempFile(FETCH_FILE_PREFIX, null, temporaryDirectory);
return new OpenedObject<>(
object,
openObjectFunction.open(object, outFile),
outFile::delete
);
}
}

View File

@ -0,0 +1,166 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.impl;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.prefetch.JsonIterator;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class SqlFirehoseTest
{
private List<Map<String, Object>> inputs;
private List<FileInputStream> fileList;
private MapInputRowParser parser = null;
private ObjectMapper objectMapper;
private static File TEST_DIR;
@Before
public void setup() throws IOException
{
TEST_DIR = File.createTempFile(SqlFirehose.class.getSimpleName(), "testDir");
FileUtils.forceDelete(TEST_DIR);
FileUtils.forceMkdir(TEST_DIR);
final List<Map<String, Object>> inputTexts = ImmutableList.of(
ImmutableMap.of("x", "foostring1", "timestamp", 2000),
ImmutableMap.of("x", "foostring2", "timestamp", 2000)
);
List<FileInputStream> testFile = new ArrayList<>();
this.objectMapper = new ObjectMapper(new SmileFactory());
int i = 0;
for (Map m : inputTexts) {
File file = new File(TEST_DIR, "test_" + i++);
try (FileOutputStream fos = new FileOutputStream(file)) {
final JsonGenerator jg = objectMapper.getFactory().createGenerator(fos);
jg.writeStartArray();
jg.writeObject(m);
jg.writeEndArray();
jg.close();
testFile.add(new FileInputStream(file));
}
}
this.fileList = testFile;
parser = new MapInputRowParser(
new TimeAndDimsParseSpec(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("x")), null, null)
)
);
this.inputs = inputTexts;
}
@Test
public void testFirehose() throws Exception
{
final TestCloseable closeable = new TestCloseable();
List<Object> expectedResults = new ArrayList<>();
for (Map<String, Object> map : inputs) {
expectedResults.add(map.get("x"));
}
final List<JsonIterator> lineIterators = fileList.stream()
.map(s -> new JsonIterator(new TypeReference<Map<String, Object>>()
{
}, s, closeable, objectMapper))
.collect(Collectors.toList());
try (final SqlFirehose firehose = new SqlFirehose(lineIterators.iterator(), parser, closeable)) {
final List<Object> results = Lists.newArrayList();
while (firehose.hasMore()) {
final InputRow inputRow = firehose.nextRow();
if (inputRow == null) {
results.add(null);
} else {
results.add(inputRow.getDimension("x").get(0));
}
}
Assert.assertEquals(expectedResults, results);
}
}
@Test
public void testClose() throws IOException
{
File file = File.createTempFile("test", "", TEST_DIR);
final TestCloseable closeable = new TestCloseable();
try (FileOutputStream fos = new FileOutputStream(file)) {
final JsonGenerator jg = objectMapper.getFactory().createGenerator(fos);
jg.writeStartArray();
jg.writeEndArray();
jg.close();
}
final JsonIterator<Map<String, Object>> jsonIterator = new JsonIterator(new TypeReference<Map<String, Object>>()
{
}, new FileInputStream(file), closeable, objectMapper);
final SqlFirehose firehose = new SqlFirehose(
ImmutableList.of(jsonIterator).iterator(),
parser,
closeable
);
firehose.hasMore(); // initialize lineIterator
firehose.close();
Assert.assertTrue(closeable.closed);
}
@After
public void teardown() throws IOException
{
FileUtils.forceDelete(TEST_DIR);
}
private static final class TestCloseable implements Closeable
{
private boolean closed;
@Override
public void close()
{
closed = true;
}
}
}

View File

@ -0,0 +1,70 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.impl.prefetch;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class JsonIteratorTest
{
@Test
public void testSerde() throws IOException
{
final ObjectMapper mapper = new ObjectMapper(new SmileFactory());
List<Map<String, Object>> expectedList = ImmutableList.of(ImmutableMap.of("key1", "value1", "key2", 2));
File testFile = File.createTempFile("testfile", "");
TypeReference<Map<String, Object>> type = new TypeReference<Map<String, Object>>()
{
};
try (FileOutputStream fos = new FileOutputStream(testFile)) {
final JsonGenerator jg = mapper.getFactory().createGenerator(fos);
jg.writeStartArray();
for (Map<String, Object> mapFromList : expectedList) {
jg.writeObject(mapFromList);
}
jg.writeEndArray();
jg.close();
}
JsonIterator<Map<String, Object>> testJsonIterator = new JsonIterator<>(type, new FileInputStream(testFile), () -> {
}, mapper);
List<Map<String, Object>> actualList = new ArrayList<>();
while (testJsonIterator.hasNext()) {
actualList.add(testJsonIterator.next());
}
testJsonIterator.close();
Assert.assertEquals(expectedList, actualList);
}
}

View File

@ -147,3 +147,35 @@ An example is shown below:
|type|This should be "timed"|yes| |type|This should be "timed"|yes|
|shutoffTime|time at which the firehose should shut down, in ISO8601 format|yes| |shutoffTime|time at which the firehose should shut down, in ISO8601 format|yes|
|delegate|firehose to use|yes| |delegate|firehose to use|yes|
#### SqlFirehose
SqlFirehoseFactory can be used to ingest events residing in RDBMS. The database connection information is provided as part of the ingestion spec. For each query, the results are fetched locally and indexed. If there are multiple queries from which data needs to be indexed, queries are prefetched in the background upto `maxFetchCapacityBytes` bytes.
An example is shown below:
```json
{
"type" : "sql",
"database": {
"type": "mysql",
"connectorConfig" : {
"connectURI" : "jdbc:mysql://host:port/schema",
"user" : "user",
"password" : "password"
}
},
"sqls" : ["SELECT * FROM table1", "SELECT * FROM table2"]
}
```
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should be "sql".||Yes|
|database|Specifies the database connection details.`type` should specify the database type and `connectorConfig` should specify the database connection properties via `connectURI`, `user` and `password`||Yes|
|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|No|
|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|No|
|prefetchTriggerBytes|Threshold to trigger prefetching SQL result objects.|maxFetchCapacityBytes / 2|No|
|fetchTimeout|Timeout for fetching the result set.|60000|No|
|foldCase|Toggle case folding of database column names. This may be enabled in cases where the database returns case insensitive column names in query results.|false|No|
|sqls|List of SQL queries where each SQL query would retrieve the data to be indexed.||Yes|

View File

@ -0,0 +1,57 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.firehose.sql;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.druid.metadata.MetadataStorageConnectorConfig;
import io.druid.metadata.SQLFirehoseDatabaseConnector;
import org.apache.commons.dbcp2.BasicDataSource;
import org.skife.jdbi.v2.DBI;
@JsonTypeName("mysql")
public class MySQLFirehoseDatabaseConnector extends SQLFirehoseDatabaseConnector
{
private final DBI dbi;
private final MetadataStorageConnectorConfig connectorConfig;
public MySQLFirehoseDatabaseConnector(
@JsonProperty("connectorConfig") MetadataStorageConnectorConfig connectorConfig
)
{
this.connectorConfig = connectorConfig;
final BasicDataSource datasource = getDatasource(connectorConfig);
datasource.setDriverClassLoader(getClass().getClassLoader());
datasource.setDriverClassName("com.mysql.jdbc.Driver");
this.dbi = new DBI(datasource);
}
@JsonProperty
public MetadataStorageConnectorConfig getConnectorConfig()
{
return connectorConfig;
}
@Override
public DBI getDBI()
{
return dbi;
}
}

View File

@ -20,9 +20,11 @@
package io.druid.metadata.storage.mysql; package io.druid.metadata.storage.mysql;
import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.Module;
import com.google.common.collect.ImmutableList; import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder; import com.google.inject.Binder;
import com.google.inject.Key; import com.google.inject.Key;
import io.druid.firehose.sql.MySQLFirehoseDatabaseConnector;
import io.druid.guice.JsonConfigProvider; import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton; import io.druid.guice.LazySingleton;
import io.druid.guice.PolyBind; import io.druid.guice.PolyBind;
@ -35,6 +37,7 @@ import io.druid.metadata.MySQLMetadataStorageActionHandlerFactory;
import io.druid.metadata.NoopMetadataStorageProvider; import io.druid.metadata.NoopMetadataStorageProvider;
import io.druid.metadata.SQLMetadataConnector; import io.druid.metadata.SQLMetadataConnector;
import java.util.Collections;
import java.util.List; import java.util.List;
public class MySQLMetadataStorageModule extends SQLMetadataStorageDruidModule implements DruidModule public class MySQLMetadataStorageModule extends SQLMetadataStorageDruidModule implements DruidModule
@ -49,7 +52,12 @@ public class MySQLMetadataStorageModule extends SQLMetadataStorageDruidModule im
@Override @Override
public List<? extends Module> getJacksonModules() public List<? extends Module> getJacksonModules()
{ {
return ImmutableList.of(); return Collections.singletonList(
new SimpleModule()
.registerSubtypes(
new NamedType(MySQLFirehoseDatabaseConnector.class, "mysql")
)
);
} }
@Override @Override
@ -81,5 +89,6 @@ public class MySQLMetadataStorageModule extends SQLMetadataStorageDruidModule im
.addBinding(TYPE) .addBinding(TYPE)
.to(MySQLMetadataStorageActionHandlerFactory.class) .to(MySQLMetadataStorageActionHandlerFactory.class)
.in(LazySingleton.class); .in(LazySingleton.class);
} }
} }

View File

@ -192,7 +192,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
@JacksonInject ChatHandlerProvider chatHandlerProvider @JacksonInject ChatHandlerProvider chatHandlerProvider
) )
{ {
this( this(
id, id,
makeGroupId(ingestionSchema), makeGroupId(ingestionSchema),
taskResource, taskResource,

View File

@ -31,6 +31,7 @@ import io.druid.segment.realtime.firehose.FixedCountFirehoseFactory;
import io.druid.segment.realtime.firehose.HttpFirehoseFactory; import io.druid.segment.realtime.firehose.HttpFirehoseFactory;
import io.druid.segment.realtime.firehose.IrcFirehoseFactory; import io.druid.segment.realtime.firehose.IrcFirehoseFactory;
import io.druid.segment.realtime.firehose.LocalFirehoseFactory; import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
import io.druid.segment.realtime.firehose.SqlFirehoseFactory;
import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory; import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
import java.util.Arrays; import java.util.Arrays;
@ -58,7 +59,8 @@ public class FirehoseModule implements DruidModule
new NamedType(HttpFirehoseFactory.class, "http"), new NamedType(HttpFirehoseFactory.class, "http"),
new NamedType(EventReceiverFirehoseFactory.class, "receiver"), new NamedType(EventReceiverFirehoseFactory.class, "receiver"),
new NamedType(CombiningFirehoseFactory.class, "combining"), new NamedType(CombiningFirehoseFactory.class, "combining"),
new NamedType(FixedCountFirehoseFactory.class, "fixedCount") new NamedType(FixedCountFirehoseFactory.class, "fixedCount"),
new NamedType(SqlFirehoseFactory.class, "sql")
) )
); );
} }

View File

@ -0,0 +1,85 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.metadata;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Predicate;
import io.druid.java.util.common.RetryUtils;
import org.apache.commons.dbcp2.BasicDataSource;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.exceptions.DBIException;
import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException;
import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.sql.SQLException;
import java.sql.SQLRecoverableException;
import java.sql.SQLTransientException;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
public abstract class SQLFirehoseDatabaseConnector
{
static final int MAX_RETRIES = 10;
public <T> T retryWithHandle(
HandleCallback<T> callback,
Predicate<Throwable> myShouldRetry
)
{
try {
return RetryUtils.retry(() -> getDBI().withHandle(callback), myShouldRetry, MAX_RETRIES);
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
public final boolean isTransientException(Throwable e)
{
return e != null && (e instanceof RetryTransactionException
|| e instanceof SQLTransientException
|| e instanceof SQLRecoverableException
|| e instanceof UnableToObtainConnectionException
|| e instanceof UnableToExecuteStatementException
|| (e instanceof SQLException && isTransientException(e.getCause()))
|| (e instanceof DBIException && isTransientException(e.getCause())));
}
protected BasicDataSource getDatasource(MetadataStorageConnectorConfig connectorConfig)
{
BasicDataSource dataSource = new BasicDataSource();
dataSource.setUsername(connectorConfig.getUser());
dataSource.setPassword(connectorConfig.getPassword());
String uri = connectorConfig.getConnectURI();
dataSource.setUrl(uri);
dataSource.setTestOnBorrow(true);
dataSource.setValidationQuery(getValidationQuery());
return dataSource;
}
public String getValidationQuery()
{
return "SELECT 1";
}
public abstract DBI getDBI();
}

View File

@ -0,0 +1,173 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.realtime.firehose;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import io.druid.data.input.impl.prefetch.PrefetchSqlFirehoseFactory;
import io.druid.guice.annotations.Smile;
import io.druid.java.util.common.StringUtils;
import io.druid.metadata.MetadataStorageConnectorConfig;
import io.druid.metadata.SQLFirehoseDatabaseConnector;
import org.skife.jdbi.v2.ResultIterator;
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
import org.skife.jdbi.v2.exceptions.ResultSetException;
import org.skife.jdbi.v2.exceptions.StatementException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class SqlFirehoseFactory extends PrefetchSqlFirehoseFactory<String>
{
@JsonProperty
private final List<String> sqls;
@JsonProperty
private final MetadataStorageConnectorConfig connectorConfig;
private final ObjectMapper objectMapper;
@JsonProperty
private final SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector;
private final boolean foldCase;
@JsonCreator
public SqlFirehoseFactory(
@JsonProperty("sqls") List<String> sqls,
@JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes,
@JsonProperty("maxFetchCapacityBytes") Long maxFetchCapacityBytes,
@JsonProperty("prefetchTriggerBytes") Long prefetchTriggerBytes,
@JsonProperty("fetchTimeout") Long fetchTimeout,
@JsonProperty("foldCase") boolean foldCase,
@JsonProperty("database") SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector,
@JacksonInject @Smile ObjectMapper objectMapper
)
{
super(
maxCacheCapacityBytes,
maxFetchCapacityBytes,
prefetchTriggerBytes,
fetchTimeout,
objectMapper
);
Preconditions.checkArgument(sqls.size() > 0, "No SQL queries provided");
this.sqls = sqls;
this.objectMapper = objectMapper;
this.sqlFirehoseDatabaseConnector = sqlFirehoseDatabaseConnector;
this.foldCase = foldCase;
this.connectorConfig = null;
}
@Override
protected InputStream openObjectStream(String object, File fileName) throws IOException
{
Preconditions.checkNotNull(sqlFirehoseDatabaseConnector, "SQL Metadata Connector not configured!");
try (FileOutputStream fos = new FileOutputStream(fileName)) {
final JsonGenerator jg = objectMapper.getFactory().createGenerator(fos);
sqlFirehoseDatabaseConnector.retryWithHandle(
(handle) -> {
ResultIterator<Map<String, Object>> resultIterator = handle.createQuery(
object
).map(
(index, r, ctx) -> {
Map<String, Object> resultRow = foldCase ? new CaseFoldedMap() : new HashMap<>();
ResultSetMetaData resultMetadata;
try {
resultMetadata = r.getMetaData();
}
catch (SQLException e) {
throw new ResultSetException("Unable to obtain metadata from result set", e, ctx);
}
try {
for (int i = 1; i <= resultMetadata.getColumnCount(); i++) {
String key = resultMetadata.getColumnName(i);
String alias = resultMetadata.getColumnLabel(i);
Object value = r.getObject(i);
resultRow.put(alias != null ? alias : key, value);
}
}
catch (SQLException e) {
throw new ResultSetException("Unable to access specific metadata from " +
"result set metadata", e, ctx);
}
return resultRow;
}
).iterator();
jg.writeStartArray();
while (resultIterator.hasNext()) {
jg.writeObject(resultIterator.next());
}
jg.writeEndArray();
jg.close();
return null;
},
(exception) -> {
final boolean isStatementException = exception instanceof StatementException ||
(exception instanceof CallbackFailedException
&& exception.getCause() instanceof StatementException);
return sqlFirehoseDatabaseConnector.isTransientException(exception) && !(isStatementException);
}
);
}
return new FileInputStream(fileName);
}
private static class CaseFoldedMap extends HashMap<String, Object>
{
public static final long serialVersionUID = 1L;
@Override
public Object get(Object obj)
{
return super.get(StringUtils.toLowerCase((String) obj));
}
@Override
public Object put(String key, Object value)
{
return super.put(StringUtils.toLowerCase(key), value);
}
@Override
public boolean containsKey(Object obj)
{
return super.containsKey(StringUtils.toLowerCase((String) obj));
}
}
@Override
protected Collection<String> initObjects()
{
return sqls;
}
}

View File

@ -0,0 +1,310 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.realtime.firehose;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import io.druid.data.input.Firehose;
import io.druid.data.input.Row;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.MapInputRowParser;
import io.druid.data.input.impl.TimeAndDimsParseSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.java.util.common.StringUtils;
import io.druid.metadata.MetadataStorageConnectorConfig;
import io.druid.metadata.SQLFirehoseDatabaseConnector;
import io.druid.metadata.TestDerbyConnector;
import io.druid.segment.TestHelper;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.io.FileUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.skife.jdbi.v2.Batch;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class SqlFirehoseFactoryTest
{
private static final List<File> FIREHOSE_TMP_DIRS = new ArrayList<>();
private static File TEST_DIR;
private final String TABLE_NAME_1 = "FOOS_TABLE_1";
private final String TABLE_NAME_2 = "FOOS_TABLE_2";
private final List<String> SQLLIST1 = ImmutableList.of("SELECT timestamp,a,b FROM FOOS_TABLE_1");
private final List<String> SQLLIST2 = ImmutableList.of(
"SELECT timestamp,a,b FROM FOOS_TABLE_1",
"SELECT timestamp,a,b FROM FOOS_TABLE_2"
);
@Rule
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
private final ObjectMapper mapper = TestHelper.makeSmileMapper();
private final MapInputRowParser parser = new MapInputRowParser(
new TimeAndDimsParseSpec(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList("timestamp", "a", "b")),
Lists.newArrayList(),
Lists.newArrayList()
)
)
);
private TestDerbyConnector derbyConnector;
private TestDerbyFirehoseConnector derbyFirehoseConnector;
@BeforeClass
public static void setup() throws IOException
{
TEST_DIR = File.createTempFile(SqlFirehoseFactoryTest.class.getSimpleName(), "testDir");
FileUtils.forceDelete(TEST_DIR);
FileUtils.forceMkdir(TEST_DIR);
}
@AfterClass
public static void teardown() throws IOException
{
FileUtils.forceDelete(TEST_DIR);
for (File dir : FIREHOSE_TMP_DIRS) {
FileUtils.forceDelete(dir);
}
}
private void assertResult(List<Row> rows, List<String> sqls)
{
Assert.assertEquals(10 * sqls.size(), rows.size());
rows.sort((r1, r2) -> {
int c = r1.getTimestamp().compareTo(r2.getTimestamp());
if (c != 0) {
return c;
}
c = Integer.valueOf(r1.getDimension("a").get(0)).compareTo(Integer.valueOf(r2.getDimension("a").get(0)));
if (c != 0) {
return c;
}
return Integer.valueOf(r1.getDimension("b").get(0)).compareTo(Integer.valueOf(r2.getDimension("b").get(0)));
});
int rowCount = 0;
for (int i = 0; i < 10; i++) {
for (int j = 0; j < sqls.size(); j++) {
final Row row = rows.get(rowCount);
String timestampSt = StringUtils.format("2011-01-12T00:0%s:00.000Z", i);
Assert.assertEquals(timestampSt, row.getTimestamp().toString());
Assert.assertEquals(i, Integer.valueOf(row.getDimension("a").get(0)).intValue());
Assert.assertEquals(i, Integer.valueOf(row.getDimension("b").get(0)).intValue());
rowCount++;
}
}
}
private void assertNumRemainingCacheFiles(File firehoseTmpDir, int expectedNumFiles)
{
final String[] files = firehoseTmpDir.list();
Assert.assertNotNull(files);
Assert.assertEquals(expectedNumFiles, files.length);
}
private File createFirehoseTmpDir(String dirSuffix) throws IOException
{
final File firehoseTempDir = File.createTempFile(
SqlFirehoseFactoryTest.class.getSimpleName(),
dirSuffix
);
FileUtils.forceDelete(firehoseTempDir);
FileUtils.forceMkdir(firehoseTempDir);
FIREHOSE_TMP_DIRS.add(firehoseTempDir);
return firehoseTempDir;
}
private void dropTable(final String tableName)
{
derbyConnector.getDBI().withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle)
{
handle.createStatement(StringUtils.format("DROP TABLE %s", tableName))
.execute();
return null;
}
}
);
}
private void createAndUpdateTable(final String tableName)
{
derbyConnector = derbyConnectorRule.getConnector();
derbyFirehoseConnector = new TestDerbyFirehoseConnector(new MetadataStorageConnectorConfig(),
derbyConnector.getDBI());
derbyConnector.createTable(
tableName,
ImmutableList.of(
StringUtils.format(
"CREATE TABLE %1$s (\n"
+ " timestamp varchar(255) NOT NULL,\n"
+ " a VARCHAR(255) NOT NULL,\n"
+ " b VARCHAR(255) NOT NULL\n"
+ ")",
tableName
)
)
);
derbyConnector.getDBI().withHandle(
(handle) -> {
Batch batch = handle.createBatch();
for (int i = 0; i < 10; i++) {
String timestampSt = StringUtils.format("2011-01-12T00:0%s:00.000Z", i);
batch.add(StringUtils.format("INSERT INTO %1$s (timestamp, a, b) VALUES ('%2$s', '%3$s', '%4$s')",
tableName, timestampSt,
i, i
));
}
batch.execute();
return null;
}
);
}
@Test
public void testWithoutCacheAndFetch() throws Exception
{
createAndUpdateTable(TABLE_NAME_1);
final SqlFirehoseFactory factory =
new SqlFirehoseFactory(
SQLLIST1,
0L,
0L,
0L,
0L,
true,
derbyFirehoseConnector,
mapper
);
final List<Row> rows = new ArrayList<>();
final File firehoseTmpDir = createFirehoseTmpDir("testWithoutCacheAndFetch");
try (Firehose firehose = factory.connect(parser, firehoseTmpDir)) {
while (firehose.hasMore()) {
rows.add(firehose.nextRow());
}
}
assertResult(rows, SQLLIST1);
assertNumRemainingCacheFiles(firehoseTmpDir, 0);
dropTable(TABLE_NAME_1);
}
@Test
public void testWithoutCache() throws IOException
{
createAndUpdateTable(TABLE_NAME_1);
final SqlFirehoseFactory factory =
new SqlFirehoseFactory(
SQLLIST1,
0L,
null,
null,
null,
true,
derbyFirehoseConnector,
mapper
);
final List<Row> rows = new ArrayList<>();
final File firehoseTmpDir = createFirehoseTmpDir("testWithoutCache");
try (Firehose firehose = factory.connect(parser, firehoseTmpDir)) {
while (firehose.hasMore()) {
rows.add(firehose.nextRow());
}
}
assertResult(rows, SQLLIST1);
assertNumRemainingCacheFiles(firehoseTmpDir, 0);
dropTable(TABLE_NAME_1);
}
@Test
public void testWithCacheAndFetch() throws IOException
{
createAndUpdateTable(TABLE_NAME_1);
createAndUpdateTable(TABLE_NAME_2);
final SqlFirehoseFactory factory = new
SqlFirehoseFactory(
SQLLIST2,
null,
null,
0L,
null,
true,
derbyFirehoseConnector,
mapper
);
final List<Row> rows = new ArrayList<>();
final File firehoseTmpDir = createFirehoseTmpDir("testWithCacheAndFetch");
try (Firehose firehose = factory.connect(parser, firehoseTmpDir)) {
while (firehose.hasMore()) {
rows.add(firehose.nextRow());
}
}
assertResult(rows, SQLLIST2);
assertNumRemainingCacheFiles(firehoseTmpDir, 2);
dropTable(TABLE_NAME_1);
dropTable(TABLE_NAME_2);
}
private static class TestDerbyFirehoseConnector extends SQLFirehoseDatabaseConnector
{
private final DBI dbi;
private TestDerbyFirehoseConnector(MetadataStorageConnectorConfig metadataStorageConnectorConfig, DBI dbi)
{
final BasicDataSource datasource = getDatasource(metadataStorageConnectorConfig);
datasource.setDriverClassLoader(getClass().getClassLoader());
datasource.setDriverClassName("org.apache.derby.jdbc.ClientDriver");
this.dbi = dbi;
}
@Override
public DBI getDBI()
{
return dbi;
}
}
}