Make ExtractionNamespaceCacheFactory populate the cache directly instead of returning a Callable (#3651)

* Rename ExtractionNamespaceCacheFactory.getCachePopulator() to populateCache() and make it populate the cache itself instead of returning a Callable that populates the cache, because this "callback style" is not actually needed.

ExtractionNamespaceCacheFactory isn't a "factory", so it should be renamed, but renaming it in this commit would break the git history of the affected files, because the ExtractionNamespaceCacheFactory implementations have too many changed lines. ExtractionNamespaceCacheFactory will be renamed to something like "CachePopulator" in a subsequent PR.

This commit is a part of a bigger refactoring of the lookup cache subsystem.

* Remove unused line and imports
This commit is contained in:
Roman Leventov 2016-11-04 14:33:16 -06:00 committed by Fangjin Yang
parent 4cbebd0931
commit 22b57ddd60
10 changed files with 207 additions and 261 deletions

View File

@ -20,7 +20,6 @@
package io.druid.query.lookup.namespace;
import java.util.Map;
import java.util.concurrent.Callable;
/**
*
@ -34,7 +33,8 @@ public interface ExtractionNamespaceCacheFactory<T extends ExtractionNamespace>
* is used to populate the namespace cache each time.
* For ExtractionNamespace implementations which do not have regular updates, this function can be used to
* initialize resources.
* If the result of the Callable is the same as what is passed in as lastVersion, then no swap takes place, and the swap is discarded.
* If the returned version is the same as what is passed in as lastVersion, then no swap takes place, and the swap
* is discarded.
*
* @param id The ID of ExtractionNamespace
* @param extractionNamespace The ExtractionNamespace for which to populate data.
@ -44,8 +44,7 @@ public interface ExtractionNamespaceCacheFactory<T extends ExtractionNamespace>
* a swappable cache of the data may ignore this but must make sure `buildFn(...)` returns
* a proper Function.
*
* @return A callable that will be used to refresh resources of the namespace and return the version string used in
* the populating
* @return the (new) version string used when populating the cache
*/
Callable<String> getCachePopulator(String id, T extractionNamespace, String lastVersion, Map<String, String> swap);
String populateCache(String id, T extractionNamespace, String lastVersion, Map<String, String> swap) throws Exception;
}

View File

