mirror of https://github.com/apache/druid.git
Hopefully add better timeouts and ordering to JDBCExtractionNamespaceTest
This commit is contained in:
parent
5b67ec6cda
commit
ac8e32b58e
|
@ -21,27 +21,21 @@ package io.druid.server.namespace;
|
||||||
|
|
||||||
import com.google.common.base.Function;
|
import com.google.common.base.Function;
|
||||||
import com.google.common.base.Strings;
|
import com.google.common.base.Strings;
|
||||||
import com.google.inject.Inject;
|
|
||||||
import com.metamx.common.Pair;
|
import com.metamx.common.Pair;
|
||||||
import io.druid.common.utils.JodaUtils;
|
import io.druid.common.utils.JodaUtils;
|
||||||
import io.druid.query.extraction.namespace.ExtractionNamespaceFunctionFactory;
|
import io.druid.query.extraction.namespace.ExtractionNamespaceFunctionFactory;
|
||||||
import io.druid.query.extraction.namespace.JDBCExtractionNamespace;
|
import io.druid.query.extraction.namespace.JDBCExtractionNamespace;
|
||||||
import io.druid.server.namespace.cache.NamespaceExtractionCacheManager;
|
|
||||||
import org.skife.jdbi.v2.DBI;
|
import org.skife.jdbi.v2.DBI;
|
||||||
import org.skife.jdbi.v2.FoldController;
|
|
||||||
import org.skife.jdbi.v2.Folder3;
|
|
||||||
import org.skife.jdbi.v2.Handle;
|
import org.skife.jdbi.v2.Handle;
|
||||||
import org.skife.jdbi.v2.StatementContext;
|
import org.skife.jdbi.v2.StatementContext;
|
||||||
import org.skife.jdbi.v2.tweak.HandleCallback;
|
import org.skife.jdbi.v2.tweak.HandleCallback;
|
||||||
import org.skife.jdbi.v2.tweak.ResultSetMapper;
|
import org.skife.jdbi.v2.tweak.ResultSetMapper;
|
||||||
import org.skife.jdbi.v2.util.StringMapper;
|
|
||||||
import org.skife.jdbi.v2.util.TimestampMapper;
|
import org.skife.jdbi.v2.util.TimestampMapper;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
import java.sql.ResultSet;
|
import java.sql.ResultSet;
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
import java.sql.Timestamp;
|
import java.sql.Timestamp;
|
||||||
import java.util.LinkedList;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
|
@ -82,7 +76,7 @@ public class JDBCExtractionNamespaceFunctionFactory
|
||||||
{
|
{
|
||||||
final long lastCheck = lastVersion == null ? JodaUtils.MIN_INSTANT : Long.parseLong(lastVersion);
|
final long lastCheck = lastVersion == null ? JodaUtils.MIN_INSTANT : Long.parseLong(lastVersion);
|
||||||
final Long lastDBUpdate = lastUpdates(namespace);
|
final Long lastDBUpdate = lastUpdates(namespace);
|
||||||
if(lastDBUpdate != null && lastDBUpdate <= lastCheck){
|
if (lastDBUpdate != null && lastDBUpdate <= lastCheck) {
|
||||||
return new Callable<String>()
|
return new Callable<String>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
|
@ -132,23 +126,7 @@ public class JDBCExtractionNamespaceFunctionFactory
|
||||||
return new Pair<String, String>(r.getString(keyColumn), r.getString(valueColumn));
|
return new Pair<String, String>(r.getString(keyColumn), r.getString(valueColumn));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
).fold(
|
).list();
|
||||||
new LinkedList<Pair<String, String>>(),
|
|
||||||
new Folder3<LinkedList<Pair<String, String>>, Pair<String, String>>()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public LinkedList<Pair<String, String>> fold(
|
|
||||||
LinkedList<Pair<String, String>> accumulator,
|
|
||||||
Pair<String, String> rs,
|
|
||||||
FoldController control,
|
|
||||||
StatementContext ctx
|
|
||||||
) throws SQLException
|
|
||||||
{
|
|
||||||
accumulator.add(rs);
|
|
||||||
return accumulator;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
|
@ -67,7 +67,7 @@ import java.util.concurrent.atomic.AtomicReference;
|
||||||
})
|
})
|
||||||
public abstract class NamespaceExtractionCacheManager
|
public abstract class NamespaceExtractionCacheManager
|
||||||
{
|
{
|
||||||
private static class NamespaceImplData
|
protected static class NamespaceImplData
|
||||||
{
|
{
|
||||||
public NamespaceImplData(
|
public NamespaceImplData(
|
||||||
final ListenableFuture<?> future,
|
final ListenableFuture<?> future,
|
||||||
|
|
|
@ -22,34 +22,32 @@ package io.druid.server.namespace.cache;
|
||||||
import com.google.common.base.Function;
|
import com.google.common.base.Function;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.metamx.common.ISE;
|
|
||||||
import com.metamx.common.lifecycle.Lifecycle;
|
import com.metamx.common.lifecycle.Lifecycle;
|
||||||
import io.druid.metadata.MetadataStorageConnectorConfig;
|
import com.metamx.common.logger.Logger;
|
||||||
|
import io.druid.metadata.TestDerbyConnector;
|
||||||
import io.druid.query.extraction.namespace.ExtractionNamespace;
|
import io.druid.query.extraction.namespace.ExtractionNamespace;
|
||||||
import io.druid.query.extraction.namespace.ExtractionNamespaceFunctionFactory;
|
import io.druid.query.extraction.namespace.ExtractionNamespaceFunctionFactory;
|
||||||
import io.druid.query.extraction.namespace.JDBCExtractionNamespace;
|
import io.druid.query.extraction.namespace.JDBCExtractionNamespace;
|
||||||
import io.druid.server.metrics.NoopServiceEmitter;
|
import io.druid.server.metrics.NoopServiceEmitter;
|
||||||
import io.druid.server.namespace.JDBCExtractionNamespaceFunctionFactory;
|
import io.druid.server.namespace.JDBCExtractionNamespaceFunctionFactory;
|
||||||
import org.apache.commons.dbcp2.BasicDataSource;
|
|
||||||
import org.joda.time.Period;
|
import org.joda.time.Period;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.Parameterized;
|
import org.junit.runners.Parameterized;
|
||||||
import org.skife.jdbi.v2.DBI;
|
|
||||||
import org.skife.jdbi.v2.Handle;
|
import org.skife.jdbi.v2.Handle;
|
||||||
import org.skife.jdbi.v2.tweak.HandleCallback;
|
|
||||||
|
|
||||||
import java.lang.reflect.Field;
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
@ -57,6 +55,9 @@ import java.util.concurrent.ExecutionException;
|
||||||
@RunWith(Parameterized.class)
|
@RunWith(Parameterized.class)
|
||||||
public class JDBCExtractionNamespaceTest
|
public class JDBCExtractionNamespaceTest
|
||||||
{
|
{
|
||||||
|
@Rule
|
||||||
|
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
|
||||||
|
private static final Logger log = new Logger(JDBCExtractionNamespaceTest.class);
|
||||||
private static final String namespace = "testNamespace";
|
private static final String namespace = "testNamespace";
|
||||||
private static final String tableName = "abstractDbRenameTest";
|
private static final String tableName = "abstractDbRenameTest";
|
||||||
private static final String keyName = "keyName";
|
private static final String keyName = "keyName";
|
||||||
|
@ -67,45 +68,11 @@ public class JDBCExtractionNamespaceTest
|
||||||
"bad", "bar",
|
"bad", "bar",
|
||||||
"how about that", "foo"
|
"how about that", "foo"
|
||||||
);
|
);
|
||||||
private static final String connectionURI = "jdbc:derby:memory:druid;create=true";
|
|
||||||
private static DBI dbi;
|
|
||||||
|
|
||||||
@BeforeClass
|
|
||||||
public static final void createTables()
|
|
||||||
{
|
|
||||||
final BasicDataSource datasource = new BasicDataSource();
|
|
||||||
datasource.setUrl(connectionURI);
|
|
||||||
datasource.setDriverClassLoader(JDBCExtractionNamespaceTest.class.getClassLoader());
|
|
||||||
datasource.setDriverClassName("org.apache.derby.jdbc.EmbeddedDriver");
|
|
||||||
dbi = new DBI(datasource);
|
|
||||||
dbi.withHandle(
|
|
||||||
new HandleCallback<Void>()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public Void withHandle(Handle handle) throws Exception
|
|
||||||
{
|
|
||||||
handle
|
|
||||||
.createStatement(
|
|
||||||
String.format(
|
|
||||||
"CREATE TABLE %s (%s TIMESTAMP, %s VARCHAR(64), %s VARCHAR(64))",
|
|
||||||
tableName,
|
|
||||||
tsColumn_,
|
|
||||||
keyName,
|
|
||||||
valName
|
|
||||||
)
|
|
||||||
)
|
|
||||||
.execute();
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Parameterized.Parameters(name = "{0}")
|
@Parameterized.Parameters(name = "{0}")
|
||||||
public static Collection<Object[]> getParameters()
|
public static Collection<Object[]> getParameters()
|
||||||
{
|
{
|
||||||
return ImmutableList.<Object[]>of(
|
return ImmutableList.of(
|
||||||
new Object[]{"tsColumn"},
|
new Object[]{"tsColumn"},
|
||||||
new Object[]{null}
|
new Object[]{null}
|
||||||
);
|
);
|
||||||
|
@ -120,95 +87,126 @@ public class JDBCExtractionNamespaceTest
|
||||||
|
|
||||||
private final ConcurrentMap<String, Function<String, String>> fnCache = new ConcurrentHashMap<>();
|
private final ConcurrentMap<String, Function<String, String>> fnCache = new ConcurrentHashMap<>();
|
||||||
private final String tsColumn;
|
private final String tsColumn;
|
||||||
private NamespaceExtractionCacheManager extractionCacheManager;
|
private OnHeapNamespaceExtractionCacheManager extractionCacheManager;
|
||||||
private final Lifecycle lifecycle = new Lifecycle();
|
private final Lifecycle lifecycle = new Lifecycle();
|
||||||
|
private final AtomicLong updates = new AtomicLong(0L);
|
||||||
|
private final Object updateLock = new Object();
|
||||||
|
private Handle handle;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup()
|
public void setup() throws Exception
|
||||||
{
|
{
|
||||||
dbi.withHandle(
|
log.info("Setting up");
|
||||||
new HandleCallback<Void>()
|
handle = derbyConnectorRule.getConnector().getDBI().open();
|
||||||
{
|
Assert.assertEquals(
|
||||||
@Override
|
0,
|
||||||
public Void withHandle(Handle handle) throws Exception
|
handle.createStatement(
|
||||||
{
|
String.format(
|
||||||
handle.createStatement(String.format("TRUNCATE TABLE %s", tableName)).execute();
|
"CREATE TABLE %s (%s TIMESTAMP, %s VARCHAR(64), %s VARCHAR(64))",
|
||||||
handle.commit();
|
tableName,
|
||||||
return null;
|
tsColumn_,
|
||||||
}
|
keyName,
|
||||||
}
|
valName
|
||||||
|
)
|
||||||
|
).setQueryTimeout(1).execute()
|
||||||
);
|
);
|
||||||
|
handle.createStatement(String.format("TRUNCATE TABLE %s", tableName)).setQueryTimeout(1).execute();
|
||||||
|
handle.commit();
|
||||||
|
|
||||||
for (Map.Entry<String, String> entry : renames.entrySet()) {
|
for (Map.Entry<String, String> entry : renames.entrySet()) {
|
||||||
insertValues(entry.getKey(), entry.getValue(), "2015-01-01 00:00:00");
|
insertValues(entry.getKey(), entry.getValue(), "2015-01-01 00:00:00");
|
||||||
}
|
}
|
||||||
final Map<Class<? extends ExtractionNamespace>, ExtractionNamespaceFunctionFactory<?>> factoryMap = new HashMap<>();
|
|
||||||
extractionCacheManager = new OnHeapNamespaceExtractionCacheManager(
|
extractionCacheManager = new OnHeapNamespaceExtractionCacheManager(
|
||||||
lifecycle,
|
lifecycle,
|
||||||
fnCache,
|
fnCache,
|
||||||
new NoopServiceEmitter(),
|
new NoopServiceEmitter(),
|
||||||
ImmutableMap.<Class<? extends ExtractionNamespace>, ExtractionNamespaceFunctionFactory<?>>of(
|
ImmutableMap.<Class<? extends ExtractionNamespace>, ExtractionNamespaceFunctionFactory<?>>of(
|
||||||
JDBCExtractionNamespace.class, new JDBCExtractionNamespaceFunctionFactory()
|
JDBCExtractionNamespace.class,
|
||||||
|
new JDBCExtractionNamespaceFunctionFactory()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Callable<String> getCachePopulator(
|
||||||
|
final JDBCExtractionNamespace namespace,
|
||||||
|
final String lastVersion,
|
||||||
|
final Map<String, String> cache
|
||||||
|
)
|
||||||
|
{
|
||||||
|
final Callable<String> cachePopulator = super.getCachePopulator(namespace, lastVersion, cache);
|
||||||
|
return new Callable<String>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public String call() throws Exception
|
||||||
|
{
|
||||||
|
synchronized (updateLock) {
|
||||||
|
log.debug("Running cache populator");
|
||||||
|
try {
|
||||||
|
return cachePopulator.call();
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
updates.incrementAndGet();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
lifecycle.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void tearDown()
|
public void tearDown() throws InterruptedException
|
||||||
{
|
{
|
||||||
|
log.info("Tearing down");
|
||||||
|
handle.createStatement("DROP TABLE " + tableName).setQueryTimeout(1).execute();
|
||||||
|
handle.close();
|
||||||
|
Assert.assertTrue("Delete failed", extractionCacheManager.delete(namespace));
|
||||||
lifecycle.stop();
|
lifecycle.stop();
|
||||||
|
final NamespaceExtractionCacheManager.NamespaceImplData implData = extractionCacheManager.implData.get(namespace);
|
||||||
|
if (implData != null && implData.future != null) {
|
||||||
|
implData.future.cancel(true);
|
||||||
|
Assert.assertTrue(implData.future.isDone());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void insertValues(final String key, final String val, final String updateTs)
|
private void insertValues(final String key, final String val, final String updateTs) throws InterruptedException
|
||||||
{
|
{
|
||||||
dbi.withHandle(
|
final String query;
|
||||||
new HandleCallback<Void>()
|
if (tsColumn == null) {
|
||||||
{
|
handle.createStatement(
|
||||||
@Override
|
String.format("DELETE FROM %s WHERE %s='%s'", tableName, keyName, key)
|
||||||
public Void withHandle(Handle handle) throws Exception
|
).setQueryTimeout(1).execute();
|
||||||
{
|
query = String.format(
|
||||||
final String query;
|
"INSERT INTO %s (%s, %s) VALUES ('%s', '%s')",
|
||||||
if (tsColumn == null) {
|
tableName,
|
||||||
handle.createStatement(
|
keyName, valName,
|
||||||
String.format("DELETE FROM %s WHERE %s='%s'", tableName, keyName, key)
|
key, val
|
||||||
).execute();
|
);
|
||||||
handle.commit();
|
} else {
|
||||||
query = String.format(
|
query = String.format(
|
||||||
"INSERT INTO %s (%s, %s) VALUES ('%s', '%s')",
|
"INSERT INTO %s (%s, %s, %s) VALUES ('%s', '%s', '%s')",
|
||||||
tableName,
|
tableName,
|
||||||
keyName, valName,
|
tsColumn, keyName, valName,
|
||||||
key, val
|
updateTs, key, val
|
||||||
);
|
);
|
||||||
} else {
|
}
|
||||||
query = String.format(
|
Assert.assertEquals(1, handle.createStatement(query).setQueryTimeout(1).execute());
|
||||||
"INSERT INTO %s (%s, %s, %s) VALUES ('%s', '%s', '%s')",
|
handle.commit();
|
||||||
tableName,
|
// Some internals have timing resolution no better than MS. This is to help make sure that checks for timings
|
||||||
tsColumn, keyName, valName,
|
// have elapsed at least to the next ms... 2 is for good measure.
|
||||||
updateTs, key, val
|
Thread.sleep(2);
|
||||||
);
|
|
||||||
}
|
|
||||||
if (1 != handle.createStatement(query).execute()) {
|
|
||||||
throw new ISE("Did not return the correct number of rows");
|
|
||||||
}
|
|
||||||
handle.commit();
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 60_000)
|
@Test(timeout = 60_000L)
|
||||||
public void testMapping()
|
public void testMapping()
|
||||||
throws ClassNotFoundException, NoSuchFieldException, IllegalAccessException, ExecutionException,
|
throws ClassNotFoundException, NoSuchFieldException, IllegalAccessException, ExecutionException,
|
||||||
InterruptedException
|
InterruptedException, TimeoutException
|
||||||
{
|
{
|
||||||
MetadataStorageConnectorConfig config = new MetadataStorageConnectorConfig();
|
|
||||||
Field uriField = MetadataStorageConnectorConfig.class.getDeclaredField("connectURI");
|
|
||||||
uriField.setAccessible(true);
|
|
||||||
uriField.set(config, connectionURI);
|
|
||||||
|
|
||||||
final JDBCExtractionNamespace extractionNamespace = new JDBCExtractionNamespace(
|
final JDBCExtractionNamespace extractionNamespace = new JDBCExtractionNamespace(
|
||||||
namespace,
|
namespace,
|
||||||
config,
|
derbyConnectorRule.getMetadataConnectorConfig(),
|
||||||
tableName,
|
tableName,
|
||||||
keyName,
|
keyName,
|
||||||
valName,
|
valName,
|
||||||
|
@ -217,92 +215,104 @@ public class JDBCExtractionNamespaceTest
|
||||||
);
|
);
|
||||||
NamespaceExtractionCacheManagersTest.waitFor(extractionCacheManager.schedule(extractionNamespace));
|
NamespaceExtractionCacheManagersTest.waitFor(extractionCacheManager.schedule(extractionNamespace));
|
||||||
Function<String, String> extractionFn = fnCache.get(extractionNamespace.getNamespace());
|
Function<String, String> extractionFn = fnCache.get(extractionNamespace.getNamespace());
|
||||||
|
|
||||||
for (Map.Entry<String, String> entry : renames.entrySet()) {
|
for (Map.Entry<String, String> entry : renames.entrySet()) {
|
||||||
String key = entry.getKey();
|
String key = entry.getKey();
|
||||||
String val = entry.getValue();
|
String val = entry.getValue();
|
||||||
Assert.assertEquals(
|
Assert.assertEquals(
|
||||||
|
"non-null check",
|
||||||
val,
|
val,
|
||||||
String.format(val, extractionFn.apply(key))
|
extractionFn.apply(key)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
Assert.assertEquals(
|
Assert.assertEquals(
|
||||||
|
"null check",
|
||||||
null,
|
null,
|
||||||
extractionFn.apply("baz")
|
extractionFn.apply("baz")
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test(timeout = 60_000)
|
@Test(timeout = 60_000L)
|
||||||
public void testSkipOld()
|
public void testSkipOld()
|
||||||
throws NoSuchFieldException, IllegalAccessException, ExecutionException, InterruptedException
|
throws NoSuchFieldException, IllegalAccessException, ExecutionException, InterruptedException
|
||||||
{
|
{
|
||||||
MetadataStorageConnectorConfig config = new MetadataStorageConnectorConfig();
|
final JDBCExtractionNamespace extractionNamespace = ensureNamespace();
|
||||||
Field uriField = MetadataStorageConnectorConfig.class.getDeclaredField("connectURI");
|
|
||||||
uriField.setAccessible(true);
|
assertUpdated(extractionNamespace.getNamespace(), "foo", "bar");
|
||||||
uriField.set(config, connectionURI);
|
|
||||||
final JDBCExtractionNamespace extractionNamespace = new JDBCExtractionNamespace(
|
|
||||||
namespace,
|
|
||||||
config,
|
|
||||||
tableName,
|
|
||||||
keyName,
|
|
||||||
valName,
|
|
||||||
tsColumn,
|
|
||||||
new Period(1)
|
|
||||||
);
|
|
||||||
extractionCacheManager.schedule(extractionNamespace);
|
|
||||||
while (!fnCache.containsKey(extractionNamespace.getNamespace())) {
|
|
||||||
Thread.sleep(1);
|
|
||||||
}
|
|
||||||
Assert.assertEquals(
|
|
||||||
"bar",
|
|
||||||
fnCache.get(extractionNamespace.getNamespace()).apply("foo")
|
|
||||||
);
|
|
||||||
if (tsColumn != null) {
|
if (tsColumn != null) {
|
||||||
insertValues("foo", "baz", "1900-01-01 00:00:00");
|
insertValues("foo", "baz", "1900-01-01 00:00:00");
|
||||||
}
|
}
|
||||||
|
|
||||||
Thread.sleep(10);
|
assertUpdated(extractionNamespace.getNamespace(), "foo", "bar");
|
||||||
|
|
||||||
Assert.assertEquals(
|
|
||||||
"bar",
|
|
||||||
fnCache.get(extractionNamespace.getNamespace()).apply("foo")
|
|
||||||
);
|
|
||||||
extractionCacheManager.delete(namespace);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 60_000)
|
@Test(timeout = 60_000L)
|
||||||
public void testFindNew()
|
public void testFindNew()
|
||||||
throws NoSuchFieldException, IllegalAccessException, ExecutionException, InterruptedException
|
throws NoSuchFieldException, IllegalAccessException, ExecutionException, InterruptedException
|
||||||
{
|
{
|
||||||
MetadataStorageConnectorConfig config = new MetadataStorageConnectorConfig();
|
final JDBCExtractionNamespace extractionNamespace = ensureNamespace();
|
||||||
Field uriField = MetadataStorageConnectorConfig.class.getDeclaredField("connectURI");
|
|
||||||
uriField.setAccessible(true);
|
assertUpdated(extractionNamespace.getNamespace(), "foo", "bar");
|
||||||
uriField.set(config, connectionURI);
|
|
||||||
|
insertValues("foo", "baz", "2900-01-01 00:00:00");
|
||||||
|
|
||||||
|
assertUpdated(extractionNamespace.getNamespace(), "foo", "baz");
|
||||||
|
}
|
||||||
|
|
||||||
|
private JDBCExtractionNamespace ensureNamespace()
|
||||||
|
throws NoSuchFieldException, IllegalAccessException, InterruptedException
|
||||||
|
{
|
||||||
final JDBCExtractionNamespace extractionNamespace = new JDBCExtractionNamespace(
|
final JDBCExtractionNamespace extractionNamespace = new JDBCExtractionNamespace(
|
||||||
namespace,
|
namespace,
|
||||||
config,
|
derbyConnectorRule.getMetadataConnectorConfig(),
|
||||||
tableName,
|
tableName,
|
||||||
keyName,
|
keyName,
|
||||||
valName,
|
valName,
|
||||||
tsColumn,
|
tsColumn,
|
||||||
new Period(1)
|
new Period(10)
|
||||||
);
|
);
|
||||||
extractionCacheManager.schedule(extractionNamespace);
|
extractionCacheManager.schedule(extractionNamespace);
|
||||||
while (!fnCache.containsKey(extractionNamespace.getNamespace())) {
|
|
||||||
Thread.sleep(1);
|
|
||||||
}
|
|
||||||
Function<String, String> extractionFn = fnCache.get(extractionNamespace.getNamespace());
|
|
||||||
Assert.assertEquals(
|
|
||||||
"bar",
|
|
||||||
extractionFn.apply("foo")
|
|
||||||
);
|
|
||||||
|
|
||||||
insertValues("foo", "baz", "2900-01-01 00:00:00");
|
waitForUpdates(1_000L, 2L);
|
||||||
Thread.sleep(100);
|
|
||||||
extractionFn = fnCache.get(extractionNamespace.getNamespace());
|
|
||||||
Assert.assertEquals(
|
Assert.assertEquals(
|
||||||
"baz",
|
"sanity check not correct",
|
||||||
extractionFn.apply("foo")
|
"bar",
|
||||||
|
fnCache.get(extractionNamespace.getNamespace()).apply("foo")
|
||||||
|
);
|
||||||
|
return extractionNamespace;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void waitForUpdates(long timeout, long numUpdates) throws InterruptedException
|
||||||
|
{
|
||||||
|
long startTime = System.currentTimeMillis();
|
||||||
|
long pre = 0L;
|
||||||
|
synchronized (updateLock) {
|
||||||
|
pre = updates.get();
|
||||||
|
}
|
||||||
|
long post = 0L;
|
||||||
|
do {
|
||||||
|
// Sleep to spare a few cpu cycles
|
||||||
|
Thread.sleep(5);
|
||||||
|
log.debug("Waiting for updateLock");
|
||||||
|
synchronized (updateLock) {
|
||||||
|
Assert.assertTrue("Failed waiting for update", System.currentTimeMillis() - startTime < timeout);
|
||||||
|
post = updates.get();
|
||||||
|
}
|
||||||
|
} while (post < pre + numUpdates);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertUpdated(String namespace, String key, String expected) throws InterruptedException
|
||||||
|
{
|
||||||
|
waitForUpdates(1_000L, 2L);
|
||||||
|
|
||||||
|
Function<String, String> extractionFn = fnCache.get(namespace);
|
||||||
|
Assert.assertEquals(
|
||||||
|
"update check",
|
||||||
|
expected,
|
||||||
|
extractionFn.apply(key)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -157,8 +157,13 @@ public class NamespaceExtractionCacheManagersTest
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Throwable t)
|
public void onFailure(Throwable t)
|
||||||
{
|
{
|
||||||
log.error(t, "Error waiting");
|
try {
|
||||||
throw Throwables.propagate(t);
|
log.error(t, "Error waiting");
|
||||||
|
throw Throwables.propagate(t);
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
|
@ -62,7 +62,7 @@ public class TestDerbyConnector extends DerbyConnector
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String dbSafeUUID()
|
public static String dbSafeUUID()
|
||||||
{
|
{
|
||||||
return UUID.randomUUID().toString().replace("-", "");
|
return UUID.randomUUID().toString().replace("-", "");
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue