mirror of
https://github.com/apache/druid.git
synced 2025-02-09 11:34:54 +00:00
Fix AnnouncerTest.testSanity() (#5077)
* Fix announcer test * rename * split server module test * add name * polling until /test1 is created * add comment
This commit is contained in:
parent
645de02fb2
commit
322b7f6a27
21
.travis.yml
21
.travis.yml
@ -13,29 +13,46 @@ matrix:
|
|||||||
include:
|
include:
|
||||||
# strict compilation
|
# strict compilation
|
||||||
- sudo: false
|
- sudo: false
|
||||||
|
env:
|
||||||
|
- NAME="strict compilation"
|
||||||
install: true
|
install: true
|
||||||
# Strict compilation requires more than 2 GB
|
# Strict compilation requires more than 2 GB
|
||||||
script: echo "MAVEN_OPTS='-Xmx3000m'" > ~/.mavenrc && mvn clean -Pstrict -pl '!benchmarks' compile test-compile -B
|
script: echo "MAVEN_OPTS='-Xmx3000m'" > ~/.mavenrc && mvn clean -Pstrict -pl '!benchmarks' compile test-compile -B
|
||||||
|
|
||||||
# processing module test
|
# processing module test
|
||||||
- sudo: false
|
- sudo: false
|
||||||
|
env:
|
||||||
|
- NAME="processing module test"
|
||||||
install: echo "MAVEN_OPTS='-Xmx3000m'" > ~/.mavenrc && mvn install -q -ff -DskipTests -B
|
install: echo "MAVEN_OPTS='-Xmx3000m'" > ~/.mavenrc && mvn install -q -ff -DskipTests -B
|
||||||
before_script:
|
before_script:
|
||||||
- unset _JAVA_OPTIONS
|
- unset _JAVA_OPTIONS
|
||||||
script: echo "MAVEN_OPTS='-Xmx512m'" > ~/.mavenrc && mvn test -B -Pparallel-test -Dmaven.fork.count=2 -pl processing
|
script: echo "MAVEN_OPTS='-Xmx512m'" > ~/.mavenrc && mvn test -B -Pparallel-test -Dmaven.fork.count=2 -pl processing
|
||||||
|
|
||||||
# non-processing modules test
|
# server module test
|
||||||
- sudo: false
|
- sudo: false
|
||||||
|
env:
|
||||||
|
- NAME="server module test"
|
||||||
install: echo "MAVEN_OPTS='-Xmx3000m'" > ~/.mavenrc && mvn install -q -ff -DskipTests -B
|
install: echo "MAVEN_OPTS='-Xmx3000m'" > ~/.mavenrc && mvn install -q -ff -DskipTests -B
|
||||||
before_script:
|
before_script:
|
||||||
- unset _JAVA_OPTIONS
|
- unset _JAVA_OPTIONS
|
||||||
script: echo "MAVEN_OPTS='-Xmx512m'" > ~/.mavenrc && mvn test -B -Pparallel-test -Dmaven.fork.count=2 -pl '!processing'
|
# Server module test is run without the parallel-test option because it's memory sensitive and often fails with that option.
|
||||||
|
script: echo "MAVEN_OPTS='-Xmx512m'" > ~/.mavenrc && mvn test -B -pl server
|
||||||
|
|
||||||
|
# other modules test
|
||||||
|
- sudo: false
|
||||||
|
env:
|
||||||
|
- NAME="other modules test"
|
||||||
|
install: echo "MAVEN_OPTS='-Xmx3000m'" > ~/.mavenrc && mvn install -q -ff -DskipTests -B
|
||||||
|
before_script:
|
||||||
|
- unset _JAVA_OPTIONS
|
||||||
|
script: echo "MAVEN_OPTS='-Xmx512m'" > ~/.mavenrc && mvn test -B -Pparallel-test -Dmaven.fork.count=2 -pl '!processing,!server'
|
||||||
|
|
||||||
# run integration tests
|
# run integration tests
|
||||||
- sudo: required
|
- sudo: required
|
||||||
services:
|
services:
|
||||||
- docker
|
- docker
|
||||||
env:
|
env:
|
||||||
|
- NAME="integration test"
|
||||||
- DOCKER_IP=172.17.0.1
|
- DOCKER_IP=172.17.0.1
|
||||||
install:
|
install:
|
||||||
# Only errors will be shown with the -q option. This is to avoid generating too many logs which make travis build failed.
|
# Only errors will be shown with the -q option. This is to avoid generating too many logs which make travis build failed.
|
||||||
|
@ -19,6 +19,7 @@
|
|||||||
|
|
||||||
package io.druid.curator.announcement;
|
package io.druid.curator.announcement;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Throwables;
|
import com.google.common.base.Throwables;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
@ -43,6 +44,7 @@ import org.apache.zookeeper.KeeperException;
|
|||||||
import org.apache.zookeeper.data.Stat;
|
import org.apache.zookeeper.data.Stat;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
@ -69,6 +71,9 @@ public class Announcer
|
|||||||
private final ConcurrentMap<String, ConcurrentMap<String, byte[]>> announcements = new ConcurrentHashMap<>();
|
private final ConcurrentMap<String, ConcurrentMap<String, byte[]>> announcements = new ConcurrentHashMap<>();
|
||||||
private final List<String> parentsIBuilt = new CopyOnWriteArrayList<String>();
|
private final List<String> parentsIBuilt = new CopyOnWriteArrayList<String>();
|
||||||
|
|
||||||
|
// Used for testing
|
||||||
|
private Set<String> addedChildren;
|
||||||
|
|
||||||
private boolean started = false;
|
private boolean started = false;
|
||||||
|
|
||||||
public Announcer(
|
public Announcer(
|
||||||
@ -86,9 +91,22 @@ public class Announcer
|
|||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
void initializeAddedChildren()
|
||||||
|
{
|
||||||
|
addedChildren = new HashSet<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
Set<String> getAddedChildren()
|
||||||
|
{
|
||||||
|
return addedChildren;
|
||||||
|
}
|
||||||
|
|
||||||
@LifecycleStart
|
@LifecycleStart
|
||||||
public void start()
|
public void start()
|
||||||
{
|
{
|
||||||
|
log.info("Starting announcer");
|
||||||
synchronized (toAnnounce) {
|
synchronized (toAnnounce) {
|
||||||
if (started) {
|
if (started) {
|
||||||
return;
|
return;
|
||||||
@ -111,6 +129,7 @@ public class Announcer
|
|||||||
@LifecycleStop
|
@LifecycleStop
|
||||||
public void stop()
|
public void stop()
|
||||||
{
|
{
|
||||||
|
log.info("Stopping announcer");
|
||||||
synchronized (toAnnounce) {
|
synchronized (toAnnounce) {
|
||||||
if (!started) {
|
if (!started) {
|
||||||
return;
|
return;
|
||||||
@ -217,6 +236,8 @@ public class Announcer
|
|||||||
@Override
|
@Override
|
||||||
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
|
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
|
||||||
{
|
{
|
||||||
|
// NOTE: ZooKeeper does not guarantee that we will get every event, and thus PathChildrenCache doesn't
|
||||||
|
// as well. If one of the below events are missed, Announcer might not work properly.
|
||||||
log.debug("Path[%s] got event[%s]", parentPath, event);
|
log.debug("Path[%s] got event[%s]", parentPath, event);
|
||||||
switch (event.getType()) {
|
switch (event.getType()) {
|
||||||
case CHILD_REMOVED:
|
case CHILD_REMOVED:
|
||||||
@ -259,8 +280,12 @@ public class Announcer
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case INITIALIZED:
|
|
||||||
case CHILD_ADDED:
|
case CHILD_ADDED:
|
||||||
|
if (addedChildren != null) {
|
||||||
|
addedChildren.add(event.getData().getPath());
|
||||||
|
}
|
||||||
|
// fall through
|
||||||
|
case INITIALIZED:
|
||||||
case CHILD_UPDATED:
|
case CHILD_UPDATED:
|
||||||
case CONNECTION_SUSPENDED:
|
case CONNECTION_SUSPENDED:
|
||||||
// do nothing
|
// do nothing
|
||||||
|
@ -20,21 +20,26 @@
|
|||||||
package io.druid.curator.announcement;
|
package io.druid.curator.announcement;
|
||||||
|
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
import io.druid.java.util.common.concurrent.Execs;
|
|
||||||
import io.druid.curator.CuratorTestBase;
|
import io.druid.curator.CuratorTestBase;
|
||||||
import io.druid.java.util.common.StringUtils;
|
import io.druid.java.util.common.StringUtils;
|
||||||
|
import io.druid.java.util.common.concurrent.Execs;
|
||||||
|
import io.druid.java.util.common.logger.Logger;
|
||||||
import org.apache.curator.framework.CuratorFramework;
|
import org.apache.curator.framework.CuratorFramework;
|
||||||
import org.apache.curator.framework.api.CuratorEvent;
|
import org.apache.curator.framework.api.CuratorEvent;
|
||||||
import org.apache.curator.framework.api.CuratorEventType;
|
import org.apache.curator.framework.api.CuratorEventType;
|
||||||
import org.apache.curator.framework.api.CuratorListener;
|
import org.apache.curator.framework.api.CuratorListener;
|
||||||
|
import org.apache.curator.framework.api.transaction.CuratorOp;
|
||||||
|
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
|
||||||
import org.apache.curator.test.KillSession;
|
import org.apache.curator.test.KillSession;
|
||||||
import org.apache.curator.utils.ZKPaths;
|
import org.apache.curator.utils.ZKPaths;
|
||||||
|
import org.apache.zookeeper.KeeperException.Code;
|
||||||
import org.apache.zookeeper.data.Stat;
|
import org.apache.zookeeper.data.Stat;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
@ -43,7 +48,7 @@ import java.util.concurrent.ExecutorService;
|
|||||||
*/
|
*/
|
||||||
public class AnnouncerTest extends CuratorTestBase
|
public class AnnouncerTest extends CuratorTestBase
|
||||||
{
|
{
|
||||||
|
private static final Logger log = new Logger(AnnouncerTest.class);
|
||||||
private ExecutorService exec;
|
private ExecutorService exec;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
@ -65,6 +70,7 @@ public class AnnouncerTest extends CuratorTestBase
|
|||||||
curator.start();
|
curator.start();
|
||||||
curator.blockUntilConnected();
|
curator.blockUntilConnected();
|
||||||
Announcer announcer = new Announcer(curator, exec);
|
Announcer announcer = new Announcer(curator, exec);
|
||||||
|
announcer.initializeAddedChildren();
|
||||||
|
|
||||||
final byte[] billy = StringUtils.toUtf8("billy");
|
final byte[] billy = StringUtils.toUtf8("billy");
|
||||||
final String testPath1 = "/test1";
|
final String testPath1 = "/test1";
|
||||||
@ -75,6 +81,9 @@ public class AnnouncerTest extends CuratorTestBase
|
|||||||
Assert.assertNull("/somewhere/test2 does not exists", curator.checkExists().forPath(testPath2));
|
Assert.assertNull("/somewhere/test2 does not exists", curator.checkExists().forPath(testPath2));
|
||||||
|
|
||||||
announcer.start();
|
announcer.start();
|
||||||
|
while (!announcer.getAddedChildren().contains("/test1")) {
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Assert.assertArrayEquals("/test1 has data", billy, curator.getData().decompressed().forPath(testPath1));
|
Assert.assertArrayEquals("/test1 has data", billy, curator.getData().decompressed().forPath(testPath1));
|
||||||
@ -83,7 +92,11 @@ public class AnnouncerTest extends CuratorTestBase
|
|||||||
announcer.announce(testPath2, billy);
|
announcer.announce(testPath2, billy);
|
||||||
|
|
||||||
Assert.assertArrayEquals("/test1 still has data", billy, curator.getData().decompressed().forPath(testPath1));
|
Assert.assertArrayEquals("/test1 still has data", billy, curator.getData().decompressed().forPath(testPath1));
|
||||||
Assert.assertArrayEquals("/somewhere/test2 has data", billy, curator.getData().decompressed().forPath(testPath2));
|
Assert.assertArrayEquals(
|
||||||
|
"/somewhere/test2 has data",
|
||||||
|
billy,
|
||||||
|
curator.getData().decompressed().forPath(testPath2)
|
||||||
|
);
|
||||||
|
|
||||||
final CountDownLatch latch = new CountDownLatch(1);
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
curator.getCuratorListenable().addListener(
|
curator.getCuratorListenable().addListener(
|
||||||
@ -98,7 +111,12 @@ public class AnnouncerTest extends CuratorTestBase
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
curator.inTransaction().delete().forPath(testPath1).and().commit();
|
final CuratorOp deleteOp = curator.transactionOp().delete().forPath(testPath1);
|
||||||
|
final Collection<CuratorTransactionResult> results = curator.transaction().forOperations(deleteOp);
|
||||||
|
Assert.assertEquals(1, results.size());
|
||||||
|
final CuratorTransactionResult result = results.iterator().next();
|
||||||
|
Assert.assertEquals(Code.OK.intValue(), result.getError()); // assert delete
|
||||||
|
|
||||||
Assert.assertTrue("Wait for /test1 to be created", timing.forWaiting().awaitLatch(latch));
|
Assert.assertTrue("Wait for /test1 to be created", timing.forWaiting().awaitLatch(latch));
|
||||||
|
|
||||||
Assert.assertArrayEquals(
|
Assert.assertArrayEquals(
|
||||||
|
Loading…
x
Reference in New Issue
Block a user