@ -36,7 +36,6 @@ import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -50,82 +49,68 @@ public class JDBCExtractionNamespaceCacheFactory
private final ConcurrentMap<String, DBI> dbiCache = new ConcurrentHashMap<>();
@Override
public Callable<String> getCachePopulator(
public String populateCache(
final String id,
final JDBCExtractionNamespace namespace,
final String lastVersion,
final Map<String, String> cache
)
) throws Exception
{
final long lastCheck = lastVersion == null ? JodaUtils.MIN_INSTANT : Long.parseLong(lastVersion);
final Long lastDBUpdate = lastUpdates(id, namespace);
if (lastDBUpdate != null && lastDBUpdate <= lastCheck) {
return new Callable<String>()
{
@Override
public String call() throws Exception
{
return lastVersion;
}
};
return lastVersion;
}
return new Callable<String>()
{
@Override
public String call()
{
final long dbQueryStart = System.currentTimeMillis();
final DBI dbi = ensureDBI(id, namespace);
final String table = namespace.getTable();
final String valueColumn = namespace.getValueColumn();
final String keyColumn = namespace.getKeyColumn();
final long dbQueryStart = System.currentTimeMillis();
final DBI dbi = ensureDBI(id, namespace);
final String table = namespace.getTable();
final String valueColumn = namespace.getValueColumn();
final String keyColumn = namespace.getKeyColumn();
LOG.debug("Updating [%s]", id);
final List<Pair<String, String>> pairs = dbi.withHandle(
new HandleCallback<List<Pair<String, String>>>()
{
@Override
public List<Pair<String, String>> withHandle(Handle handle) throws Exception
{
final String query;
query = String.format(
"SELECT %s, %s FROM %s",
keyColumn,
valueColumn,
table
);
return handle
.createQuery(
query
).map(
new ResultSetMapper<Pair<String, String>>()
{
LOG.debug("Updating [%s]", id);
final List<Pair<String, String>> pairs = dbi.withHandle(
new HandleCallback<List<Pair<String, String>>>()
{
@Override
public List<Pair<String, String>> withHandle(Handle handle) throws Exception
{
final String query;
query = String.format(
"SELECT %s, %s FROM %s",
keyColumn,
valueColumn,
table
);
return handle
.createQuery(
query
).map(
new ResultSetMapper<Pair<String, String>>()
{
@Override
public Pair<String, String> map(
final int index,
final ResultSet r,
final StatementContext ctx
) throws SQLException
{
return new Pair<String, String>(r.getString(keyColumn), r.getString(valueColumn));
}
}
).list();
}
}
);
for (Pair<String, String> pair : pairs) {
cache.put(pair.lhs, pair.rhs);
@Override
public Pair<String, String> map(
final int index,
final ResultSet r,
final StatementContext ctx
) throws SQLException
{
return new Pair<>(r.getString(keyColumn), r.getString(valueColumn));
}
}
).list();
}
}
LOG.info("Finished loading %d values for namespace[%s]", cache.size(), id);
if (lastDBUpdate != null) {
return lastDBUpdate.toString();
} else {
return String.format("%d", dbQueryStart);
}
}
};
);
for (Pair<String, String> pair : pairs) {
cache.put(pair.lhs, pair.rhs);
}
LOG.info("Finished loading %d values for namespace[%s]", cache.size(), id);
if (lastDBUpdate != null) {
return lastDBUpdate.toString();
} else {
return String.format("%d", dbQueryStart);
}
}
private DBI ensureDBI(String id, JDBCExtractionNamespace namespace)

View File

@ -24,7 +24,6 @@ import io.druid.query.lookup.namespace.StaticMapExtractionNamespace;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
public class StaticMapExtractionNamespaceCacheFactory
implements ExtractionNamespaceCacheFactory<StaticMapExtractionNamespace>
@ -32,7 +31,7 @@ public class StaticMapExtractionNamespaceCacheFactory
private final String version = UUID.randomUUID().toString();
@Override
public Callable<String> getCachePopulator(
public String populateCache(
final String id,
final StaticMapExtractionNamespace extractionNamespace,
final String lastVersion,
@ -46,15 +45,8 @@ public class StaticMapExtractionNamespaceCacheFactory
"StaticMapExtractionNamespaceCacheFactory could only be configured for a namespace which is scheduled " +
"to be updated once, not periodically. Last version: `" + lastVersion + "`");
}
return new Callable<String>()
{
@Override
public String call() throws Exception
{
swap.putAll(extractionNamespace.getMap());
return version;
}
};
swap.putAll(extractionNamespace.getMap());
return version;
}
String getVersion()

View File

