fix test to not rely on execution of processed / ack order

also, make sure the order is consistent (as much as possible) when calling ack to processed
This commit is contained in:
Shay Banon 2013-11-02 00:03:55 +01:00
parent 7d2c4afa98
commit acc5f584d4
2 changed files with 36 additions and 40 deletions

View File

@ -311,12 +311,12 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
if (previousClusterState == newClusterState) { if (previousClusterState == newClusterState) {
logger.debug("processing [{}]: no change in cluster_state", source); logger.debug("processing [{}]: no change in cluster_state", source);
if (updateTask instanceof ProcessedClusterStateUpdateTask) {
((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(source, previousClusterState, newClusterState);
}
if (updateTask instanceof AckedClusterStateUpdateTask) { if (updateTask instanceof AckedClusterStateUpdateTask) {
//no need to wait for ack if nothing changed, the update can be counted as acknowledged //no need to wait for ack if nothing changed, the update can be counted as acknowledged
((AckedClusterStateUpdateTask)updateTask).onAllNodesAcked(null); ((AckedClusterStateUpdateTask) updateTask).onAllNodesAcked(null);
}
if (updateTask instanceof ProcessedClusterStateUpdateTask) {
((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(source, previousClusterState, newClusterState);
} }
return; return;
} }
@ -341,7 +341,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
} else { } else {
try { try {
ackListener = new AckCountDownListener(ackedUpdateTask, newClusterState, threadPool); ackListener = new AckCountDownListener(ackedUpdateTask, newClusterState, threadPool);
} catch(EsRejectedExecutionException ex) { } catch (EsRejectedExecutionException ex) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Couldn't schedule timeout thread - node might be shutting down", ex); logger.debug("Couldn't schedule timeout thread - node might be shutting down", ex);
} }
@ -436,7 +436,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
if (newClusterState.nodes().localNodeMaster()) { if (newClusterState.nodes().localNodeMaster()) {
try { try {
ackListener.onNodeAck(localNode(), null); ackListener.onNodeAck(localNode(), null);
} catch(Throwable t) { } catch (Throwable t) {
logger.debug("error while processing ack for master node [{}]", t, newClusterState.nodes().localNode()); logger.debug("error while processing ack for master node [{}]", t, newClusterState.nodes().localNode());
} }
} }

View File

@ -50,16 +50,13 @@ import static org.hamcrest.Matchers.*;
/** /**
* *
*/ */
@ClusterScope(scope = Scope.TEST, numNodes=0) @ClusterScope(scope = Scope.TEST, numNodes = 0)
public class ClusterServiceTests extends AbstractIntegrationTest { public class ClusterServiceTests extends AbstractIntegrationTest {
@Test @Test
public void testTimeoutUpdateTask() throws Exception { public void testTimeoutUpdateTask() throws Exception {
Settings settings = settingsBuilder() Settings settings = settingsBuilder()
.put("discovery.type", "zen") .put("discovery.type", "local")
.put("discovery.zen.minimum_master_nodes", 1)
.put("discovery.zen.ping_timeout", "200ms")
.put("discovery.initial_state_timeout", "500ms")
.build(); .build();
cluster().startNode(settings); cluster().startNode(settings);
ClusterService clusterService1 = cluster().getInstance(ClusterService.class); ClusterService clusterService1 = cluster().getInstance(ClusterService.class);
@ -114,10 +111,7 @@ public class ClusterServiceTests extends AbstractIntegrationTest {
@Test @Test
public void testAckedUpdateTask() throws Exception { public void testAckedUpdateTask() throws Exception {
Settings settings = settingsBuilder() Settings settings = settingsBuilder()
.put("discovery.type", "zen") .put("discovery.type", "local")
.put("discovery.zen.minimum_master_nodes", 1)
.put("discovery.zen.ping_timeout", "200ms")
.put("discovery.initial_state_timeout", "500ms")
.build(); .build();
cluster().startNode(settings); cluster().startNode(settings);
ClusterService clusterService = cluster().getInstance(ClusterService.class); ClusterService clusterService = cluster().getInstance(ClusterService.class);
@ -126,8 +120,8 @@ public class ClusterServiceTests extends AbstractIntegrationTest {
final AtomicBoolean ackTimeout = new AtomicBoolean(false); final AtomicBoolean ackTimeout = new AtomicBoolean(false);
final AtomicBoolean onFailure = new AtomicBoolean(false); final AtomicBoolean onFailure = new AtomicBoolean(false);
final AtomicBoolean executed = new AtomicBoolean(false); final AtomicBoolean executed = new AtomicBoolean(false);
final AtomicBoolean processed = new AtomicBoolean(false);
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch processedLatch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask() { clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask() {
@Override @Override
public boolean mustAck(DiscoveryNode discoveryNode) { public boolean mustAck(DiscoveryNode discoveryNode) {
@ -137,11 +131,13 @@ public class ClusterServiceTests extends AbstractIntegrationTest {
@Override @Override
public void onAllNodesAcked(@Nullable Throwable t) { public void onAllNodesAcked(@Nullable Throwable t) {
allNodesAcked.set(true); allNodesAcked.set(true);
latch.countDown();
} }
@Override @Override
public void onAckTimeout() { public void onAckTimeout() {
ackTimeout.set(true); ackTimeout.set(true);
latch.countDown();
} }
@Override @Override
@ -156,8 +152,7 @@ public class ClusterServiceTests extends AbstractIntegrationTest {
@Override @Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
processed.set(true); processedLatch.countDown();
latch.countDown();
} }
@Override @Override
@ -168,6 +163,7 @@ public class ClusterServiceTests extends AbstractIntegrationTest {
@Override @Override
public void onFailure(String source, Throwable t) { public void onFailure(String source, Throwable t) {
logger.error("failed to execute callback in test {}", t, source);
onFailure.set(true); onFailure.set(true);
latch.countDown(); latch.countDown();
} }
@ -178,17 +174,15 @@ public class ClusterServiceTests extends AbstractIntegrationTest {
assertThat(allNodesAcked.get(), equalTo(true)); assertThat(allNodesAcked.get(), equalTo(true));
assertThat(ackTimeout.get(), equalTo(false)); assertThat(ackTimeout.get(), equalTo(false));
assertThat(executed.get(), equalTo(true)); assertThat(executed.get(), equalTo(true));
assertThat(processed.get(), equalTo(true));
assertThat(onFailure.get(), equalTo(false)); assertThat(onFailure.get(), equalTo(false));
assertThat(processedLatch.await(1, TimeUnit.SECONDS), equalTo(true));
} }
@Test @Test
public void testAckedUpdateTaskSameClusterState() throws Exception { public void testAckedUpdateTaskSameClusterState() throws Exception {
Settings settings = settingsBuilder() Settings settings = settingsBuilder()
.put("discovery.type", "zen") .put("discovery.type", "local")
.put("discovery.zen.minimum_master_nodes", 1)
.put("discovery.zen.ping_timeout", "200ms")
.put("discovery.initial_state_timeout", "500ms")
.build(); .build();
cluster().startNode(settings); cluster().startNode(settings);
ClusterService clusterService = cluster().getInstance(ClusterService.class); ClusterService clusterService = cluster().getInstance(ClusterService.class);
@ -197,8 +191,8 @@ public class ClusterServiceTests extends AbstractIntegrationTest {
final AtomicBoolean ackTimeout = new AtomicBoolean(false); final AtomicBoolean ackTimeout = new AtomicBoolean(false);
final AtomicBoolean onFailure = new AtomicBoolean(false); final AtomicBoolean onFailure = new AtomicBoolean(false);
final AtomicBoolean executed = new AtomicBoolean(false); final AtomicBoolean executed = new AtomicBoolean(false);
final AtomicBoolean processed = new AtomicBoolean(false);
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch processedLatch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask() { clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask() {
@Override @Override
public boolean mustAck(DiscoveryNode discoveryNode) { public boolean mustAck(DiscoveryNode discoveryNode) {
@ -208,11 +202,13 @@ public class ClusterServiceTests extends AbstractIntegrationTest {
@Override @Override
public void onAllNodesAcked(@Nullable Throwable t) { public void onAllNodesAcked(@Nullable Throwable t) {
allNodesAcked.set(true); allNodesAcked.set(true);
latch.countDown();
} }
@Override @Override
public void onAckTimeout() { public void onAckTimeout() {
ackTimeout.set(true); ackTimeout.set(true);
latch.countDown();
} }
@Override @Override
@ -227,8 +223,7 @@ public class ClusterServiceTests extends AbstractIntegrationTest {
@Override @Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
processed.set(true); processedLatch.countDown();
latch.countDown();
} }
@Override @Override
@ -239,6 +234,7 @@ public class ClusterServiceTests extends AbstractIntegrationTest {
@Override @Override
public void onFailure(String source, Throwable t) { public void onFailure(String source, Throwable t) {
logger.error("failed to execute callback in test {}", t, source);
onFailure.set(true); onFailure.set(true);
latch.countDown(); latch.countDown();
} }
@ -249,17 +245,15 @@ public class ClusterServiceTests extends AbstractIntegrationTest {
assertThat(allNodesAcked.get(), equalTo(true)); assertThat(allNodesAcked.get(), equalTo(true));
assertThat(ackTimeout.get(), equalTo(false)); assertThat(ackTimeout.get(), equalTo(false));
assertThat(executed.get(), equalTo(true)); assertThat(executed.get(), equalTo(true));
assertThat(processed.get(), equalTo(true));
assertThat(onFailure.get(), equalTo(false)); assertThat(onFailure.get(), equalTo(false));
assertThat(processedLatch.await(1, TimeUnit.SECONDS), equalTo(true));
} }
@Test @Test
public void testAckedUpdateTaskNoAckExpected() throws Exception { public void testAckedUpdateTaskNoAckExpected() throws Exception {
Settings settings = settingsBuilder() Settings settings = settingsBuilder()
.put("discovery.type", "zen") .put("discovery.type", "local")
.put("discovery.zen.minimum_master_nodes", 1)
.put("discovery.zen.ping_timeout", "200ms")
.put("discovery.initial_state_timeout", "500ms")
.build(); .build();
cluster().startNode(settings); cluster().startNode(settings);
ClusterService clusterService = cluster().getInstance(ClusterService.class); ClusterService clusterService = cluster().getInstance(ClusterService.class);
@ -309,6 +303,7 @@ public class ClusterServiceTests extends AbstractIntegrationTest {
@Override @Override
public void onFailure(String source, Throwable t) { public void onFailure(String source, Throwable t) {
logger.error("failed to execute callback in test {}", t, source);
onFailure.set(true); onFailure.set(true);
latch.countDown(); latch.countDown();
} }
@ -325,10 +320,7 @@ public class ClusterServiceTests extends AbstractIntegrationTest {
@Test @Test
public void testAckedUpdateTaskTimeoutZero() throws Exception { public void testAckedUpdateTaskTimeoutZero() throws Exception {
Settings settings = settingsBuilder() Settings settings = settingsBuilder()
.put("discovery.type", "zen") .put("discovery.type", "local")
.put("discovery.zen.minimum_master_nodes", 1)
.put("discovery.zen.ping_timeout", "200ms")
.put("discovery.initial_state_timeout", "500ms")
.build(); .build();
cluster().startNode(settings); cluster().startNode(settings);
ClusterService clusterService = cluster().getInstance(ClusterService.class); ClusterService clusterService = cluster().getInstance(ClusterService.class);
@ -338,6 +330,7 @@ public class ClusterServiceTests extends AbstractIntegrationTest {
final AtomicBoolean onFailure = new AtomicBoolean(false); final AtomicBoolean onFailure = new AtomicBoolean(false);
final AtomicBoolean executed = new AtomicBoolean(false); final AtomicBoolean executed = new AtomicBoolean(false);
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch processedLatch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask() { clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask() {
@Override @Override
public boolean mustAck(DiscoveryNode discoveryNode) { public boolean mustAck(DiscoveryNode discoveryNode) {
@ -368,7 +361,7 @@ public class ClusterServiceTests extends AbstractIntegrationTest {
@Override @Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
processedLatch.countDown();
} }
@Override @Override
@ -379,6 +372,7 @@ public class ClusterServiceTests extends AbstractIntegrationTest {
@Override @Override
public void onFailure(String source, Throwable t) { public void onFailure(String source, Throwable t) {
logger.error("failed to execute callback in test {}", t, source);
onFailure.set(true); onFailure.set(true);
latch.countDown(); latch.countDown();
} }
@ -390,6 +384,8 @@ public class ClusterServiceTests extends AbstractIntegrationTest {
assertThat(ackTimeout.get(), equalTo(true)); assertThat(ackTimeout.get(), equalTo(true));
assertThat(executed.get(), equalTo(true)); assertThat(executed.get(), equalTo(true));
assertThat(onFailure.get(), equalTo(false)); assertThat(onFailure.get(), equalTo(false));
assertThat(processedLatch.await(1, TimeUnit.SECONDS), equalTo(true));
} }
@Test @Test
@ -538,8 +534,8 @@ public class ClusterServiceTests extends AbstractIntegrationTest {
assertThat(testService1.master(), is(true)); assertThat(testService1.master(), is(true));
String node_1 = cluster().startNode(settings); String node_1 = cluster().startNode(settings);
ClusterService clusterService2 = cluster().getInstance(ClusterService.class, node_1); ClusterService clusterService2 = cluster().getInstance(ClusterService.class, node_1);
MasterAwareService testService2 = cluster().getInstance(MasterAwareService.class, node_1); MasterAwareService testService2 = cluster().getInstance(MasterAwareService.class, node_1);
ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").execute().actionGet(); ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").execute().actionGet();
assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.isTimedOut(), equalTo(false));
@ -569,7 +565,7 @@ public class ClusterServiceTests extends AbstractIntegrationTest {
String node_2 = cluster().startNode(settings); String node_2 = cluster().startNode(settings);
clusterService1 =cluster().getInstance(ClusterService.class, node_2); clusterService1 = cluster().getInstance(ClusterService.class, node_2);
testService1 = cluster().getInstance(MasterAwareService.class, node_2); testService1 = cluster().getInstance(MasterAwareService.class, node_2);
// make sure both nodes see each other otherwise the masternode below could be null if node 2 is master and node 1 did'r receive the updated cluster state... // make sure both nodes see each other otherwise the masternode below could be null if node 2 is master and node 1 did'r receive the updated cluster state...