From 38b0f1fbc27aa3dfd5f78ea6470393f140fb44f8 Mon Sep 17 00:00:00 2001 From: fjy Date: Sun, 27 Dec 2015 22:13:43 -0800 Subject: [PATCH] fix transient failures in unit tests --- .../namespace/KafkaExtractionNamespace.java | 6 +- .../namespace/TestKafkaExtractionCluster.java | 198 ++++++++++++------ .../namespace/JDBCExtractionNamespace.java | 1 - .../cache/JDBCExtractionNamespaceTest.java | 8 +- .../common/task/RealtimeIndexTaskTest.java | 4 +- .../indexing/overlord/TaskLifecycleTest.java | 2 +- .../SimpleResourceManagementStrategyTest.java | 2 +- .../curator/announcement/AnnouncerTest.java | 9 + 8 files changed, 154 insertions(+), 76 deletions(-) diff --git a/extensions/kafka-extraction-namespace/src/main/java/io/druid/query/extraction/namespace/KafkaExtractionNamespace.java b/extensions/kafka-extraction-namespace/src/main/java/io/druid/query/extraction/namespace/KafkaExtractionNamespace.java index 14bfe3752d4..2733f4a558d 100644 --- a/extensions/kafka-extraction-namespace/src/main/java/io/druid/query/extraction/namespace/KafkaExtractionNamespace.java +++ b/extensions/kafka-extraction-namespace/src/main/java/io/druid/query/extraction/namespace/KafkaExtractionNamespace.java @@ -39,10 +39,8 @@ public class KafkaExtractionNamespace implements ExtractionNamespace @JsonCreator public KafkaExtractionNamespace( - @NotNull @JsonProperty(value = "kafkaTopic", required = true) - final String kafkaTopic, - @NotNull @JsonProperty(value = "namespace", required = true) - final String namespace + @NotNull @JsonProperty(value = "kafkaTopic", required = true) final String kafkaTopic, + @NotNull @JsonProperty(value = "namespace", required = true) final String namespace ) { Preconditions.checkNotNull(kafkaTopic, "kafkaTopic required"); diff --git a/extensions/kafka-extraction-namespace/src/test/java/io/druid/query/extraction/namespace/TestKafkaExtractionCluster.java b/extensions/kafka-extraction-namespace/src/test/java/io/druid/query/extraction/namespace/TestKafkaExtractionCluster.java index 74e4f486e7a..26724d48f8d 100644 --- a/extensions/kafka-extraction-namespace/src/test/java/io/druid/query/extraction/namespace/TestKafkaExtractionCluster.java +++ b/extensions/kafka-extraction-namespace/src/test/java/io/druid/query/extraction/namespace/TestKafkaExtractionCluster.java @@ -57,13 +57,12 @@ import org.apache.commons.io.FileUtils; import org.apache.curator.test.TestingServer; import org.apache.zookeeper.CreateMode; import org.joda.time.DateTime; -import org.junit.AfterClass; +import org.junit.After; import org.junit.Assert; -import org.junit.BeforeClass; +import org.junit.Before; import org.junit.Test; import java.io.File; -import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -76,20 +75,20 @@ import java.util.concurrent.ConcurrentMap; public class TestKafkaExtractionCluster { private static final Logger log = new Logger(TestKafkaExtractionCluster.class); - private static KafkaServer kafkaServer; - private static Properties kafkaProperties = new Properties(); - private static KafkaConfig kafkaConfig; - private static final String topicName = "testTopic"; - private static final String namespace = "testNamespace"; - private static TestingServer zkTestServer; - private static KafkaExtractionManager renameManager; private static final Lifecycle lifecycle = new Lifecycle(); - private static NamespaceExtractionCacheManager extractionCacheManager; - private static ZkClient zkClient = null; - private static File tmpDir = Files.createTempDir(); - private static Injector injector; + private static final File tmpDir = Files.createTempDir(); + private static final String topicName = "testTopic"; + private static final String namespace = "testNamespace"; + private static final Properties kafkaProperties = new Properties(); + private KafkaServer kafkaServer; + private KafkaConfig kafkaConfig; + private TestingServer zkTestServer; + private ZkClient zkClient; + private KafkaExtractionManager renameManager; + private NamespaceExtractionCacheManager extractionCacheManager; + private Injector injector; public static class KafkaFactoryProvider implements Provider> { @@ -110,10 +109,12 @@ public class TestKafkaExtractionCluster } } - @BeforeClass - public static void setupStatic() throws Exception + @Before + public void setUp() throws Exception { zkTestServer = new TestingServer(-1, new File(tmpDir.getAbsolutePath() + "/zk"), true); + zkTestServer.start(); + zkClient = new ZkClient( zkTestServer.getConnectString(), 10000, @@ -142,38 +143,41 @@ public class TestKafkaExtractionCluster final long time = DateTime.parse("2015-01-01").getMillis(); kafkaServer = new KafkaServer( - kafkaConfig, new Time() - { + kafkaConfig, + new Time() + { - @Override - public long milliseconds() - { - return time; - } + @Override + public long milliseconds() + { + return time; + } - @Override - public long nanoseconds() - { - return milliseconds() * 1_000_000; - } + @Override + public long nanoseconds() + { + return milliseconds() * 1_000_000; + } - @Override - public void sleep(long ms) - { - try { - Thread.sleep(ms); + @Override + public void sleep(long ms) + { + try { + Thread.sleep(ms); + } + catch (InterruptedException e) { + throw Throwables.propagate(e); + } + } } - catch (InterruptedException e) { - throw Throwables.propagate(e); - } - } - } ); kafkaServer.startup(); + int sleepCount = 0; + while (!kafkaServer.kafkaController().isActive()) { - Thread.sleep(10); - if (++sleepCount > 100) { + Thread.sleep(100); + if (++sleepCount > 10) { throw new InterruptedException("Controller took to long to awaken"); } } @@ -184,6 +188,7 @@ public class TestKafkaExtractionCluster zkTestServer.getConnectString() + "/kafka", 10000, 10000, ZKStringSerializer$.MODULE$ ); + try { final Properties topicProperties = new Properties(); topicProperties.put("cleanup.policy", "compact"); @@ -198,11 +203,13 @@ public class TestKafkaExtractionCluster finally { zkClient.close(); } + final Properties kafkaProducerProperties = makeProducerProperties(); - Producer producer = new Producer(new ProducerConfig(kafkaProducerProperties)); + Producer producer = new Producer<>(new ProducerConfig(kafkaProducerProperties)); + try { producer.send( - new KeyedMessage( + new KeyedMessage<>( topicName, StringUtils.toUtf8("abcdefg"), StringUtils.toUtf8("abcdefg") @@ -221,7 +228,8 @@ public class TestKafkaExtractionCluster injector = Initialization.makeInjectorWithModules( GuiceInjectors.makeStartupInjectorWithModules( ImmutableList.of() - ), ImmutableList.of( + ), + ImmutableList.of( new Module() { @Override @@ -230,7 +238,8 @@ public class TestKafkaExtractionCluster binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test"); binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); } - }, new NamespacedExtractionModule(), + }, + new NamespacedExtractionModule(), new KafkaExtractionNamespaceModule() { @Override @@ -255,9 +264,10 @@ public class TestKafkaExtractionCluster extractionCacheManager.schedule( new KafkaExtractionNamespace(topicName, namespace) ); + long start = System.currentTimeMillis(); while (renameManager.getBackgroundTaskCount() < 1) { - Thread.sleep(10); // wait for map populator to start up + Thread.sleep(100); // wait for map populator to start up if (System.currentTimeMillis() > start + 60_000) { throw new ISE("renameManager took too long to start"); } @@ -265,9 +275,10 @@ public class TestKafkaExtractionCluster log.info("--------------------------- started rename manager ---------------------------"); } - @AfterClass - public static void closeStatic() throws IOException + @After + public void tearDown() throws Exception { + lifecycle.stop(); if (null != renameManager) { renameManager.stop(); @@ -297,7 +308,7 @@ public class TestKafkaExtractionCluster } } - private static final Properties makeProducerProperties() + private final Properties makeProducerProperties() { final Properties kafkaProducerProperties = new Properties(); kafkaProducerProperties.putAll(kafkaProperties); @@ -309,55 +320,70 @@ public class TestKafkaExtractionCluster return kafkaProducerProperties; } - private static void checkServer() + private void checkServer() { if (!kafkaServer.apis().controller().isActive()) { throw new ISE("server is not active!"); } } - //@Test(timeout = 5_000) - @Test + @Test(timeout = 60_000L) public void testSimpleRename() throws InterruptedException { final Properties kafkaProducerProperties = makeProducerProperties(); - final Producer producer = new Producer(new ProducerConfig(kafkaProducerProperties)); + final Producer producer = new Producer<>(new ProducerConfig(kafkaProducerProperties)); + try { checkServer(); - final ConcurrentMap> fnFn = injector.getInstance(Key.get(new TypeLiteral>>() - { - }, Names.named("namespaceExtractionFunctionCache"))); - final ConcurrentMap>> reverseFn = injector.getInstance(Key.get(new TypeLiteral>>>() - { - }, Names.named("namespaceReverseExtractionFunctionCache"))); + + final ConcurrentMap> fnFn = + injector.getInstance( + Key.get( + new TypeLiteral>>() + { + }, + Names.named("namespaceExtractionFunctionCache") + ) + ); + + final ConcurrentMap>> reverseFn = + injector.getInstance( + Key.get( + new TypeLiteral>>>() + { + }, + Names.named("namespaceReverseExtractionFunctionCache") + ) + ); + KafkaExtractionNamespace extractionNamespace = new KafkaExtractionNamespace(topicName, namespace); - Assert.assertEquals(null, fnFn.get(extractionNamespace.getNamespace()).apply("foo")); - Assert.assertEquals(Collections.EMPTY_LIST, reverseFn.get(extractionNamespace.getNamespace()).apply("foo")); + assertUpdated(null, extractionNamespace.getNamespace(), "foo", fnFn); + assertReverseUpdated(Collections.EMPTY_LIST, extractionNamespace.getNamespace(), "foo", reverseFn); long events = renameManager.getNumEvents(namespace); log.info("------------------------- Sending foo bar -------------------------------"); - producer.send(new KeyedMessage(topicName, StringUtils.toUtf8("foo"), StringUtils.toUtf8("bar"))); + producer.send(new KeyedMessage<>(topicName, StringUtils.toUtf8("foo"), StringUtils.toUtf8("bar"))); long start = System.currentTimeMillis(); while (events == renameManager.getNumEvents(namespace)) { - Thread.sleep(10); + Thread.sleep(100); if (System.currentTimeMillis() > start + 60_000) { throw new ISE("Took too long to update event"); } } log.info("------------------------- Checking foo bar -------------------------------"); - Assert.assertEquals("bar", fnFn.get(extractionNamespace.getNamespace()).apply("foo")); - Assert.assertEquals(Arrays.asList("foo"), reverseFn.get(extractionNamespace.getNamespace()).apply("bar")); - Assert.assertEquals(null, fnFn.get(extractionNamespace.getNamespace()).apply("baz")); + assertUpdated("bar", extractionNamespace.getNamespace(), "foo", fnFn); + assertReverseUpdated(Arrays.asList("foo"), extractionNamespace.getNamespace(), "bar", reverseFn); + assertUpdated(null, extractionNamespace.getNamespace(), "baz", fnFn); checkServer(); events = renameManager.getNumEvents(namespace); log.info("------------------------- Sending baz bat -------------------------------"); - producer.send(new KeyedMessage(topicName, StringUtils.toUtf8("baz"), StringUtils.toUtf8("bat"))); + producer.send(new KeyedMessage<>(topicName, StringUtils.toUtf8("baz"), StringUtils.toUtf8("bat"))); while (events == renameManager.getNumEvents(namespace)) { Thread.sleep(10); if (System.currentTimeMillis() > start + 60_000) { @@ -373,4 +399,44 @@ public class TestKafkaExtractionCluster producer.close(); } } + + private void assertUpdated( + String expected, + String namespace, + String key, + ConcurrentMap> lookup + ) + throws InterruptedException + { + final Function extractionFn = lookup.get(namespace); + + if (expected == null) { + while (extractionFn.apply(key) != null) { + Thread.sleep(100); + } + } else { + while (!expected.equals(extractionFn.apply(key))) { + Thread.sleep(100); + } + } + + Assert.assertEquals("update check", expected, extractionFn.apply(key)); + } + + private void assertReverseUpdated( + List expected, + String namespace, + String key, + ConcurrentMap>> lookup + ) + throws InterruptedException + { + final Function> extractionFn = lookup.get(namespace); + + while (!extractionFn.apply(key).equals(expected)) { + Thread.sleep(100); + } + + Assert.assertEquals("update check", expected, extractionFn.apply(key)); + } } diff --git a/extensions/namespace-lookup/src/main/java/io/druid/query/extraction/namespace/JDBCExtractionNamespace.java b/extensions/namespace-lookup/src/main/java/io/druid/query/extraction/namespace/JDBCExtractionNamespace.java index 0d4a905be82..b03bf987d49 100644 --- a/extensions/namespace-lookup/src/main/java/io/druid/query/extraction/namespace/JDBCExtractionNamespace.java +++ b/extensions/namespace-lookup/src/main/java/io/druid/query/extraction/namespace/JDBCExtractionNamespace.java @@ -36,7 +36,6 @@ import javax.validation.constraints.NotNull; @JsonTypeName("jdbc") public class JDBCExtractionNamespace implements ExtractionNamespace { - @JsonProperty private final MetadataStorageConnectorConfig connectorConfig; @JsonProperty diff --git a/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/JDBCExtractionNamespaceTest.java b/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/JDBCExtractionNamespaceTest.java index 380c324e3ab..81d9ff10fee 100644 --- a/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/JDBCExtractionNamespaceTest.java +++ b/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/JDBCExtractionNamespaceTest.java @@ -413,7 +413,7 @@ public class JDBCExtractionNamespaceTest assertUpdated(extractionNamespace.getNamespace(), "foo", "bar"); } - @Test(timeout = 10_000L) + @Test(timeout = 60_000L) public void testFindNew() throws NoSuchFieldException, IllegalAccessException, ExecutionException, InterruptedException { @@ -482,6 +482,12 @@ public class JDBCExtractionNamespaceTest waitForUpdates(1_000L, 2L); Function extractionFn = fnCache.get(namespace); + + // rely on test timeout to break out of this loop + while (!extractionFn.apply(key).equals(expected)) { + Thread.sleep(100); + } + Assert.assertEquals( "update check", expected, diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index f1b54df3edb..cc8addd1f5a 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -168,7 +168,7 @@ public class RealtimeIndexTaskTest ); } - @Test(timeout = 10000L) + @Test(timeout = 60_000L) public void testBasics() throws Exception { final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); @@ -221,7 +221,7 @@ public class RealtimeIndexTaskTest Assert.assertEquals(TaskStatus.Status.SUCCESS, taskStatus.getStatusCode()); } - @Test(timeout = 10000L) + @Test(timeout = 60_000L) public void testRestore() throws Exception { final File directory = tempFolder.newFolder(); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index fbed169867b..cf46ccf5e92 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -907,7 +907,7 @@ public class TaskLifecycleTest EasyMock.verify(monitorScheduler, queryRunnerFactoryConglomerate); } - @Test(timeout = 4000L) + @Test(timeout = 60_000L) public void testRealtimeIndexTaskFailure() throws Exception { setUpAndStartTaskQueue( diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategyTest.java index ecef62fe508..be6393960ed 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategyTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategyTest.java @@ -202,7 +202,7 @@ public class SimpleResourceManagementStrategyTest ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class); EmittingLogger.registerEmitter(emitter); emitter.emit(EasyMock.anyObject()); - EasyMock.expectLastCall(); + EasyMock.expectLastCall().atLeastOnce(); EasyMock.replay(emitter); EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(2); diff --git a/server/src/test/java/io/druid/curator/announcement/AnnouncerTest.java b/server/src/test/java/io/druid/curator/announcement/AnnouncerTest.java index babd8935270..f1158f04eff 100644 --- a/server/src/test/java/io/druid/curator/announcement/AnnouncerTest.java +++ b/server/src/test/java/io/druid/curator/announcement/AnnouncerTest.java @@ -20,6 +20,7 @@ package io.druid.curator.announcement; import com.google.common.collect.Sets; +import com.metamx.common.ISE; import io.druid.concurrent.Execs; import io.druid.curator.CuratorTestBase; import org.apache.curator.framework.CuratorFramework; @@ -172,6 +173,14 @@ public class AnnouncerTest extends CuratorTestBase announcer.stop(); + int count = 0; + while ((curator.checkExists().forPath(testPath1) != null) || (curator.checkExists().forPath(testPath1) != null)) { + Thread.sleep(100); + if (++count > 10) { + throw new ISE("Curator paths not getting cleaned up"); + } + } + Assert.assertNull(curator.checkExists().forPath(testPath1)); Assert.assertNull(curator.checkExists().forPath(testPath2)); }