@ -59,135 +59,128 @@ public class URIExtractionNamespaceCacheFactory implements ExtractionNamespaceCa
}
@Override
public Callable<String> getCachePopulator(
public String populateCache(
final String id,
final URIExtractionNamespace extractionNamespace,
@Nullable final String lastVersion,
final Map<String, String> cache
)
) throws Exception
{
return new Callable<String>()
{
@Override
public String call()
{
final boolean doSearch = extractionNamespace.getUriPrefix() != null;
final URI originalUri = doSearch ? extractionNamespace.getUriPrefix() : extractionNamespace.getUri();
final SearchableVersionedDataFinder<URI> pullerRaw = pullers.get(originalUri.getScheme());
if (pullerRaw == null) {
throw new IAE(
"Unknown loader type[%s]. Known types are %s",
originalUri.getScheme(),
pullers.keySet()
);
}
if (!(pullerRaw instanceof URIDataPuller)) {
throw new IAE(
"Cannot load data from location [%s]. Data pulling from [%s] not supported",
originalUri,
originalUri.getScheme()
);
}
final URIDataPuller puller = (URIDataPuller) pullerRaw;
final URI uri;
if (doSearch) {
final Pattern versionRegex;
final boolean doSearch = extractionNamespace.getUriPrefix() != null;
final URI originalUri = doSearch ? extractionNamespace.getUriPrefix() : extractionNamespace.getUri();
final SearchableVersionedDataFinder<URI> pullerRaw = pullers.get(originalUri.getScheme());
if (pullerRaw == null) {
throw new IAE(
"Unknown loader type[%s]. Known types are %s",
originalUri.getScheme(),
pullers.keySet()
);
}
if (!(pullerRaw instanceof URIDataPuller)) {
throw new IAE(
"Cannot load data from location [%s]. Data pulling from [%s] not supported",
originalUri,
originalUri.getScheme()
);
}
final URIDataPuller puller = (URIDataPuller) pullerRaw;
final URI uri;
if (doSearch) {
final Pattern versionRegex;
if (extractionNamespace.getFileRegex() != null) {
versionRegex = Pattern.compile(extractionNamespace.getFileRegex());
} else {
versionRegex = null;
}
uri = pullerRaw.getLatestVersion(
extractionNamespace.getUriPrefix(),
versionRegex
);
if (uri == null) {
throw new RuntimeException(
new FileNotFoundException(
String.format(
"Could not find match for pattern `%s` in [%s] for %s",
versionRegex,
originalUri,
extractionNamespace
)
)
);
}
} else {
uri = extractionNamespace.getUri();
}
final String uriPath = uri.getPath();
try {
return RetryUtils.retry(
new Callable<String>()
{
@Override
public String call() throws Exception
{
final String version = puller.getVersion(uri);
try {
// Important to call equals() against version because lastVersion could be null
if (version.equals(lastVersion)) {
log.debug(
"URI [%s] for namespace [%s] has the same last modified time [%s] as the last cached. " +
"Skipping ",
uri.toString(),
id,
version
);
return lastVersion;
}
}
catch (NumberFormatException ex) {
log.debug(ex, "Failed to get last modified timestamp. Assuming no timestamp");
}
final ByteSource source;
if (CompressionUtils.isGz(uriPath)) {
// Simple gzip stream
log.debug("Loading gz");
source = new ByteSource()
{
@Override
public InputStream openStream() throws IOException
{
return CompressionUtils.gzipInputStream(puller.getInputStream(uri));
}
};
} else {
source = new ByteSource()
{
@Override
public InputStream openStream() throws IOException
{
return puller.getInputStream(uri);
}
};
}
final MapPopulator.PopulateResult populateResult = new MapPopulator<>(
extractionNamespace.getNamespaceParseSpec()
.getParser()
).populate(source, cache);
log.info(
"Finished loading %,d values from %,d lines for namespace [%s]",
populateResult.getEntries(),
populateResult.getLines(),
id
);
return version;
}
},
puller.shouldRetryPredicate(),
DEFAULT_NUM_RETRIES
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
if (extractionNamespace.getFileRegex() != null) {
versionRegex = Pattern.compile(extractionNamespace.getFileRegex());
} else {
versionRegex = null;
}
};
uri = pullerRaw.getLatestVersion(
extractionNamespace.getUriPrefix(),
versionRegex
);
if (uri == null) {
throw new RuntimeException(
new FileNotFoundException(
String.format(
"Could not find match for pattern `%s` in [%s] for %s",
versionRegex,
originalUri,
extractionNamespace
)
)
);
}
} else {
uri = extractionNamespace.getUri();
}
final String uriPath = uri.getPath();
try {
return RetryUtils.retry(
new Callable<String>()
{
@Override
public String call() throws Exception
{
final String version = puller.getVersion(uri);
try {
// Important to call equals() against version because lastVersion could be null
if (version.equals(lastVersion)) {
log.debug(
"URI [%s] for namespace [%s] has the same last modified time [%s] as the last cached. " +
"Skipping ",
uri.toString(),
id,
version
);
return lastVersion;
}
}
catch (NumberFormatException ex) {
log.debug(ex, "Failed to get last modified timestamp. Assuming no timestamp");
}
final ByteSource source;
if (CompressionUtils.isGz(uriPath)) {
// Simple gzip stream
log.debug("Loading gz");
source = new ByteSource()
{
@Override
public InputStream openStream() throws IOException
{
return CompressionUtils.gzipInputStream(puller.getInputStream(uri));
}
};
} else {
source = new ByteSource()
{
@Override
public InputStream openStream() throws IOException
{
return puller.getInputStream(uri);
}
};
}
final MapPopulator.PopulateResult populateResult = new MapPopulator<>(
extractionNamespace.getNamespaceParseSpec()
.getParser()
).populate(source, cache);
log.info(
"Finished loading %,d values from %,d lines for namespace [%s]",
populateResult.getEntries(),
populateResult.getLines(),
id
);
return version;
}
},
puller.shouldRetryPredicate(),
DEFAULT_NUM_RETRIES
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}

View File

@ -28,7 +28,6 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.concurrent.ExecutorServices;
@ -41,7 +40,6 @@ import javax.annotation.concurrent.GuardedBy;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -357,10 +355,9 @@ public abstract class NamespaceExtractionCacheManager
}
final Map<String, String> cache = getCacheMap(cacheId);
final String preVersion = implData.latestVersion;
final Callable<String> runnable = factory.getCachePopulator(id, namespace, preVersion, cache);
tasksStarted.incrementAndGet();
final String newVersion = runnable.call();
final String newVersion = factory.populateCache(id, namespace, preVersion, cache);
if (newVersion.equals(preVersion)) {
log.debug("Version `%s` already exists, skipping updating cache", preVersion);
} else {

View File

@ -105,7 +105,7 @@ public class NamespacedExtractorModuleTest
null
);
Map<String, String> map = new HashMap<>();
factory.getCachePopulator(namespaceID, namespace, null, map).call();
factory.populateCache(namespaceID, namespace, null, map);
Assert.assertEquals("bar", map.get("foo"));
Assert.assertEquals(null, map.get("baz"));
cacheManager.delete(namespaceID);

View File

@ -37,7 +37,7 @@ public class StaticMapExtractionNamespaceCacheFactoryTest
final StaticMapExtractionNamespaceCacheFactory factory = new StaticMapExtractionNamespaceCacheFactory();
final StaticMapExtractionNamespace namespace = new StaticMapExtractionNamespace(MAP);
final Map<String, String> cache = new HashMap<>();
Assert.assertEquals(factory.getVersion(), factory.getCachePopulator(null, namespace, null, cache).call());
Assert.assertEquals(factory.getVersion(), factory.populateCache(null, namespace, null, cache));
Assert.assertEquals(MAP, cache);
}
@ -47,6 +47,6 @@ public class StaticMapExtractionNamespaceCacheFactoryTest
final StaticMapExtractionNamespaceCacheFactory factory = new StaticMapExtractionNamespaceCacheFactory();
final StaticMapExtractionNamespace namespace = new StaticMapExtractionNamespace(MAP);
final Map<String, String> cache = new HashMap<>();
Assert.assertNull(factory.getCachePopulator(null, namespace, factory.getVersion(), cache).call());
Assert.assertNull(factory.populateCache(null, namespace, factory.getVersion(), cache));
}
}

View File

@ -72,7 +72,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@ -402,15 +401,13 @@ public class URIExtractionNamespaceCacheFactoryTest
Assert.assertTrue(manager.getKnownIDs().isEmpty());
ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
Callable<String> populator = factory.getCachePopulator(id, namespace, null, map);
String v = populator.call();
String v = factory.populateCache(id, namespace, null, map);
Assert.assertEquals("bar", map.get("foo"));
Assert.assertEquals(null, map.get("baz"));
Assert.assertNotNull(v);
populator = factory.getCachePopulator(id, namespace, v, map);
String v2 = populator.call();
String v2 = factory.populateCache(id, namespace, v, map);
Assert.assertEquals(v, v2);
Assert.assertEquals("bar", map.get("foo"));
Assert.assertEquals(null, map.get("baz"));
@ -429,7 +426,7 @@ public class URIExtractionNamespaceCacheFactoryTest
Assert.assertTrue(new File(namespace.getUri()).delete());
ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
try {
factory.getCachePopulator(id, badNamespace, null, map).call();
factory.populateCache(id, badNamespace, null, map);
}
catch (RuntimeException e) {
Assert.assertNotNull(e.getCause());
@ -471,7 +468,7 @@ public class URIExtractionNamespaceCacheFactoryTest
return t.getCause() != null && t.getCause() instanceof FileNotFoundException;
}
});
factory.getCachePopulator(badId, badNamespace, null, map).call();
factory.populateCache(badId, badNamespace, null, map);
}
@Test(expected = IAE.class)
@ -560,6 +557,6 @@ public class URIExtractionNamespaceCacheFactoryTest
null
);
final Map<String, String> map = new HashMap<>();
Assert.assertNotNull(factory.getCachePopulator(id, extractionNamespace, null, map).call());
Assert.assertNotNull(factory.populateCache(id, extractionNamespace, null, map));
}
}

