diff --git a/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/JDBCExtractionNamespaceFunctionFactory.java b/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/JDBCExtractionNamespaceFunctionFactory.java index b44faf3dd86..4c319f4be05 100644 --- a/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/JDBCExtractionNamespaceFunctionFactory.java +++ b/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/JDBCExtractionNamespaceFunctionFactory.java @@ -21,27 +21,21 @@ package io.druid.server.namespace; import com.google.common.base.Function; import com.google.common.base.Strings; -import com.google.inject.Inject; import com.metamx.common.Pair; import io.druid.common.utils.JodaUtils; import io.druid.query.extraction.namespace.ExtractionNamespaceFunctionFactory; import io.druid.query.extraction.namespace.JDBCExtractionNamespace; -import io.druid.server.namespace.cache.NamespaceExtractionCacheManager; import org.skife.jdbi.v2.DBI; -import org.skife.jdbi.v2.FoldController; -import org.skife.jdbi.v2.Folder3; import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.StatementContext; import org.skife.jdbi.v2.tweak.HandleCallback; import org.skife.jdbi.v2.tweak.ResultSetMapper; -import org.skife.jdbi.v2.util.StringMapper; import org.skife.jdbi.v2.util.TimestampMapper; import javax.annotation.Nullable; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; @@ -82,7 +76,7 @@ public class JDBCExtractionNamespaceFunctionFactory { final long lastCheck = lastVersion == null ? JodaUtils.MIN_INSTANT : Long.parseLong(lastVersion); final Long lastDBUpdate = lastUpdates(namespace); - if(lastDBUpdate != null && lastDBUpdate <= lastCheck){ + if (lastDBUpdate != null && lastDBUpdate <= lastCheck) { return new Callable() { @Override @@ -132,23 +126,7 @@ public class JDBCExtractionNamespaceFunctionFactory return new Pair(r.getString(keyColumn), r.getString(valueColumn)); } } - ).fold( - new LinkedList>(), - new Folder3>, Pair>() - { - @Override - public LinkedList> fold( - LinkedList> accumulator, - Pair rs, - FoldController control, - StatementContext ctx - ) throws SQLException - { - accumulator.add(rs); - return accumulator; - } - } - ); + ).list(); } } ); diff --git a/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManager.java b/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManager.java index f59e1dbf3e3..5ef79187ef6 100644 --- a/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManager.java +++ b/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManager.java @@ -67,7 +67,7 @@ import java.util.concurrent.atomic.AtomicReference; }) public abstract class NamespaceExtractionCacheManager { - private static class NamespaceImplData + protected static class NamespaceImplData { public NamespaceImplData( final ListenableFuture future, diff --git a/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/JDBCExtractionNamespaceTest.java b/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/JDBCExtractionNamespaceTest.java index a6c73ed122c..d8199dff8d0 100644 --- a/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/JDBCExtractionNamespaceTest.java +++ b/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/JDBCExtractionNamespaceTest.java @@ -22,34 +22,32 @@ package io.druid.server.namespace.cache; import com.google.common.base.Function; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.metamx.common.ISE; import com.metamx.common.lifecycle.Lifecycle; -import io.druid.metadata.MetadataStorageConnectorConfig; +import com.metamx.common.logger.Logger; +import io.druid.metadata.TestDerbyConnector; import io.druid.query.extraction.namespace.ExtractionNamespace; import io.druid.query.extraction.namespace.ExtractionNamespaceFunctionFactory; import io.druid.query.extraction.namespace.JDBCExtractionNamespace; import io.druid.server.metrics.NoopServiceEmitter; import io.druid.server.namespace.JDBCExtractionNamespaceFunctionFactory; -import org.apache.commons.dbcp2.BasicDataSource; import org.joda.time.Period; import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.Handle; -import org.skife.jdbi.v2.tweak.HandleCallback; -import java.lang.reflect.Field; import java.util.Collection; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; /** * @@ -57,6 +55,9 @@ import java.util.concurrent.ExecutionException; @RunWith(Parameterized.class) public class JDBCExtractionNamespaceTest { + @Rule + public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule(); + private static final Logger log = new Logger(JDBCExtractionNamespaceTest.class); private static final String namespace = "testNamespace"; private static final String tableName = "abstractDbRenameTest"; private static final String keyName = "keyName"; @@ -67,45 +68,11 @@ public class JDBCExtractionNamespaceTest "bad", "bar", "how about that", "foo" ); - private static final String connectionURI = "jdbc:derby:memory:druid;create=true"; - private static DBI dbi; - - @BeforeClass - public static final void createTables() - { - final BasicDataSource datasource = new BasicDataSource(); - datasource.setUrl(connectionURI); - datasource.setDriverClassLoader(JDBCExtractionNamespaceTest.class.getClassLoader()); - datasource.setDriverClassName("org.apache.derby.jdbc.EmbeddedDriver"); - dbi = new DBI(datasource); - dbi.withHandle( - new HandleCallback() - { - @Override - public Void withHandle(Handle handle) throws Exception - { - handle - .createStatement( - String.format( - "CREATE TABLE %s (%s TIMESTAMP, %s VARCHAR(64), %s VARCHAR(64))", - tableName, - tsColumn_, - keyName, - valName - ) - ) - .execute(); - return null; - } - } - ); - } - @Parameterized.Parameters(name = "{0}") public static Collection getParameters() { - return ImmutableList.of( + return ImmutableList.of( new Object[]{"tsColumn"}, new Object[]{null} ); @@ -120,95 +87,126 @@ public class JDBCExtractionNamespaceTest private final ConcurrentMap> fnCache = new ConcurrentHashMap<>(); private final String tsColumn; - private NamespaceExtractionCacheManager extractionCacheManager; + private OnHeapNamespaceExtractionCacheManager extractionCacheManager; private final Lifecycle lifecycle = new Lifecycle(); + private final AtomicLong updates = new AtomicLong(0L); + private final Object updateLock = new Object(); + private Handle handle; @Before - public void setup() + public void setup() throws Exception { - dbi.withHandle( - new HandleCallback() - { - @Override - public Void withHandle(Handle handle) throws Exception - { - handle.createStatement(String.format("TRUNCATE TABLE %s", tableName)).execute(); - handle.commit(); - return null; - } - } + log.info("Setting up"); + handle = derbyConnectorRule.getConnector().getDBI().open(); + Assert.assertEquals( + 0, + handle.createStatement( + String.format( + "CREATE TABLE %s (%s TIMESTAMP, %s VARCHAR(64), %s VARCHAR(64))", + tableName, + tsColumn_, + keyName, + valName + ) + ).setQueryTimeout(1).execute() ); + handle.createStatement(String.format("TRUNCATE TABLE %s", tableName)).setQueryTimeout(1).execute(); + handle.commit(); + for (Map.Entry entry : renames.entrySet()) { insertValues(entry.getKey(), entry.getValue(), "2015-01-01 00:00:00"); } - final Map, ExtractionNamespaceFunctionFactory> factoryMap = new HashMap<>(); + extractionCacheManager = new OnHeapNamespaceExtractionCacheManager( lifecycle, fnCache, new NoopServiceEmitter(), ImmutableMap., ExtractionNamespaceFunctionFactory>of( - JDBCExtractionNamespace.class, new JDBCExtractionNamespaceFunctionFactory() + JDBCExtractionNamespace.class, + new JDBCExtractionNamespaceFunctionFactory() + { + @Override + public Callable getCachePopulator( + final JDBCExtractionNamespace namespace, + final String lastVersion, + final Map cache + ) + { + final Callable cachePopulator = super.getCachePopulator(namespace, lastVersion, cache); + return new Callable() + { + @Override + public String call() throws Exception + { + synchronized (updateLock) { + log.debug("Running cache populator"); + try { + return cachePopulator.call(); + } + finally { + updates.incrementAndGet(); + } + } + } + }; + } + } ) ); + lifecycle.start(); } @After - public void tearDown() + public void tearDown() throws InterruptedException { + log.info("Tearing down"); + handle.createStatement("DROP TABLE " + tableName).setQueryTimeout(1).execute(); + handle.close(); + Assert.assertTrue("Delete failed", extractionCacheManager.delete(namespace)); lifecycle.stop(); + final NamespaceExtractionCacheManager.NamespaceImplData implData = extractionCacheManager.implData.get(namespace); + if (implData != null && implData.future != null) { + implData.future.cancel(true); + Assert.assertTrue(implData.future.isDone()); + } } - private void insertValues(final String key, final String val, final String updateTs) + private void insertValues(final String key, final String val, final String updateTs) throws InterruptedException { - dbi.withHandle( - new HandleCallback() - { - @Override - public Void withHandle(Handle handle) throws Exception - { - final String query; - if (tsColumn == null) { - handle.createStatement( - String.format("DELETE FROM %s WHERE %s='%s'", tableName, keyName, key) - ).execute(); - handle.commit(); - query = String.format( - "INSERT INTO %s (%s, %s) VALUES ('%s', '%s')", - tableName, - keyName, valName, - key, val - ); - } else { - query = String.format( - "INSERT INTO %s (%s, %s, %s) VALUES ('%s', '%s', '%s')", - tableName, - tsColumn, keyName, valName, - updateTs, key, val - ); - } - if (1 != handle.createStatement(query).execute()) { - throw new ISE("Did not return the correct number of rows"); - } - handle.commit(); - return null; - } - } - ); + final String query; + if (tsColumn == null) { + handle.createStatement( + String.format("DELETE FROM %s WHERE %s='%s'", tableName, keyName, key) + ).setQueryTimeout(1).execute(); + query = String.format( + "INSERT INTO %s (%s, %s) VALUES ('%s', '%s')", + tableName, + keyName, valName, + key, val + ); + } else { + query = String.format( + "INSERT INTO %s (%s, %s, %s) VALUES ('%s', '%s', '%s')", + tableName, + tsColumn, keyName, valName, + updateTs, key, val + ); + } + Assert.assertEquals(1, handle.createStatement(query).setQueryTimeout(1).execute()); + handle.commit(); + // Some internals have timing resolution no better than MS. This is to help make sure that checks for timings + // have elapsed at least to the next ms... 2 is for good measure. + Thread.sleep(2); } - @Test(timeout = 60_000) + @Test(timeout = 60_000L) public void testMapping() throws ClassNotFoundException, NoSuchFieldException, IllegalAccessException, ExecutionException, - InterruptedException + InterruptedException, TimeoutException { - MetadataStorageConnectorConfig config = new MetadataStorageConnectorConfig(); - Field uriField = MetadataStorageConnectorConfig.class.getDeclaredField("connectURI"); - uriField.setAccessible(true); - uriField.set(config, connectionURI); - final JDBCExtractionNamespace extractionNamespace = new JDBCExtractionNamespace( namespace, - config, + derbyConnectorRule.getMetadataConnectorConfig(), tableName, keyName, valName, @@ -217,92 +215,104 @@ public class JDBCExtractionNamespaceTest ); NamespaceExtractionCacheManagersTest.waitFor(extractionCacheManager.schedule(extractionNamespace)); Function extractionFn = fnCache.get(extractionNamespace.getNamespace()); + for (Map.Entry entry : renames.entrySet()) { String key = entry.getKey(); String val = entry.getValue(); Assert.assertEquals( + "non-null check", val, - String.format(val, extractionFn.apply(key)) + extractionFn.apply(key) ); } Assert.assertEquals( + "null check", null, extractionFn.apply("baz") ); } - @Test(timeout = 60_000) + @Test(timeout = 60_000L) public void testSkipOld() throws NoSuchFieldException, IllegalAccessException, ExecutionException, InterruptedException { - MetadataStorageConnectorConfig config = new MetadataStorageConnectorConfig(); - Field uriField = MetadataStorageConnectorConfig.class.getDeclaredField("connectURI"); - uriField.setAccessible(true); - uriField.set(config, connectionURI); - final JDBCExtractionNamespace extractionNamespace = new JDBCExtractionNamespace( - namespace, - config, - tableName, - keyName, - valName, - tsColumn, - new Period(1) - ); - extractionCacheManager.schedule(extractionNamespace); - while (!fnCache.containsKey(extractionNamespace.getNamespace())) { - Thread.sleep(1); - } - Assert.assertEquals( - "bar", - fnCache.get(extractionNamespace.getNamespace()).apply("foo") - ); + final JDBCExtractionNamespace extractionNamespace = ensureNamespace(); + + assertUpdated(extractionNamespace.getNamespace(), "foo", "bar"); + if (tsColumn != null) { insertValues("foo", "baz", "1900-01-01 00:00:00"); } - Thread.sleep(10); - - Assert.assertEquals( - "bar", - fnCache.get(extractionNamespace.getNamespace()).apply("foo") - ); - extractionCacheManager.delete(namespace); + assertUpdated(extractionNamespace.getNamespace(), "foo", "bar"); } - @Test(timeout = 60_000) + @Test(timeout = 60_000L) public void testFindNew() throws NoSuchFieldException, IllegalAccessException, ExecutionException, InterruptedException { - MetadataStorageConnectorConfig config = new MetadataStorageConnectorConfig(); - Field uriField = MetadataStorageConnectorConfig.class.getDeclaredField("connectURI"); - uriField.setAccessible(true); - uriField.set(config, connectionURI); + final JDBCExtractionNamespace extractionNamespace = ensureNamespace(); + + assertUpdated(extractionNamespace.getNamespace(), "foo", "bar"); + + insertValues("foo", "baz", "2900-01-01 00:00:00"); + + assertUpdated(extractionNamespace.getNamespace(), "foo", "baz"); + } + + private JDBCExtractionNamespace ensureNamespace() + throws NoSuchFieldException, IllegalAccessException, InterruptedException + { final JDBCExtractionNamespace extractionNamespace = new JDBCExtractionNamespace( namespace, - config, + derbyConnectorRule.getMetadataConnectorConfig(), tableName, keyName, valName, tsColumn, - new Period(1) + new Period(10) ); extractionCacheManager.schedule(extractionNamespace); - while (!fnCache.containsKey(extractionNamespace.getNamespace())) { - Thread.sleep(1); - } - Function extractionFn = fnCache.get(extractionNamespace.getNamespace()); - Assert.assertEquals( - "bar", - extractionFn.apply("foo") - ); - insertValues("foo", "baz", "2900-01-01 00:00:00"); - Thread.sleep(100); - extractionFn = fnCache.get(extractionNamespace.getNamespace()); + waitForUpdates(1_000L, 2L); + Assert.assertEquals( - "baz", - extractionFn.apply("foo") + "sanity check not correct", + "bar", + fnCache.get(extractionNamespace.getNamespace()).apply("foo") + ); + return extractionNamespace; + } + + private void waitForUpdates(long timeout, long numUpdates) throws InterruptedException + { + long startTime = System.currentTimeMillis(); + long pre = 0L; + synchronized (updateLock) { + pre = updates.get(); + } + long post = 0L; + do { + // Sleep to spare a few cpu cycles + Thread.sleep(5); + log.debug("Waiting for updateLock"); + synchronized (updateLock) { + Assert.assertTrue("Failed waiting for update", System.currentTimeMillis() - startTime < timeout); + post = updates.get(); + } + } while (post < pre + numUpdates); + } + + private void assertUpdated(String namespace, String key, String expected) throws InterruptedException + { + waitForUpdates(1_000L, 2L); + + Function extractionFn = fnCache.get(namespace); + Assert.assertEquals( + "update check", + expected, + extractionFn.apply(key) ); } } diff --git a/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManagersTest.java b/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManagersTest.java index 21d602aaf97..fd38b943911 100644 --- a/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManagersTest.java +++ b/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManagersTest.java @@ -157,8 +157,13 @@ public class NamespaceExtractionCacheManagersTest @Override public void onFailure(Throwable t) { - log.error(t, "Error waiting"); - throw Throwables.propagate(t); + try { + log.error(t, "Error waiting"); + throw Throwables.propagate(t); + } + finally { + latch.countDown(); + } } } ); diff --git a/server/src/test/java/io/druid/metadata/TestDerbyConnector.java b/server/src/test/java/io/druid/metadata/TestDerbyConnector.java index 98735ca7026..184ca697194 100644 --- a/server/src/test/java/io/druid/metadata/TestDerbyConnector.java +++ b/server/src/test/java/io/druid/metadata/TestDerbyConnector.java @@ -62,7 +62,7 @@ public class TestDerbyConnector extends DerbyConnector } } - private static String dbSafeUUID() + public static String dbSafeUUID() { return UUID.randomUUID().toString().replace("-", ""); }