Merge pull request #1658 from metamx/cleanupJDBCExtractionNamespaceTest

Hopefully add better timeouts and ordering to JDBCExtractionNamespaceTest
This commit is contained in:
Nishant 2015-09-02 23:49:49 +05:30
commit 0096e6a0a0
5 changed files with 173 additions and 180 deletions

View File

@ -21,27 +21,21 @@ package io.druid.server.namespace;
import com.google.common.base.Function;
import com.google.common.base.Strings;
import com.google.inject.Inject;
import com.metamx.common.Pair;
import io.druid.common.utils.JodaUtils;
import io.druid.query.extraction.namespace.ExtractionNamespaceFunctionFactory;
import io.druid.query.extraction.namespace.JDBCExtractionNamespace;
import io.druid.server.namespace.cache.NamespaceExtractionCacheManager;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.FoldController;
import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
import org.skife.jdbi.v2.util.StringMapper;
import org.skife.jdbi.v2.util.TimestampMapper;
import javax.annotation.Nullable;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
@ -82,7 +76,7 @@ public class JDBCExtractionNamespaceFunctionFactory
{
final long lastCheck = lastVersion == null ? JodaUtils.MIN_INSTANT : Long.parseLong(lastVersion);
final Long lastDBUpdate = lastUpdates(namespace);
if(lastDBUpdate != null && lastDBUpdate <= lastCheck){
if (lastDBUpdate != null && lastDBUpdate <= lastCheck) {
return new Callable<String>()
{
@Override
@ -132,23 +126,7 @@ public class JDBCExtractionNamespaceFunctionFactory
return new Pair<String, String>(r.getString(keyColumn), r.getString(valueColumn));
}
}
).fold(
new LinkedList<Pair<String, String>>(),
new Folder3<LinkedList<Pair<String, String>>, Pair<String, String>>()
{
@Override
public LinkedList<Pair<String, String>> fold(
LinkedList<Pair<String, String>> accumulator,
Pair<String, String> rs,
FoldController control,
StatementContext ctx
) throws SQLException
{
accumulator.add(rs);
return accumulator;
}
}
);
).list();
}
}
);

View File