View File

@ -193,39 +193,31 @@ public class JDBCExtractionNamespaceTest
new JDBCExtractionNamespaceCacheFactory()
{
@Override
public Callable<String> getCachePopulator(
public String populateCache(
final String id,
final JDBCExtractionNamespace namespace,
final String lastVersion,
final Map<String, String> cache
)
) throws Exception
{
final Callable<String> cachePopulator = super.getCachePopulator(
id,
namespace,
lastVersion,
cache
);
return new Callable<String>()
{
@Override
public String call() throws Exception
{
updateLock.lockInterruptibly();
try {
log.debug("Running cache populator");
try {
return cachePopulator.call();
}
finally {
updates.incrementAndGet();
}
}
finally {
updateLock.unlock();
}
updateLock.lockInterruptibly();
try {
log.debug("Running cache populator");
try {
return super.populateCache(
id,
namespace,
lastVersion,
cache
);
}
};
finally {
updates.incrementAndGet();
}
}
finally {
updateLock.unlock();
}
}
}
)

View File

@ -26,7 +26,6 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.concurrent.Execs;
import io.druid.data.SearchableVersionedDataFinder;
import io.druid.java.util.common.IAE;
@ -56,7 +55,6 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -94,24 +92,17 @@ public class NamespaceExtractionCacheManagerExecutorsTest
)
{
@Override
public Callable<String> getCachePopulator(
public String populateCache(
final String id,
final URIExtractionNamespace extractionNamespace,
final String lastVersion,
final Map<String, String> cache
)
) throws Exception
{
return new Callable<String>()
{
@Override
public String call() throws Exception
{
// Don't actually read off disk because TravisCI doesn't like that
cache.put(KEY, VALUE);
Thread.sleep(2);// To make absolutely sure there is a unique currentTimeMillis
return Long.toString(System.currentTimeMillis());
}
};
// Don't actually read off disk because TravisCI doesn't like that
cache.put(KEY,VALUE);
Thread.sleep(2);// To make absolutely sure there is a unique currentTimeMillis
return Long.toString(System.currentTimeMillis());
}
};
manager = new OnHeapNamespaceExtractionCacheManager(