fix transient failures in unit tests

This commit is contained in:
fjy 2015-12-27 22:13:43 -08:00
parent d94821998f
commit 38b0f1fbc2
8 changed files with 154 additions and 76 deletions

View File

@ -39,10 +39,8 @@ public class KafkaExtractionNamespace implements ExtractionNamespace
@JsonCreator
public KafkaExtractionNamespace(
@NotNull @JsonProperty(value = "kafkaTopic", required = true)
final String kafkaTopic,
@NotNull @JsonProperty(value = "namespace", required = true)
final String namespace
@NotNull @JsonProperty(value = "kafkaTopic", required = true) final String kafkaTopic,
@NotNull @JsonProperty(value = "namespace", required = true) final String namespace
)
{
Preconditions.checkNotNull(kafkaTopic, "kafkaTopic required");

View File

@ -57,13 +57,12 @@ import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
import org.apache.zookeeper.CreateMode;
import org.joda.time.DateTime;
import org.junit.AfterClass;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -76,20 +75,20 @@ import java.util.concurrent.ConcurrentMap;
public class TestKafkaExtractionCluster
{
private static final Logger log = new Logger(TestKafkaExtractionCluster.class);
private static KafkaServer kafkaServer;
private static Properties kafkaProperties = new Properties();
private static KafkaConfig kafkaConfig;
private static final String topicName = "testTopic";
private static final String namespace = "testNamespace";
private static TestingServer zkTestServer;
private static KafkaExtractionManager renameManager;
private static final Lifecycle lifecycle = new Lifecycle();
private static NamespaceExtractionCacheManager extractionCacheManager;
private static ZkClient zkClient = null;
private static File tmpDir = Files.createTempDir();
private static Injector injector;
private static final File tmpDir = Files.createTempDir();
private static final String topicName = "testTopic";
private static final String namespace = "testNamespace";
private static final Properties kafkaProperties = new Properties();
private KafkaServer kafkaServer;
private KafkaConfig kafkaConfig;
private TestingServer zkTestServer;
private ZkClient zkClient;
private KafkaExtractionManager renameManager;
private NamespaceExtractionCacheManager extractionCacheManager;
private Injector injector;
public static class KafkaFactoryProvider implements Provider<ExtractionNamespaceFunctionFactory<?>>
{
@ -110,10 +109,12 @@ public class TestKafkaExtractionCluster
}
}
@BeforeClass
public static void setupStatic() throws Exception
@Before
public void setUp() throws Exception
{
zkTestServer = new TestingServer(-1, new File(tmpDir.getAbsolutePath() + "/zk"), true);
zkTestServer.start();
zkClient = new ZkClient(
zkTestServer.getConnectString(),
10000,
@ -142,38 +143,41 @@ public class TestKafkaExtractionCluster
final long time = DateTime.parse("2015-01-01").getMillis();
kafkaServer = new KafkaServer(
kafkaConfig, new Time()
{
kafkaConfig,
new Time()
{
@Override
public long milliseconds()
{
return time;
}
@Override
public long milliseconds()
{
return time;
}
@Override
public long nanoseconds()
{
return milliseconds() * 1_000_000;
}
@Override
public long nanoseconds()
{
return milliseconds() * 1_000_000;
}
@Override
public void sleep(long ms)
{
try {
Thread.sleep(ms);
@Override
public void sleep(long ms)
{
try {
Thread.sleep(ms);
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}
}
);
kafkaServer.startup();
int sleepCount = 0;
while (!kafkaServer.kafkaController().isActive()) {
Thread.sleep(10);
if (++sleepCount > 100) {
Thread.sleep(100);
if (++sleepCount > 10) {
throw new InterruptedException("Controller took to long to awaken");
}
}
@ -184,6 +188,7 @@ public class TestKafkaExtractionCluster
zkTestServer.getConnectString() + "/kafka", 10000, 10000,
ZKStringSerializer$.MODULE$
);
try {
final Properties topicProperties = new Properties();
topicProperties.put("cleanup.policy", "compact");
@ -198,11 +203,13 @@ public class TestKafkaExtractionCluster
finally {
zkClient.close();
}
final Properties kafkaProducerProperties = makeProducerProperties();
Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(new ProducerConfig(kafkaProducerProperties));
Producer<byte[], byte[]> producer = new Producer<>(new ProducerConfig(kafkaProducerProperties));
try {
producer.send(
new KeyedMessage<byte[], byte[]>(
new KeyedMessage<>(
topicName,
StringUtils.toUtf8("abcdefg"),
StringUtils.toUtf8("abcdefg")
@ -221,7 +228,8 @@ public class TestKafkaExtractionCluster
injector = Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjectorWithModules(
ImmutableList.<Module>of()
), ImmutableList.of(
),
ImmutableList.<Module>of(
new Module()
{
@Override
@ -230,7 +238,8 @@ public class TestKafkaExtractionCluster
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
}
}, new NamespacedExtractionModule(),
},
new NamespacedExtractionModule(),
new KafkaExtractionNamespaceModule()
{
@Override
@ -255,9 +264,10 @@ public class TestKafkaExtractionCluster
extractionCacheManager.schedule(
new KafkaExtractionNamespace(topicName, namespace)
);
long start = System.currentTimeMillis();
while (renameManager.getBackgroundTaskCount() < 1) {
Thread.sleep(10); // wait for map populator to start up
Thread.sleep(100); // wait for map populator to start up
if (System.currentTimeMillis() > start + 60_000) {
throw new ISE("renameManager took too long to start");
}
@ -265,9 +275,10 @@ public class TestKafkaExtractionCluster
log.info("--------------------------- started rename manager ---------------------------");
}
@AfterClass
public static void closeStatic() throws IOException
@After
public void tearDown() throws Exception
{
lifecycle.stop();
if (null != renameManager) {
renameManager.stop();
@ -297,7 +308,7 @@ public class TestKafkaExtractionCluster
}
}
private static final Properties makeProducerProperties()
private final Properties makeProducerProperties()
{
final Properties kafkaProducerProperties = new Properties();
kafkaProducerProperties.putAll(kafkaProperties);
@ -309,55 +320,70 @@ public class TestKafkaExtractionCluster
return kafkaProducerProperties;
}
private static void checkServer()
private void checkServer()
{
if (!kafkaServer.apis().controller().isActive()) {
throw new ISE("server is not active!");
}
}
//@Test(timeout = 5_000)
@Test
@Test(timeout = 60_000L)
public void testSimpleRename() throws InterruptedException
{
final Properties kafkaProducerProperties = makeProducerProperties();
final Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(new ProducerConfig(kafkaProducerProperties));
final Producer<byte[], byte[]> producer = new Producer<>(new ProducerConfig(kafkaProducerProperties));
try {
checkServer();
final ConcurrentMap<String, Function<String, String>> fnFn = injector.getInstance(Key.get(new TypeLiteral<ConcurrentMap<String, Function<String, String>>>()
{
}, Names.named("namespaceExtractionFunctionCache")));
final ConcurrentMap<String, Function<String, List<String>>> reverseFn = injector.getInstance(Key.get(new TypeLiteral<ConcurrentMap<String, Function<String, List<String>>>>()
{
}, Names.named("namespaceReverseExtractionFunctionCache")));
final ConcurrentMap<String, Function<String, String>> fnFn =
injector.getInstance(
Key.get(
new TypeLiteral<ConcurrentMap<String, Function<String, String>>>()
{
},
Names.named("namespaceExtractionFunctionCache")
)
);
final ConcurrentMap<String, Function<String, List<String>>> reverseFn =
injector.getInstance(
Key.get(
new TypeLiteral<ConcurrentMap<String, Function<String, List<String>>>>()
{
},
Names.named("namespaceReverseExtractionFunctionCache")
)
);
KafkaExtractionNamespace extractionNamespace = new KafkaExtractionNamespace(topicName, namespace);
Assert.assertEquals(null, fnFn.get(extractionNamespace.getNamespace()).apply("foo"));
Assert.assertEquals(Collections.EMPTY_LIST, reverseFn.get(extractionNamespace.getNamespace()).apply("foo"));
assertUpdated(null, extractionNamespace.getNamespace(), "foo", fnFn);
assertReverseUpdated(Collections.EMPTY_LIST, extractionNamespace.getNamespace(), "foo", reverseFn);
long events = renameManager.getNumEvents(namespace);
log.info("------------------------- Sending foo bar -------------------------------");
producer.send(new KeyedMessage<byte[], byte[]>(topicName, StringUtils.toUtf8("foo"), StringUtils.toUtf8("bar")));
producer.send(new KeyedMessage<>(topicName, StringUtils.toUtf8("foo"), StringUtils.toUtf8("bar")));
long start = System.currentTimeMillis();
while (events == renameManager.getNumEvents(namespace)) {
Thread.sleep(10);
Thread.sleep(100);
if (System.currentTimeMillis() > start + 60_000) {
throw new ISE("Took too long to update event");
}
}
log.info("------------------------- Checking foo bar -------------------------------");
Assert.assertEquals("bar", fnFn.get(extractionNamespace.getNamespace()).apply("foo"));
Assert.assertEquals(Arrays.asList("foo"), reverseFn.get(extractionNamespace.getNamespace()).apply("bar"));
Assert.assertEquals(null, fnFn.get(extractionNamespace.getNamespace()).apply("baz"));
assertUpdated("bar", extractionNamespace.getNamespace(), "foo", fnFn);
assertReverseUpdated(Arrays.asList("foo"), extractionNamespace.getNamespace(), "bar", reverseFn);
assertUpdated(null, extractionNamespace.getNamespace(), "baz", fnFn);
checkServer();
events = renameManager.getNumEvents(namespace);
log.info("------------------------- Sending baz bat -------------------------------");
producer.send(new KeyedMessage<byte[], byte[]>(topicName, StringUtils.toUtf8("baz"), StringUtils.toUtf8("bat")));
producer.send(new KeyedMessage<>(topicName, StringUtils.toUtf8("baz"), StringUtils.toUtf8("bat")));
while (events == renameManager.getNumEvents(namespace)) {
Thread.sleep(10);
if (System.currentTimeMillis() > start + 60_000) {
@ -373,4 +399,44 @@ public class TestKafkaExtractionCluster
producer.close();
}
}
private void assertUpdated(
String expected,
String namespace,
String key,
ConcurrentMap<String, Function<String, String>> lookup
)
throws InterruptedException
{
final Function<String, String> extractionFn = lookup.get(namespace);
if (expected == null) {
while (extractionFn.apply(key) != null) {
Thread.sleep(100);
}
} else {
while (!expected.equals(extractionFn.apply(key))) {
Thread.sleep(100);
}
}
Assert.assertEquals("update check", expected, extractionFn.apply(key));
}
private void assertReverseUpdated(
List<String> expected,
String namespace,
String key,
ConcurrentMap<String, Function<String, List<String>>> lookup
)
throws InterruptedException
{
final Function<String, List<String>> extractionFn = lookup.get(namespace);
while (!extractionFn.apply(key).equals(expected)) {
Thread.sleep(100);
}
Assert.assertEquals("update check", expected, extractionFn.apply(key));
}
}

View File

@ -36,7 +36,6 @@ import javax.validation.constraints.NotNull;
@JsonTypeName("jdbc")
public class JDBCExtractionNamespace implements ExtractionNamespace
{
@JsonProperty
private final MetadataStorageConnectorConfig connectorConfig;
@JsonProperty

View File

@ -413,7 +413,7 @@ public class JDBCExtractionNamespaceTest
assertUpdated(extractionNamespace.getNamespace(), "foo", "bar");
}
@Test(timeout = 10_000L)
@Test(timeout = 60_000L)
public void testFindNew()
throws NoSuchFieldException, IllegalAccessException, ExecutionException, InterruptedException
{
@ -482,6 +482,12 @@ public class JDBCExtractionNamespaceTest
waitForUpdates(1_000L, 2L);
Function<String, String> extractionFn = fnCache.get(namespace);
// rely on test timeout to break out of this loop
while (!extractionFn.apply(key).equals(expected)) {
Thread.sleep(100);
}
Assert.assertEquals(
"update check",
expected,

View File

@ -168,7 +168,7 @@ public class RealtimeIndexTaskTest
);
}
@Test(timeout = 10000L)
@Test(timeout = 60_000L)
public void testBasics() throws Exception
{
final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
@ -221,7 +221,7 @@ public class RealtimeIndexTaskTest
Assert.assertEquals(TaskStatus.Status.SUCCESS, taskStatus.getStatusCode());
}
@Test(timeout = 10000L)
@Test(timeout = 60_000L)
public void testRestore() throws Exception
{
final File directory = tempFolder.newFolder();

View File

@ -907,7 +907,7 @@ public class TaskLifecycleTest
EasyMock.verify(monitorScheduler, queryRunnerFactoryConglomerate);
}
@Test(timeout = 4000L)
@Test(timeout = 60_000L)
public void testRealtimeIndexTaskFailure() throws Exception
{
setUpAndStartTaskQueue(

View File

@ -202,7 +202,7 @@ public class SimpleResourceManagementStrategyTest
ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class);
EmittingLogger.registerEmitter(emitter);
emitter.emit(EasyMock.<ServiceEventBuilder>anyObject());
EasyMock.expectLastCall();
EasyMock.expectLastCall().atLeastOnce();
EasyMock.replay(emitter);
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(2);

View File

@ -20,6 +20,7 @@
package io.druid.curator.announcement;
import com.google.common.collect.Sets;
import com.metamx.common.ISE;
import io.druid.concurrent.Execs;
import io.druid.curator.CuratorTestBase;
import org.apache.curator.framework.CuratorFramework;
@ -172,6 +173,14 @@ public class AnnouncerTest extends CuratorTestBase
announcer.stop();
int count = 0;
while ((curator.checkExists().forPath(testPath1) != null) || (curator.checkExists().forPath(testPath1) != null)) {
Thread.sleep(100);
if (++count > 10) {
throw new ISE("Curator paths not getting cleaned up");
}
}
Assert.assertNull(curator.checkExists().forPath(testPath1));
Assert.assertNull(curator.checkExists().forPath(testPath2));
}