@ -67,7 +67,7 @@ import java.util.concurrent.atomic.AtomicReference;
})
public abstract class NamespaceExtractionCacheManager
{
private static class NamespaceImplData
protected static class NamespaceImplData
{
public NamespaceImplData(
final ListenableFuture<?> future,

View File

@ -22,34 +22,32 @@ package io.druid.server.namespace.cache;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.ISE;
import com.metamx.common.lifecycle.Lifecycle;
import io.druid.metadata.MetadataStorageConnectorConfig;
import com.metamx.common.logger.Logger;
import io.druid.metadata.TestDerbyConnector;
import io.druid.query.extraction.namespace.ExtractionNamespace;
import io.druid.query.extraction.namespace.ExtractionNamespaceFunctionFactory;
import io.druid.query.extraction.namespace.JDBCExtractionNamespace;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.server.namespace.JDBCExtractionNamespaceFunctionFactory;
import org.apache.commons.dbcp2.BasicDataSource;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
/**
*
@ -57,6 +55,9 @@ import java.util.concurrent.ExecutionException;
@RunWith(Parameterized.class)
public class JDBCExtractionNamespaceTest
{
@Rule
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
private static final Logger log = new Logger(JDBCExtractionNamespaceTest.class);
private static final String namespace = "testNamespace";
private static final String tableName = "abstractDbRenameTest";
private static final String keyName = "keyName";
@ -67,45 +68,11 @@ public class JDBCExtractionNamespaceTest
"bad", "bar",
"how about that", "foo"
);
private static final String connectionURI = "jdbc:derby:memory:druid;create=true";
private static DBI dbi;
@BeforeClass
public static final void createTables()
{
final BasicDataSource datasource = new BasicDataSource();
datasource.setUrl(connectionURI);
datasource.setDriverClassLoader(JDBCExtractionNamespaceTest.class.getClassLoader());
datasource.setDriverClassName("org.apache.derby.jdbc.EmbeddedDriver");
dbi = new DBI(datasource);
dbi.withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
handle
.createStatement(
String.format(
"CREATE TABLE %s (%s TIMESTAMP, %s VARCHAR(64), %s VARCHAR(64))",
tableName,
tsColumn_,
keyName,
valName
)
)
.execute();
return null;
}
}
);
}
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> getParameters()
{
return ImmutableList.<Object[]>of(
return ImmutableList.of(
new Object[]{"tsColumn"},
new Object[]{null}
);
@ -120,95 +87,126 @@ public class JDBCExtractionNamespaceTest
private final ConcurrentMap<String, Function<String, String>> fnCache = new ConcurrentHashMap<>();
private final String tsColumn;
private NamespaceExtractionCacheManager extractionCacheManager;
private OnHeapNamespaceExtractionCacheManager extractionCacheManager;
private final Lifecycle lifecycle = new Lifecycle();
private final AtomicLong updates = new AtomicLong(0L);
private final Object updateLock = new Object();
private Handle handle;
@Before
public void setup()
public void setup() throws Exception
{
dbi.withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(String.format("TRUNCATE TABLE %s", tableName)).execute();
handle.commit();
return null;
}
}
log.info("Setting up");
handle = derbyConnectorRule.getConnector().getDBI().open();
Assert.assertEquals(
0,
handle.createStatement(
String.format(
"CREATE TABLE %s (%s TIMESTAMP, %s VARCHAR(64), %s VARCHAR(64))",
tableName,
tsColumn_,
keyName,
valName
)
).setQueryTimeout(1).execute()
);
handle.createStatement(String.format("TRUNCATE TABLE %s", tableName)).setQueryTimeout(1).execute();
handle.commit();
for (Map.Entry<String, String> entry : renames.entrySet()) {
insertValues(entry.getKey(), entry.getValue(), "2015-01-01 00:00:00");
}
final Map<Class<? extends ExtractionNamespace>, ExtractionNamespaceFunctionFactory<?>> factoryMap = new HashMap<>();
extractionCacheManager = new OnHeapNamespaceExtractionCacheManager(
lifecycle,
fnCache,
new NoopServiceEmitter(),
ImmutableMap.<Class<? extends ExtractionNamespace>, ExtractionNamespaceFunctionFactory<?>>of(
JDBCExtractionNamespace.class, new JDBCExtractionNamespaceFunctionFactory()
JDBCExtractionNamespace.class,
new JDBCExtractionNamespaceFunctionFactory()
{
@Override
public Callable<String> getCachePopulator(
final JDBCExtractionNamespace namespace,
final String lastVersion,
final Map<String, String> cache
)
{
final Callable<String> cachePopulator = super.getCachePopulator(namespace, lastVersion, cache);
return new Callable<String>()
{
@Override
public String call() throws Exception
{
synchronized (updateLock) {
log.debug("Running cache populator");
try {
return cachePopulator.call();
}
finally {
updates.incrementAndGet();
}
}
}
};
}
}
)
);
lifecycle.start();
}
@After
public void tearDown()
public void tearDown() throws InterruptedException
{
log.info("Tearing down");
handle.createStatement("DROP TABLE " + tableName).setQueryTimeout(1).execute();
handle.close();
Assert.assertTrue("Delete failed", extractionCacheManager.delete(namespace));
lifecycle.stop();
final NamespaceExtractionCacheManager.NamespaceImplData implData = extractionCacheManager.implData.get(namespace);
if (implData != null && implData.future != null) {
implData.future.cancel(true);
Assert.assertTrue(implData.future.isDone());
}
}
private void insertValues(final String key, final String val, final String updateTs)
private void insertValues(final String key, final String val, final String updateTs) throws InterruptedException
{
dbi.withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
final String query;
if (tsColumn == null) {
handle.createStatement(
String.format("DELETE FROM %s WHERE %s='%s'", tableName, keyName, key)
).execute();
handle.commit();
query = String.format(
"INSERT INTO %s (%s, %s) VALUES ('%s', '%s')",
tableName,
keyName, valName,
key, val
);
} else {
query = String.format(
"INSERT INTO %s (%s, %s, %s) VALUES ('%s', '%s', '%s')",
tableName,
tsColumn, keyName, valName,
updateTs, key, val
);
}
if (1 != handle.createStatement(query).execute()) {
throw new ISE("Did not return the correct number of rows");
}
handle.commit();
return null;
}
}
);
final String query;
if (tsColumn == null) {
handle.createStatement(
String.format("DELETE FROM %s WHERE %s='%s'", tableName, keyName, key)
).setQueryTimeout(1).execute();
query = String.format(
"INSERT INTO %s (%s, %s) VALUES ('%s', '%s')",
tableName,
keyName, valName,
key, val
);
} else {
query = String.format(
"INSERT INTO %s (%s, %s, %s) VALUES ('%s', '%s', '%s')",
tableName,
tsColumn, keyName, valName,
updateTs, key, val
);
}
Assert.assertEquals(1, handle.createStatement(query).setQueryTimeout(1).execute());
handle.commit();
// Some internals have timing resolution no better than MS. This is to help make sure that checks for timings
// have elapsed at least to the next ms... 2 is for good measure.
Thread.sleep(2);
}
@Test(timeout = 60_000)
@Test(timeout = 60_000L)
public void testMapping()
throws ClassNotFoundException, NoSuchFieldException, IllegalAccessException, ExecutionException,
InterruptedException
InterruptedException, TimeoutException
{
MetadataStorageConnectorConfig config = new MetadataStorageConnectorConfig();
Field uriField = MetadataStorageConnectorConfig.class.getDeclaredField("connectURI");
uriField.setAccessible(true);
uriField.set(config, connectionURI);
final JDBCExtractionNamespace extractionNamespace = new JDBCExtractionNamespace(
namespace,
config,
derbyConnectorRule.getMetadataConnectorConfig(),
tableName,
keyName,
valName,
@ -217,92 +215,104 @@ public class JDBCExtractionNamespaceTest
);
NamespaceExtractionCacheManagersTest.waitFor(extractionCacheManager.schedule(extractionNamespace));
Function<String, String> extractionFn = fnCache.get(extractionNamespace.getNamespace());
for (Map.Entry<String, String> entry : renames.entrySet()) {
String key = entry.getKey();
String val = entry.getValue();
Assert.assertEquals(
"non-null check",
val,
String.format(val, extractionFn.apply(key))
extractionFn.apply(key)
);
}
Assert.assertEquals(
"null check",
null,
extractionFn.apply("baz")
);
}
@Test(timeout = 60_000)
@Test(timeout = 60_000L)
public void testSkipOld()
throws NoSuchFieldException, IllegalAccessException, ExecutionException, InterruptedException
{
MetadataStorageConnectorConfig config = new MetadataStorageConnectorConfig();
Field uriField = MetadataStorageConnectorConfig.class.getDeclaredField("connectURI");
uriField.setAccessible(true);
uriField.set(config, connectionURI);
final JDBCExtractionNamespace extractionNamespace = new JDBCExtractionNamespace(
namespace,
config,
tableName,
keyName,
valName,
tsColumn,
new Period(1)
);
extractionCacheManager.schedule(extractionNamespace);
while (!fnCache.containsKey(extractionNamespace.getNamespace())) {
Thread.sleep(1);
}
Assert.assertEquals(
"bar",
fnCache.get(extractionNamespace.getNamespace()).apply("foo")
);
final JDBCExtractionNamespace extractionNamespace = ensureNamespace();
assertUpdated(extractionNamespace.getNamespace(), "foo", "bar");
if (tsColumn != null) {
insertValues("foo", "baz", "1900-01-01 00:00:00");
}
Thread.sleep(10);
Assert.assertEquals(
"bar",
fnCache.get(extractionNamespace.getNamespace()).apply("foo")
);
extractionCacheManager.delete(namespace);
assertUpdated(extractionNamespace.getNamespace(), "foo", "bar");
}
@Test(timeout = 60_000)
@Test(timeout = 60_000L)
public void testFindNew()
throws NoSuchFieldException, IllegalAccessException, ExecutionException, InterruptedException
{
MetadataStorageConnectorConfig config = new MetadataStorageConnectorConfig();
Field uriField = MetadataStorageConnectorConfig.class.getDeclaredField("connectURI");
uriField.setAccessible(true);
uriField.set(config, connectionURI);
final JDBCExtractionNamespace extractionNamespace = ensureNamespace();
assertUpdated(extractionNamespace.getNamespace(), "foo", "bar");
insertValues("foo", "baz", "2900-01-01 00:00:00");
assertUpdated(extractionNamespace.getNamespace(), "foo", "baz");
}
private JDBCExtractionNamespace ensureNamespace()
throws NoSuchFieldException, IllegalAccessException, InterruptedException
{
final JDBCExtractionNamespace extractionNamespace = new JDBCExtractionNamespace(
namespace,
config,
derbyConnectorRule.getMetadataConnectorConfig(),
tableName,
keyName,
valName,
tsColumn,
new Period(1)
new Period(10)
);
extractionCacheManager.schedule(extractionNamespace);
while (!fnCache.containsKey(extractionNamespace.getNamespace())) {
Thread.sleep(1);
}
Function<String, String> extractionFn = fnCache.get(extractionNamespace.getNamespace());
Assert.assertEquals(
"bar",
extractionFn.apply("foo")
);
insertValues("foo", "baz", "2900-01-01 00:00:00");
Thread.sleep(100);
extractionFn = fnCache.get(extractionNamespace.getNamespace());
waitForUpdates(1_000L, 2L);
Assert.assertEquals(
"baz",
extractionFn.apply("foo")
"sanity check not correct",
"bar",
fnCache.get(extractionNamespace.getNamespace()).apply("foo")
);
return extractionNamespace;
}
private void waitForUpdates(long timeout, long numUpdates) throws InterruptedException
{
long startTime = System.currentTimeMillis();
long pre = 0L;
synchronized (updateLock) {
pre = updates.get();
}
long post = 0L;
do {
// Sleep to spare a few cpu cycles
Thread.sleep(5);
log.debug("Waiting for updateLock");
synchronized (updateLock) {
Assert.assertTrue("Failed waiting for update", System.currentTimeMillis() - startTime < timeout);
post = updates.get();
}
} while (post < pre + numUpdates);
}
private void assertUpdated(String namespace, String key, String expected) throws InterruptedException
{
waitForUpdates(1_000L, 2L);
Function<String, String> extractionFn = fnCache.get(namespace);
Assert.assertEquals(
"update check",
expected,
extractionFn.apply(key)
);
}
}

View File

@ -157,8 +157,13 @@ public class NamespaceExtractionCacheManagersTest
@Override
public void onFailure(Throwable t)
{
log.error(t, "Error waiting");
throw Throwables.propagate(t);
try {
log.error(t, "Error waiting");
throw Throwables.propagate(t);
}
finally {
latch.countDown();
}
}
}
);

View File

@ -62,7 +62,7 @@ public class TestDerbyConnector extends DerbyConnector
}
}
private static String dbSafeUUID()
public static String dbSafeUUID()
{
return UUID.randomUUID().toString().replace("-", "");
}