mirror of https://github.com/apache/druid.git
Reorder Before/After in JDBCExtractionNamespaceTest
* Fixes https://github.com/druid-io/druid/issues/2120
This commit is contained in:
parent
7e64d5179f
commit
05c9e1b598
|
@ -21,11 +21,17 @@ package io.druid.server.namespace.cache;
|
|||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.io.Closer;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.metamx.common.lifecycle.Lifecycle;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import io.druid.concurrent.Execs;
|
||||
import io.druid.metadata.TestDerbyConnector;
|
||||
import io.druid.query.extraction.namespace.ExtractionNamespace;
|
||||
import io.druid.query.extraction.namespace.ExtractionNamespaceFunctionFactory;
|
||||
|
@ -42,6 +48,8 @@ import org.junit.runner.RunWith;
|
|||
import org.junit.runners.Parameterized;
|
||||
import org.skife.jdbi.v2.Handle;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
@ -50,8 +58,10 @@ import java.util.concurrent.Callable;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
|
@ -99,88 +109,198 @@ public class JDBCExtractionNamespaceTest
|
|||
private final Lifecycle lifecycle = new Lifecycle();
|
||||
private final AtomicLong updates = new AtomicLong(0L);
|
||||
private final Lock updateLock = new ReentrantLock(true);
|
||||
private Handle handle;
|
||||
private final Closer closer = Closer.create();
|
||||
private final AtomicReference<Handle> handleRef = new AtomicReference<>(null);
|
||||
private final ListeningExecutorService setupTeardownService =
|
||||
MoreExecutors.listeningDecorator(Execs.singleThreaded("JDBCExtractionNamespaceTeardown"));
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception
|
||||
{
|
||||
log.info("Setting up");
|
||||
handle = derbyConnectorRule.getConnector().getDBI().open();
|
||||
Assert.assertEquals(
|
||||
0,
|
||||
handle.createStatement(
|
||||
String.format(
|
||||
"CREATE TABLE %s (%s TIMESTAMP, %s VARCHAR(64), %s VARCHAR(64))",
|
||||
tableName,
|
||||
tsColumn_,
|
||||
keyName,
|
||||
valName
|
||||
)
|
||||
).setQueryTimeout(1).execute()
|
||||
);
|
||||
handle.createStatement(String.format("TRUNCATE TABLE %s", tableName)).setQueryTimeout(1).execute();
|
||||
handle.commit();
|
||||
|
||||
for (Map.Entry<String, String> entry : renames.entrySet()) {
|
||||
insertValues(entry.getKey(), entry.getValue(), "2015-01-01 00:00:00");
|
||||
}
|
||||
|
||||
extractionCacheManager = new OnHeapNamespaceExtractionCacheManager(
|
||||
lifecycle,
|
||||
fnCache,
|
||||
reverseFnCache,
|
||||
new NoopServiceEmitter(),
|
||||
ImmutableMap.<Class<? extends ExtractionNamespace>, ExtractionNamespaceFunctionFactory<?>>of(
|
||||
JDBCExtractionNamespace.class,
|
||||
new JDBCExtractionNamespaceFunctionFactory()
|
||||
final ListenableFuture<?> setupFuture = setupTeardownService.submit(
|
||||
new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
final Handle handle = derbyConnectorRule.getConnector().getDBI().open();
|
||||
handleRef.set(handle);
|
||||
Assert.assertEquals(
|
||||
0,
|
||||
handle.createStatement(
|
||||
String.format(
|
||||
"CREATE TABLE %s (%s TIMESTAMP, %s VARCHAR(64), %s VARCHAR(64))",
|
||||
tableName,
|
||||
tsColumn_,
|
||||
keyName,
|
||||
valName
|
||||
)
|
||||
).setQueryTimeout(1).execute()
|
||||
);
|
||||
handle.createStatement(String.format("TRUNCATE TABLE %s", tableName)).setQueryTimeout(1).execute();
|
||||
handle.commit();
|
||||
closer.register(new Closeable()
|
||||
{
|
||||
@Override
|
||||
public Callable<String> getCachePopulator(
|
||||
final JDBCExtractionNamespace namespace,
|
||||
final String lastVersion,
|
||||
final Map<String, String> cache
|
||||
)
|
||||
public void close() throws IOException
|
||||
{
|
||||
final Callable<String> cachePopulator = super.getCachePopulator(namespace, lastVersion, cache);
|
||||
return new Callable<String>()
|
||||
{
|
||||
@Override
|
||||
public String call() throws Exception
|
||||
{
|
||||
updateLock.lockInterruptibly();
|
||||
try {
|
||||
log.debug("Running cache populator");
|
||||
try {
|
||||
return cachePopulator.call();
|
||||
}
|
||||
finally {
|
||||
updates.incrementAndGet();
|
||||
}
|
||||
}
|
||||
finally {
|
||||
updateLock.unlock();
|
||||
}
|
||||
}
|
||||
};
|
||||
// Register first so it gets run last and checks for cleanup
|
||||
final NamespaceExtractionCacheManager.NamespaceImplData implData = extractionCacheManager.implData.get(
|
||||
namespace);
|
||||
if (implData != null && implData.future != null) {
|
||||
implData.future.cancel(true);
|
||||
Assert.assertTrue(implData.future.isDone());
|
||||
}
|
||||
}
|
||||
});
|
||||
closer.register(new Closeable()
|
||||
{
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
handle.createStatement("DROP TABLE " + tableName).setQueryTimeout(1).execute();
|
||||
handle.close();
|
||||
}
|
||||
});
|
||||
for (Map.Entry<String, String> entry : renames.entrySet()) {
|
||||
try {
|
||||
insertValues(entry.getKey(), entry.getValue(), "2015-01-01 00:00:00");
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
extractionCacheManager = new OnHeapNamespaceExtractionCacheManager(
|
||||
lifecycle,
|
||||
fnCache,
|
||||
reverseFnCache,
|
||||
new NoopServiceEmitter(),
|
||||
ImmutableMap.<Class<? extends ExtractionNamespace>, ExtractionNamespaceFunctionFactory<?>>of(
|
||||
JDBCExtractionNamespace.class,
|
||||
new JDBCExtractionNamespaceFunctionFactory()
|
||||
{
|
||||
@Override
|
||||
public Callable<String> getCachePopulator(
|
||||
final JDBCExtractionNamespace namespace,
|
||||
final String lastVersion,
|
||||
final Map<String, String> cache
|
||||
)
|
||||
{
|
||||
final Callable<String> cachePopulator = super.getCachePopulator(namespace, lastVersion, cache);
|
||||
return new Callable<String>()
|
||||
{
|
||||
@Override
|
||||
public String call() throws Exception
|
||||
{
|
||||
updateLock.lockInterruptibly();
|
||||
try {
|
||||
log.debug("Running cache populator");
|
||||
try {
|
||||
return cachePopulator.call();
|
||||
}
|
||||
finally {
|
||||
updates.incrementAndGet();
|
||||
}
|
||||
}
|
||||
finally {
|
||||
updateLock.unlock();
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
try {
|
||||
lifecycle.start();
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
closer.register(new Closeable()
|
||||
{
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
lifecycle.stop();
|
||||
}
|
||||
});
|
||||
closer.register(new Closeable()
|
||||
{
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
Assert.assertTrue("Delete failed", extractionCacheManager.delete(namespace));
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
);
|
||||
lifecycle.start();
|
||||
final Closer setupCloser = Closer.create();
|
||||
setupCloser.register(new Closeable()
|
||||
{
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
if (!setupFuture.isDone() && !setupFuture.cancel(true) && !setupFuture.isDone()) {
|
||||
throw new IOException("Unable to stop future");
|
||||
}
|
||||
}
|
||||
});
|
||||
try {
|
||||
setupFuture.get(10, TimeUnit.SECONDS);
|
||||
}
|
||||
catch (Throwable t) {
|
||||
throw setupCloser.rethrow(t);
|
||||
}
|
||||
finally {
|
||||
setupCloser.close();
|
||||
}
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws InterruptedException
|
||||
public void tearDown() throws InterruptedException, ExecutionException, TimeoutException, IOException
|
||||
{
|
||||
log.info("Tearing down");
|
||||
handle.createStatement("DROP TABLE " + tableName).setQueryTimeout(1).execute();
|
||||
handle.close();
|
||||
Assert.assertTrue("Delete failed", extractionCacheManager.delete(namespace));
|
||||
lifecycle.stop();
|
||||
final NamespaceExtractionCacheManager.NamespaceImplData implData = extractionCacheManager.implData.get(namespace);
|
||||
if (implData != null && implData.future != null) {
|
||||
implData.future.cancel(true);
|
||||
Assert.assertTrue(implData.future.isDone());
|
||||
final Closer tearDownCloser = Closer.create();
|
||||
tearDownCloser.register(new Closeable()
|
||||
{
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
setupTeardownService.shutdownNow();
|
||||
try {
|
||||
setupTeardownService.awaitTermination(10, TimeUnit.SECONDS);
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new IOException("Interrupted", e);
|
||||
}
|
||||
}
|
||||
});
|
||||
try {
|
||||
setupTeardownService.submit(
|
||||
new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
try {
|
||||
closer.close();
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
).get(10, TimeUnit.SECONDS);
|
||||
}
|
||||
catch (Throwable t) {
|
||||
throw closer.rethrow(t);
|
||||
}
|
||||
finally {
|
||||
closer.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -188,7 +308,7 @@ public class JDBCExtractionNamespaceTest
|
|||
{
|
||||
final String query;
|
||||
if (tsColumn == null) {
|
||||
handle.createStatement(
|
||||
handleRef.get().createStatement(
|
||||
String.format("DELETE FROM %s WHERE %s='%s'", tableName, keyName, key)
|
||||
).setQueryTimeout(1).execute();
|
||||
query = String.format(
|
||||
|
@ -205,8 +325,8 @@ public class JDBCExtractionNamespaceTest
|
|||
updateTs, key, val
|
||||
);
|
||||
}
|
||||
Assert.assertEquals(1, handle.createStatement(query).setQueryTimeout(1).execute());
|
||||
handle.commit();
|
||||
Assert.assertEquals(1, handleRef.get().createStatement(query).setQueryTimeout(1).execute());
|
||||
handleRef.get().commit();
|
||||
// Some internals have timing resolution no better than MS. This is to help make sure that checks for timings
|
||||
// have elapsed at least to the next ms... 2 is for good measure.
|
||||
Thread.sleep(2);
|
||||
|
@ -251,13 +371,31 @@ public class JDBCExtractionNamespaceTest
|
|||
);
|
||||
NamespaceExtractionCacheManagersTest.waitFor(extractionCacheManager.schedule(extractionNamespace));
|
||||
Function<String, List<String>> reverseExtractionFn = reverseFnCache.get(extractionNamespace.getNamespace());
|
||||
Assert.assertEquals("reverse lookup should match", Sets.newHashSet("foo", "bad"), Sets.newHashSet(reverseExtractionFn.apply("bar")));
|
||||
Assert.assertEquals("reverse lookup should match", Sets.newHashSet("how about that"), Sets.newHashSet(reverseExtractionFn.apply("foo")));
|
||||
Assert.assertEquals("reverse lookup should match", Sets.newHashSet("empty string"), Sets.newHashSet(reverseExtractionFn.apply("")));
|
||||
Assert.assertEquals("null is same as empty string", Sets.newHashSet("empty string"), Sets.newHashSet(reverseExtractionFn.apply(null)));
|
||||
Assert.assertEquals("reverse lookup of none existing value should be empty list",
|
||||
Collections.EMPTY_LIST,
|
||||
reverseExtractionFn.apply("does't exist"));
|
||||
Assert.assertEquals(
|
||||
"reverse lookup should match",
|
||||
Sets.newHashSet("foo", "bad"),
|
||||
Sets.newHashSet(reverseExtractionFn.apply("bar"))
|
||||
);
|
||||
Assert.assertEquals(
|
||||
"reverse lookup should match",
|
||||
Sets.newHashSet("how about that"),
|
||||
Sets.newHashSet(reverseExtractionFn.apply("foo"))
|
||||
);
|
||||
Assert.assertEquals(
|
||||
"reverse lookup should match",
|
||||
Sets.newHashSet("empty string"),
|
||||
Sets.newHashSet(reverseExtractionFn.apply(""))
|
||||
);
|
||||
Assert.assertEquals(
|
||||
"null is same as empty string",
|
||||
Sets.newHashSet("empty string"),
|
||||
Sets.newHashSet(reverseExtractionFn.apply(null))
|
||||
);
|
||||
Assert.assertEquals(
|
||||
"reverse lookup of none existing value should be empty list",
|
||||
Collections.EMPTY_LIST,
|
||||
reverseExtractionFn.apply("does't exist")
|
||||
);
|
||||
}
|
||||
|
||||
@Test(timeout = 10_000L)
|
||||
|
|
|
@ -73,7 +73,7 @@ public class TestDerbyConnector extends DerbyConnector
|
|||
{
|
||||
return jdbcUri;
|
||||
}
|
||||
|
||||
|
||||
public static class DerbyConnectorRule extends ExternalResource
|
||||
{
|
||||
private TestDerbyConnector connector;
|
||||
|
@ -82,7 +82,14 @@ public class TestDerbyConnector extends DerbyConnector
|
|||
|
||||
public DerbyConnectorRule()
|
||||
{
|
||||
this(Suppliers.ofInstance(MetadataStorageTablesConfig.fromBase("druidTest")));
|
||||
this("druidTest" + dbSafeUUID());
|
||||
}
|
||||
|
||||
private DerbyConnectorRule(
|
||||
final String defaultBase
|
||||
)
|
||||
{
|
||||
this(Suppliers.ofInstance(MetadataStorageTablesConfig.fromBase(defaultBase)));
|
||||
}
|
||||
|
||||
public DerbyConnectorRule(
|
||||
|
|
|
@ -63,7 +63,7 @@ public class SQLAuditManagerTest
|
|||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(timeout = 10_000L)
|
||||
public void testAuditEntrySerde() throws IOException
|
||||
{
|
||||
AuditEntry entry = new AuditEntry(
|
||||
|
@ -82,7 +82,7 @@ public class SQLAuditManagerTest
|
|||
Assert.assertEquals(entry, serde);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(timeout = 10_000L)
|
||||
public void testCreateAuditEntry() throws IOException
|
||||
{
|
||||
AuditEntry entry = new AuditEntry(
|
||||
|
@ -108,7 +108,7 @@ public class SQLAuditManagerTest
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(timeout = 10_000L)
|
||||
public void testFetchAuditHistory() throws IOException
|
||||
{
|
||||
AuditEntry entry = new AuditEntry(
|
||||
|
@ -136,7 +136,7 @@ public class SQLAuditManagerTest
|
|||
Assert.assertEquals(entry, auditEntries.get(1));
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(timeout = 10_000L)
|
||||
public void testFetchAuditHistoryByKeyAndTypeWithLimit() throws IOException
|
||||
{
|
||||
AuditEntry entry1 = new AuditEntry(
|
||||
|
@ -172,7 +172,7 @@ public class SQLAuditManagerTest
|
|||
Assert.assertEquals(entry1, auditEntries.get(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(timeout = 10_000L)
|
||||
public void testFetchAuditHistoryByTypeWithLimit() throws IOException
|
||||
{
|
||||
AuditEntry entry1 = new AuditEntry(
|
||||
|
@ -220,13 +220,13 @@ public class SQLAuditManagerTest
|
|||
Assert.assertEquals(entry2, auditEntries.get(1));
|
||||
}
|
||||
|
||||
@Test(expected=IllegalArgumentException.class)
|
||||
@Test(expected=IllegalArgumentException.class, timeout = 10_000L)
|
||||
public void testFetchAuditHistoryLimitBelowZero() throws IOException
|
||||
{
|
||||
auditManager.fetchAuditHistory("testType", -1);
|
||||
}
|
||||
|
||||
@Test(expected=IllegalArgumentException.class)
|
||||
@Test(expected=IllegalArgumentException.class, timeout = 10_000L)
|
||||
public void testFetchAuditHistoryLimitZero() throws IOException
|
||||
{
|
||||
auditManager.fetchAuditHistory("testType", 0);
|
||||
|
@ -240,7 +240,7 @@ public class SQLAuditManagerTest
|
|||
|
||||
private void dropTable(final String tableName)
|
||||
{
|
||||
connector.getDBI().withHandle(
|
||||
Assert.assertNull(connector.getDBI().withHandle(
|
||||
new HandleCallback<Void>()
|
||||
{
|
||||
@Override
|
||||
|
@ -251,6 +251,6 @@ public class SQLAuditManagerTest
|
|||
return null;
|
||||
}
|
||||
}
|
||||
);
|
||||
));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue