mirror of https://github.com/apache/lucene.git
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/lucene-solr
This commit is contained in:
commit
ae6d29f0ae
|
@ -14,6 +14,7 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
|
import http.client
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
import subprocess
|
import subprocess
|
||||||
|
|
|
@ -131,6 +131,8 @@ Other Changes
|
||||||
|
|
||||||
* SOLR-12118: Solr Ref-Guide can now use some ivy version props directly as attributes in content (hossman)
|
* SOLR-12118: Solr Ref-Guide can now use some ivy version props directly as attributes in content (hossman)
|
||||||
|
|
||||||
|
* SOLR-12152: Split up TriggerIntegrationTest into multiple tests to isolate and increase reliability. (shalin)
|
||||||
|
|
||||||
================== 7.3.0 ==================
|
================== 7.3.0 ==================
|
||||||
|
|
||||||
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.
|
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.
|
||||||
|
|
|
@ -66,7 +66,7 @@ public class CorePropertiesLocator implements CoresLocator {
|
||||||
if (Files.exists(propertiesFile))
|
if (Files.exists(propertiesFile))
|
||||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||||
"Could not create a new core in " + cd.getInstanceDir()
|
"Could not create a new core in " + cd.getInstanceDir()
|
||||||
+ "as another core is already defined there");
|
+ " as another core is already defined there");
|
||||||
writePropertiesFile(cd, propertiesFile);
|
writePropertiesFile(cd, propertiesFile);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -224,6 +224,7 @@ public class DocValuesNotIndexedTest extends SolrCloudTestCase {
|
||||||
|
|
||||||
// We should be able to sort thing with missing first/last and that are _NOT_ present at all on one server.
|
// We should be able to sort thing with missing first/last and that are _NOT_ present at all on one server.
|
||||||
@Test
|
@Test
|
||||||
|
@BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 26-Mar-2018
|
||||||
public void testGroupingSorting() throws IOException, SolrServerException {
|
public void testGroupingSorting() throws IOException, SolrServerException {
|
||||||
CloudSolrClient client = cluster.getSolrClient();
|
CloudSolrClient client = cluster.getSolrClient();
|
||||||
|
|
||||||
|
|
|
@ -76,6 +76,7 @@ public class RestartWhileUpdatingTest extends AbstractFullDistribZkTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 26-Mar-2018
|
||||||
public void test() throws Exception {
|
public void test() throws Exception {
|
||||||
handle.clear();
|
handle.clear();
|
||||||
handle.put("timestamp", SKIPVAL);
|
handle.put("timestamp", SKIPVAL);
|
||||||
|
|
|
@ -88,11 +88,13 @@ public class TestCloudConsistency extends SolrCloudTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 26-Mar-2018
|
||||||
public void testOutOfSyncReplicasCannotBecomeLeader() throws Exception {
|
public void testOutOfSyncReplicasCannotBecomeLeader() throws Exception {
|
||||||
testOutOfSyncReplicasCannotBecomeLeader(false);
|
testOutOfSyncReplicasCannotBecomeLeader(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 26-Mar-2018
|
||||||
public void testOutOfSyncReplicasCannotBecomeLeaderAfterRestart() throws Exception {
|
public void testOutOfSyncReplicasCannotBecomeLeaderAfterRestart() throws Exception {
|
||||||
testOutOfSyncReplicasCannotBecomeLeader(true);
|
testOutOfSyncReplicasCannotBecomeLeader(true);
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,6 +60,7 @@ import org.apache.zookeeper.KeeperException;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Ignore;
|
import org.junit.Ignore;
|
||||||
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -301,6 +302,8 @@ public class TestPullReplica extends SolrCloudTestCase {
|
||||||
doTestNoLeader(true);
|
doTestNoLeader(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 26-Mar-2018
|
||||||
public void testKillLeader() throws Exception {
|
public void testKillLeader() throws Exception {
|
||||||
doTestNoLeader(false);
|
doTestNoLeader(false);
|
||||||
}
|
}
|
||||||
|
|
|
@ -122,6 +122,8 @@ public class TestSegmentSorting extends SolrCloudTestCase {
|
||||||
* In this situation, the updates should *NOT* be done inplace, because that would
|
* In this situation, the updates should *NOT* be done inplace, because that would
|
||||||
* break the index sorting
|
* break the index sorting
|
||||||
*/
|
*/
|
||||||
|
@Test
|
||||||
|
@BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 26-Mar-2018
|
||||||
public void testAtomicUpdateOfSegmentSortField() throws Exception {
|
public void testAtomicUpdateOfSegmentSortField() throws Exception {
|
||||||
|
|
||||||
final CloudSolrClient cloudSolrClient = cluster.getSolrClient();
|
final CloudSolrClient cloudSolrClient = cluster.getSolrClient();
|
||||||
|
|
|
@ -483,6 +483,7 @@ public class CollectionsAPIDistributedZkTest extends SolrCloudTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 26-Mar-2018
|
||||||
public void testCollectionReload() throws Exception {
|
public void testCollectionReload() throws Exception {
|
||||||
|
|
||||||
final String collectionName = "reloaded_collection";
|
final String collectionName = "reloaded_collection";
|
||||||
|
|
|
@ -0,0 +1,242 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.solr.cloud.autoscaling;
|
||||||
|
|
||||||
|
import java.lang.invoke.MethodHandles;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.solr.client.solrj.SolrClient;
|
||||||
|
import org.apache.solr.client.solrj.SolrRequest;
|
||||||
|
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
||||||
|
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
|
||||||
|
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
|
||||||
|
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||||
|
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||||
|
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||||
|
import org.apache.solr.common.SolrInputDocument;
|
||||||
|
import org.apache.solr.common.cloud.DocCollection;
|
||||||
|
import org.apache.solr.common.cloud.Replica;
|
||||||
|
import org.apache.solr.common.util.NamedList;
|
||||||
|
import org.apache.solr.common.util.Utils;
|
||||||
|
import org.apache.solr.metrics.SolrCoreMetricManager;
|
||||||
|
import org.apache.solr.util.LogLevel;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
|
||||||
|
import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.WAIT_FOR_DELTA_NANOS;
|
||||||
|
import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.timeSource;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Integration test for {@link MetricTrigger}
|
||||||
|
*/
|
||||||
|
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
|
||||||
|
public class MetricTriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
static Map<String, List<CapturedEvent>> listenerEvents = new HashMap<>();
|
||||||
|
static CountDownLatch listenerCreated = new CountDownLatch(1);
|
||||||
|
private static CountDownLatch triggerFiredLatch;
|
||||||
|
private static int waitForSeconds = 1;
|
||||||
|
private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setupCluster() throws Exception {
|
||||||
|
configureCluster(2)
|
||||||
|
.addConfig("conf", configset("cloud-minimal"))
|
||||||
|
.configure();
|
||||||
|
// disable .scheduled_maintenance
|
||||||
|
String suspendTriggerCommand = "{" +
|
||||||
|
"'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
|
||||||
|
"}";
|
||||||
|
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
|
||||||
|
SolrClient solrClient = cluster.getSolrClient();
|
||||||
|
NamedList<Object> response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
triggerFiredLatch = new CountDownLatch(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMetricTrigger() throws Exception {
|
||||||
|
cluster.waitForAllNodes(5);
|
||||||
|
|
||||||
|
String collectionName = "testMetricTrigger";
|
||||||
|
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||||
|
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
|
||||||
|
"conf", 2, 2).setMaxShardsPerNode(2);
|
||||||
|
create.process(solrClient);
|
||||||
|
solrClient.setDefaultCollection(collectionName);
|
||||||
|
|
||||||
|
waitForState("Timed out waiting for collection:" + collectionName + " to become active", collectionName, clusterShape(2, 2));
|
||||||
|
|
||||||
|
DocCollection docCollection = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
|
||||||
|
String shardId = "shard1";
|
||||||
|
Replica replica = docCollection.getSlice(shardId).getReplicas().iterator().next();
|
||||||
|
String coreName = replica.getCoreName();
|
||||||
|
String replicaName = Utils.parseMetricsReplicaName(collectionName, coreName);
|
||||||
|
long waitForSeconds = 2 + random().nextInt(5);
|
||||||
|
String registry = SolrCoreMetricManager.createRegistryName(true, collectionName, shardId, replicaName, null);
|
||||||
|
String tag = "metrics:" + registry + ":INDEX.sizeInBytes";
|
||||||
|
|
||||||
|
String setTriggerCommand = "{" +
|
||||||
|
"'set-trigger' : {" +
|
||||||
|
"'name' : 'metric_trigger'," +
|
||||||
|
"'event' : 'metric'," +
|
||||||
|
"'waitFor' : '" + waitForSeconds + "s'," +
|
||||||
|
"'enabled' : true," +
|
||||||
|
"'metric': '" + tag + "'" +
|
||||||
|
"'above' : 100.0," +
|
||||||
|
"'collection': '" + collectionName + "'" +
|
||||||
|
"'shard':'" + shardId + "'" +
|
||||||
|
"'actions' : [" +
|
||||||
|
"{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
|
||||||
|
"{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
|
||||||
|
"{'name':'test','class':'" + MetricAction.class.getName() + "'}" +
|
||||||
|
"]" +
|
||||||
|
"}}";
|
||||||
|
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||||
|
NamedList<Object> response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
|
||||||
|
String setListenerCommand1 = "{" +
|
||||||
|
"'set-listener' : " +
|
||||||
|
"{" +
|
||||||
|
"'name' : 'srt'," +
|
||||||
|
"'trigger' : 'metric_trigger'," +
|
||||||
|
"'stage' : ['FAILED','SUCCEEDED']," +
|
||||||
|
"'afterAction': ['compute', 'execute', 'test']," +
|
||||||
|
"'class' : '" + TestTriggerListener.class.getName() + "'" +
|
||||||
|
"}" +
|
||||||
|
"}";
|
||||||
|
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand1);
|
||||||
|
response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
|
||||||
|
// start more nodes so that we have at least 4
|
||||||
|
for (int i = cluster.getJettySolrRunners().size(); i < 4; i++) {
|
||||||
|
cluster.startJettySolrRunner();
|
||||||
|
}
|
||||||
|
cluster.waitForAllNodes(10);
|
||||||
|
|
||||||
|
List<SolrInputDocument> docs = new ArrayList<>(500);
|
||||||
|
for (int i = 0; i < 500; i++) {
|
||||||
|
docs.add(new SolrInputDocument("id", String.valueOf(i), "x_s", "x" + i));
|
||||||
|
}
|
||||||
|
solrClient.add(docs);
|
||||||
|
solrClient.commit();
|
||||||
|
|
||||||
|
boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
|
||||||
|
assertTrue("The trigger did not fire at all", await);
|
||||||
|
// wait for listener to capture the SUCCEEDED stage
|
||||||
|
Thread.sleep(2000);
|
||||||
|
assertEquals(listenerEvents.toString(), 4, listenerEvents.get("srt").size());
|
||||||
|
CapturedEvent ev = listenerEvents.get("srt").get(0);
|
||||||
|
long now = timeSource.getTimeNs();
|
||||||
|
// verify waitFor
|
||||||
|
assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
|
||||||
|
assertEquals(collectionName, ev.event.getProperties().get("collection"));
|
||||||
|
|
||||||
|
// find a new replica and create its metric name
|
||||||
|
docCollection = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
|
||||||
|
replica = docCollection.getSlice(shardId).getReplicas().iterator().next();
|
||||||
|
coreName = replica.getCoreName();
|
||||||
|
replicaName = Utils.parseMetricsReplicaName(collectionName, coreName);
|
||||||
|
registry = SolrCoreMetricManager.createRegistryName(true, collectionName, shardId, replicaName, null);
|
||||||
|
tag = "metrics:" + registry + ":INDEX.sizeInBytes";
|
||||||
|
|
||||||
|
triggerFiredLatch = new CountDownLatch(1);
|
||||||
|
listenerEvents.clear();
|
||||||
|
|
||||||
|
setTriggerCommand = "{" +
|
||||||
|
"'set-trigger' : {" +
|
||||||
|
"'name' : 'metric_trigger'," +
|
||||||
|
"'event' : 'metric'," +
|
||||||
|
"'waitFor' : '" + waitForSeconds + "s'," +
|
||||||
|
"'enabled' : true," +
|
||||||
|
"'metric': '" + tag + "'" +
|
||||||
|
"'above' : 100.0," +
|
||||||
|
"'collection': '" + collectionName + "'" +
|
||||||
|
"'shard':'" + shardId + "'" +
|
||||||
|
"'preferredOperation':'addreplica'" +
|
||||||
|
"'actions' : [" +
|
||||||
|
"{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
|
||||||
|
"{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
|
||||||
|
"{'name':'test','class':'" + MetricAction.class.getName() + "'}" +
|
||||||
|
"]" +
|
||||||
|
"}}";
|
||||||
|
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||||
|
response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
|
||||||
|
await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
|
||||||
|
assertTrue("The trigger did not fire at all", await);
|
||||||
|
// wait for listener to capture the SUCCEEDED stage
|
||||||
|
Thread.sleep(2000);
|
||||||
|
assertEquals(listenerEvents.toString(), 4, listenerEvents.get("srt").size());
|
||||||
|
ev = listenerEvents.get("srt").get(0);
|
||||||
|
now = timeSource.getTimeNs();
|
||||||
|
// verify waitFor
|
||||||
|
assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
|
||||||
|
assertEquals(collectionName, ev.event.getProperties().get("collection"));
|
||||||
|
docCollection = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
|
||||||
|
assertEquals(5, docCollection.getReplicas().size());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class MetricAction extends TriggerActionBase {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void process(TriggerEvent event, ActionContext context) throws Exception {
|
||||||
|
try {
|
||||||
|
events.add(event);
|
||||||
|
long currentTimeNanos = timeSource.getTimeNs();
|
||||||
|
long eventTimeNanos = event.getEventTime();
|
||||||
|
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
|
||||||
|
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
|
||||||
|
fail(event.source + " was fired before the configured waitFor period");
|
||||||
|
}
|
||||||
|
triggerFiredLatch.countDown();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
log.debug("--throwable", t);
|
||||||
|
throw t;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class TestTriggerListener extends TriggerListenerBase {
|
||||||
|
@Override
|
||||||
|
public void init(SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) {
|
||||||
|
super.init(cloudManager, config);
|
||||||
|
listenerCreated.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
|
||||||
|
ActionContext context, Throwable error, String message) {
|
||||||
|
List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
|
||||||
|
lst.add(new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,300 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.solr.cloud.autoscaling;
|
||||||
|
|
||||||
|
import java.lang.invoke.MethodHandles;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import org.apache.solr.client.solrj.SolrClient;
|
||||||
|
import org.apache.solr.client.solrj.SolrRequest;
|
||||||
|
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
||||||
|
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||||
|
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||||
|
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||||
|
import org.apache.solr.cloud.Overseer;
|
||||||
|
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||||
|
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||||
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
|
import org.apache.solr.common.util.NamedList;
|
||||||
|
import org.apache.solr.common.util.Utils;
|
||||||
|
import org.apache.solr.util.LogLevel;
|
||||||
|
import org.apache.solr.util.TimeOut;
|
||||||
|
import org.apache.zookeeper.data.Stat;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
|
||||||
|
import static org.apache.solr.cloud.autoscaling.ScheduledTriggers.DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS;
|
||||||
|
import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.WAIT_FOR_DELTA_NANOS;
|
||||||
|
import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
|
||||||
|
|
||||||
|
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
|
||||||
|
public class NodeAddedTriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
|
||||||
|
private static CountDownLatch actionConstructorCalled;
|
||||||
|
private static CountDownLatch actionInitCalled;
|
||||||
|
private static CountDownLatch triggerFiredLatch;
|
||||||
|
private static int waitForSeconds = 1;
|
||||||
|
private static AtomicBoolean triggerFired;
|
||||||
|
private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
|
||||||
|
private static SolrCloudManager cloudManager;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setupCluster() throws Exception {
|
||||||
|
configureCluster(2)
|
||||||
|
.addConfig("conf", configset("cloud-minimal"))
|
||||||
|
.configure();
|
||||||
|
// disable .scheduled_maintenance
|
||||||
|
String suspendTriggerCommand = "{" +
|
||||||
|
"'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
|
||||||
|
"}";
|
||||||
|
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
|
||||||
|
SolrClient solrClient = cluster.getSolrClient();
|
||||||
|
NamedList<Object> response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
}
|
||||||
|
|
||||||
|
private static CountDownLatch getTriggerFiredLatch() {
|
||||||
|
return triggerFiredLatch;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setupTest() throws Exception {
|
||||||
|
// ensure that exactly 2 jetty nodes are running
|
||||||
|
int numJetties = cluster.getJettySolrRunners().size();
|
||||||
|
log.info("Found {} jetty instances running", numJetties);
|
||||||
|
for (int i = 2; i < numJetties; i++) {
|
||||||
|
int r = random().nextInt(cluster.getJettySolrRunners().size());
|
||||||
|
log.info("Shutdown extra jetty instance at port {}", cluster.getJettySolrRunner(r).getLocalPort());
|
||||||
|
cluster.stopJettySolrRunner(r);
|
||||||
|
}
|
||||||
|
for (int i = cluster.getJettySolrRunners().size(); i < 2; i++) {
|
||||||
|
// start jetty instances
|
||||||
|
cluster.startJettySolrRunner();
|
||||||
|
}
|
||||||
|
cluster.waitForAllNodes(5);
|
||||||
|
|
||||||
|
NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
|
||||||
|
String overseerLeader = (String) overSeerStatus.get("leader");
|
||||||
|
int overseerLeaderIndex = 0;
|
||||||
|
for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
|
||||||
|
JettySolrRunner jetty = cluster.getJettySolrRunner(i);
|
||||||
|
if (jetty.getNodeName().equals(overseerLeader)) {
|
||||||
|
overseerLeaderIndex = i;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Overseer overseer = cluster.getJettySolrRunner(overseerLeaderIndex).getCoreContainer().getZkController().getOverseer();
|
||||||
|
ScheduledTriggers scheduledTriggers = ((OverseerTriggerThread) overseer.getTriggerThread().getThread()).getScheduledTriggers();
|
||||||
|
// aggressively remove all active scheduled triggers
|
||||||
|
scheduledTriggers.removeAll();
|
||||||
|
|
||||||
|
// clear any persisted auto scaling configuration
|
||||||
|
Stat stat = zkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), true);
|
||||||
|
log.info(SOLR_AUTOSCALING_CONF_PATH + " reset, new znode version {}", stat.getVersion());
|
||||||
|
|
||||||
|
cluster.deleteAllCollections();
|
||||||
|
cluster.getSolrClient().setDefaultCollection(null);
|
||||||
|
|
||||||
|
// restart Overseer. Even though we reset the autoscaling config some already running
|
||||||
|
// trigger threads may still continue to execute and produce spurious events
|
||||||
|
cluster.stopJettySolrRunner(overseerLeaderIndex);
|
||||||
|
Thread.sleep(5000);
|
||||||
|
|
||||||
|
waitForSeconds = 1 + random().nextInt(3);
|
||||||
|
actionConstructorCalled = new CountDownLatch(1);
|
||||||
|
actionInitCalled = new CountDownLatch(1);
|
||||||
|
triggerFiredLatch = new CountDownLatch(1);
|
||||||
|
triggerFired = new AtomicBoolean(false);
|
||||||
|
events.clear();
|
||||||
|
|
||||||
|
while (cluster.getJettySolrRunners().size() < 2) {
|
||||||
|
// perhaps a test stopped a node but didn't start it back
|
||||||
|
// lets start a node
|
||||||
|
cluster.startJettySolrRunner();
|
||||||
|
}
|
||||||
|
|
||||||
|
cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
|
||||||
|
// clear any events or markers
|
||||||
|
// todo: consider the impact of such cleanup on regular cluster restarts
|
||||||
|
deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH);
|
||||||
|
deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH);
|
||||||
|
deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
|
||||||
|
deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void deleteChildrenRecursively(String path) throws Exception {
|
||||||
|
cloudManager.getDistribStateManager().removeRecursively(path, true, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNodeAddedTriggerRestoreState() throws Exception {
|
||||||
|
// for this test we want to update the trigger so we must assert that the actions were created twice
|
||||||
|
actionInitCalled = new CountDownLatch(2);
|
||||||
|
|
||||||
|
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||||
|
waitForSeconds = 5;
|
||||||
|
String setTriggerCommand = "{" +
|
||||||
|
"'set-trigger' : {" +
|
||||||
|
"'name' : 'node_added_restore_trigger'," +
|
||||||
|
"'event' : 'nodeAdded'," +
|
||||||
|
"'waitFor' : '5s'," + // should be enough for us to update the trigger
|
||||||
|
"'enabled' : true," +
|
||||||
|
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
||||||
|
"}}";
|
||||||
|
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||||
|
NamedList<Object> response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
|
||||||
|
TimeOut timeOut = new TimeOut(2, TimeUnit.SECONDS, cloudManager.getTimeSource());
|
||||||
|
while (actionInitCalled.getCount() == 0 && !timeOut.hasTimedOut()) {
|
||||||
|
Thread.sleep(200);
|
||||||
|
}
|
||||||
|
assertTrue("The action specified in node_added_restore_trigger was not instantiated even after 2 seconds", actionInitCalled.getCount() > 0);
|
||||||
|
|
||||||
|
// start a new node
|
||||||
|
JettySolrRunner newNode = cluster.startJettySolrRunner();
|
||||||
|
|
||||||
|
// ensure that the old trigger sees the new node, todo find a better way to do this
|
||||||
|
Thread.sleep(500 + TimeUnit.SECONDS.toMillis(DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS));
|
||||||
|
|
||||||
|
waitForSeconds = 0;
|
||||||
|
setTriggerCommand = "{" +
|
||||||
|
"'set-trigger' : {" +
|
||||||
|
"'name' : 'node_added_restore_trigger'," +
|
||||||
|
"'event' : 'nodeAdded'," +
|
||||||
|
"'waitFor' : '0s'," + // update a property so that it replaces the old trigger, also we want it to fire immediately
|
||||||
|
"'enabled' : true," +
|
||||||
|
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
||||||
|
"}}";
|
||||||
|
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||||
|
response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
|
||||||
|
// wait until the second instance of action is created
|
||||||
|
if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
|
||||||
|
fail("Two TriggerAction instances should have been created by now");
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean await = triggerFiredLatch.await(5, TimeUnit.SECONDS);
|
||||||
|
assertTrue("The trigger did not fire at all", await);
|
||||||
|
assertTrue(triggerFired.get());
|
||||||
|
NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
|
||||||
|
assertNotNull(nodeAddedEvent);
|
||||||
|
List<String> nodeNames = (List<String>) nodeAddedEvent.getProperty(TriggerEvent.NODE_NAMES);
|
||||||
|
assertTrue(nodeNames.contains(newNode.getNodeName()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
public void testNodeAddedTrigger() throws Exception {
  // End-to-end check of a 'nodeAdded' trigger: register the trigger, start a new
  // Jetty node, and verify that the trigger fires exactly once for that node.
  CloudSolrClient solrClient = cluster.getSolrClient();
  String setTriggerCommand = "{" +
      "'set-trigger' : {" +
      "'name' : 'node_added_trigger'," +
      "'event' : 'nodeAdded'," +
      "'waitFor' : '" + waitForSeconds + "s'," +
      "'enabled' : true," +
      "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
      "}}";
  SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
  NamedList<Object> response = solrClient.request(req);
  assertEquals(response.get("result").toString(), "success");

  // TestTriggerAction.init() counts this latch down; if it never happens the
  // trigger was not installed by the Overseer within the grace period.
  if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
    fail("The TriggerAction should have been created by now");
  }

  // Starting a node is the event the trigger listens for.
  JettySolrRunner newNode = cluster.startJettySolrRunner();
  boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
  assertTrue("The trigger did not fire at all", await);
  assertTrue(triggerFired.get());
  // The action recorded the event; verify it names the node we just started.
  NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
  assertNotNull(nodeAddedEvent);
  List<String> nodeNames = (List<String>) nodeAddedEvent.getProperty(TriggerEvent.NODE_NAMES);
  assertTrue(nodeNames.contains(newNode.getNodeName()));

  // reset
  actionConstructorCalled = new CountDownLatch(1);
  actionInitCalled = new CountDownLatch(1);

  // update the trigger with exactly the same data
  setTriggerCommand = "{" +
      "'set-trigger' : {" +
      "'name' : 'node_added_trigger'," +
      "'event' : 'nodeAdded'," +
      "'waitFor' : '" + waitForSeconds + "s'," +
      "'enabled' : true," +
      "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
      "}}";
  req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
  response = solrClient.request(req);
  assertEquals(response.get("result").toString(), "success");

  // this should be a no-op so the action should have been created but init should not be called
  if (!actionConstructorCalled.await(3, TimeUnit.SECONDS)) {
    fail("The TriggerAction should have been created by now");
  }

  // A no-op update must not re-initialize the action: expect the await to time out.
  assertFalse(actionInitCalled.await(2, TimeUnit.SECONDS));
}
|
||||||
|
|
||||||
|
public static class TestTriggerAction extends TriggerActionBase {
|
||||||
|
|
||||||
|
public TestTriggerAction() {
|
||||||
|
actionConstructorCalled.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void process(TriggerEvent event, ActionContext actionContext) {
|
||||||
|
try {
|
||||||
|
if (triggerFired.compareAndSet(false, true)) {
|
||||||
|
events.add(event);
|
||||||
|
long currentTimeNanos = TriggerIntegrationTest.timeSource.getTimeNs();
|
||||||
|
long eventTimeNanos = event.getEventTime();
|
||||||
|
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
|
||||||
|
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
|
||||||
|
fail(event.source + " was fired before the configured waitFor period");
|
||||||
|
}
|
||||||
|
getTriggerFiredLatch().countDown();
|
||||||
|
} else {
|
||||||
|
fail(event.source + " was fired more than once!");
|
||||||
|
}
|
||||||
|
} catch (Throwable t) {
|
||||||
|
log.debug("--throwable", t);
|
||||||
|
throw t;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(Map<String, String> args) {
|
||||||
|
log.info("TestTriggerAction init");
|
||||||
|
actionInitCalled.countDown();
|
||||||
|
super.init(args);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,322 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.solr.cloud.autoscaling;
|
||||||
|
|
||||||
|
import java.lang.invoke.MethodHandles;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import org.apache.solr.client.solrj.SolrClient;
|
||||||
|
import org.apache.solr.client.solrj.SolrRequest;
|
||||||
|
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
||||||
|
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||||
|
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||||
|
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||||
|
import org.apache.solr.cloud.Overseer;
|
||||||
|
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||||
|
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||||
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
|
import org.apache.solr.common.util.NamedList;
|
||||||
|
import org.apache.solr.common.util.Utils;
|
||||||
|
import org.apache.solr.util.LogLevel;
|
||||||
|
import org.apache.solr.util.TimeOut;
|
||||||
|
import org.apache.zookeeper.data.Stat;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
|
||||||
|
import static org.apache.solr.cloud.autoscaling.ScheduledTriggers.DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS;
|
||||||
|
import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.WAIT_FOR_DELTA_NANOS;
|
||||||
|
import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
|
||||||
|
|
||||||
|
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
public class NodeLostTriggerIntegrationTest extends SolrCloudTestCase {
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  // Latches/flags shared between the test methods and the static TestTriggerAction;
  // they are re-created in setupTest() before every test method.
  private static CountDownLatch actionConstructorCalled;
  private static CountDownLatch actionInitCalled;
  private static CountDownLatch triggerFiredLatch;
  private static int waitForSeconds = 1;
  private static AtomicBoolean triggerFired;
  private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
  private static SolrCloudManager cloudManager;

  @BeforeClass
  public static void setupCluster() throws Exception {
    configureCluster(2)
        .addConfig("conf", configset("cloud-minimal"))
        .configure();
    // disable .scheduled_maintenance so it cannot interfere with the nodeLost triggers under test
    String suspendTriggerCommand = "{" +
        "'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
        "}";
    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
    SolrClient solrClient = cluster.getSolrClient();
    NamedList<Object> response = solrClient.request(req);
    assertEquals(response.get("result").toString(), "success");
  }

  // Accessor exists so TestTriggerAction always counts down the CURRENT latch,
  // even after a test method replaces the static field.
  private static CountDownLatch getTriggerFiredLatch() {
    return triggerFiredLatch;
  }

  /**
   * Resets the cluster to a known state before each test: exactly 2 Jetty nodes,
   * no scheduled triggers, empty autoscaling config/markers in ZK, fresh latches.
   */
  @Before
  public void setupTest() throws Exception {
    // ensure that exactly 2 jetty nodes are running
    int numJetties = cluster.getJettySolrRunners().size();
    log.info("Found {} jetty instances running", numJetties);
    for (int i = 2; i < numJetties; i++) {
      int r = random().nextInt(cluster.getJettySolrRunners().size());
      log.info("Shutdown extra jetty instance at port {}", cluster.getJettySolrRunner(r).getLocalPort());
      cluster.stopJettySolrRunner(r);
    }
    for (int i = cluster.getJettySolrRunners().size(); i < 2; i++) {
      // start jetty instances
      cluster.startJettySolrRunner();
    }
    cluster.waitForAllNodes(5);

    // Locate the current Overseer leader so its trigger thread can be reset.
    NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
    String overseerLeader = (String) overSeerStatus.get("leader");
    int overseerLeaderIndex = 0;
    for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
      JettySolrRunner jetty = cluster.getJettySolrRunner(i);
      if (jetty.getNodeName().equals(overseerLeader)) {
        overseerLeaderIndex = i;
        break;
      }
    }
    Overseer overseer = cluster.getJettySolrRunner(overseerLeaderIndex).getCoreContainer().getZkController().getOverseer();
    ScheduledTriggers scheduledTriggers = ((OverseerTriggerThread) overseer.getTriggerThread().getThread()).getScheduledTriggers();
    // aggressively remove all active scheduled triggers
    scheduledTriggers.removeAll();

    // clear any persisted auto scaling configuration
    Stat stat = zkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), true);
    log.info(SOLR_AUTOSCALING_CONF_PATH + " reset, new znode version {}", stat.getVersion());

    cluster.deleteAllCollections();
    cluster.getSolrClient().setDefaultCollection(null);

    // restart Overseer. Even though we reset the autoscaling config some already running
    // trigger threads may still continue to execute and produce spurious events
    cluster.stopJettySolrRunner(overseerLeaderIndex);
    Thread.sleep(5000);

    // Randomize waitFor a little so tests exercise different trigger delays.
    waitForSeconds = 1 + random().nextInt(3);
    actionConstructorCalled = new CountDownLatch(1);
    actionInitCalled = new CountDownLatch(1);
    triggerFiredLatch = new CountDownLatch(1);
    triggerFired = new AtomicBoolean(false);
    events.clear();

    while (cluster.getJettySolrRunners().size() < 2) {
      // perhaps a test stopped a node but didn't start it back
      // lets start a node
      cluster.startJettySolrRunner();
    }

    cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
    // clear any events or markers
    // todo: consider the impact of such cleanup on regular cluster restarts
    deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH);
    deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH);
    deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
    deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
  }

  // Removes all children of the given ZK path (but keeps the path itself).
  private void deleteChildrenRecursively(String path) throws Exception {
    cloudManager.getDistribStateManager().removeRecursively(path, true, false);
  }

  /**
   * Verifies that a replaced nodeLost trigger restores the state of the old one:
   * a node lost while the first trigger (waitFor=5s) is pending must still be
   * reported after the trigger is replaced with a waitFor=0s variant.
   */
  @Test
  public void testNodeLostTriggerRestoreState() throws Exception {
    // for this test we want to update the trigger so we must assert that the actions were created twice
    actionInitCalled = new CountDownLatch(2);

    // start a new node
    JettySolrRunner newNode = cluster.startJettySolrRunner();
    String nodeName = newNode.getNodeName();

    CloudSolrClient solrClient = cluster.getSolrClient();
    waitForSeconds = 5;
    String setTriggerCommand = "{" +
        "'set-trigger' : {" +
        "'name' : 'node_lost_restore_trigger'," +
        "'event' : 'nodeLost'," +
        "'waitFor' : '5s'," + // should be enough for us to update the trigger
        "'enabled' : true," +
        "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
        "}}";
    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
    NamedList<Object> response = solrClient.request(req);
    assertEquals(response.get("result").toString(), "success");

    // NOTE(review): the latch starts at 2, so getCount() == 0 is false on entry and
    // this loop never waits; the intent appears to be "wait until the first init
    // happened" (i.e. loop while getCount() == 2) — confirm and fix upstream.
    TimeOut timeOut = new TimeOut(2, TimeUnit.SECONDS, cloudManager.getTimeSource());
    while (actionInitCalled.getCount() == 0 && !timeOut.hasTimedOut()) {
      Thread.sleep(200);
    }
    assertTrue("The action specified in node_lost_restore_trigger was not instantiated even after 2 seconds", actionInitCalled.getCount() > 0);

    // Find and stop the node we just added; the pending trigger should observe the loss.
    List<JettySolrRunner> jettySolrRunners = cluster.getJettySolrRunners();
    int index = -1;
    for (int i = 0; i < jettySolrRunners.size(); i++) {
      JettySolrRunner runner = jettySolrRunners.get(i);
      if (runner == newNode) index = i;
    }
    assertFalse(index == -1);
    cluster.stopJettySolrRunner(index);

    // ensure that the old trigger sees the stopped node, todo find a better way to do this
    Thread.sleep(500 + TimeUnit.SECONDS.toMillis(DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS));

    waitForSeconds = 0;
    setTriggerCommand = "{" +
        "'set-trigger' : {" +
        "'name' : 'node_lost_restore_trigger'," +
        "'event' : 'nodeLost'," +
        "'waitFor' : '0s'," + // update a property so that it replaces the old trigger, also we want it to fire immediately
        "'enabled' : true," +
        "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
        "}}";
    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
    response = solrClient.request(req);
    assertEquals(response.get("result").toString(), "success");

    // wait until the second instance of action is created
    if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
      fail("Two TriggerAction instances should have been created by now");
    }

    // The replacement trigger (waitFor=0s) must fire with the restored lost-node state.
    boolean await = triggerFiredLatch.await(5, TimeUnit.SECONDS);
    assertTrue("The trigger did not fire at all", await);
    assertTrue(triggerFired.get());
    NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) events.iterator().next();
    assertNotNull(nodeLostEvent);
    List<String> nodeNames = (List<String>) nodeLostEvent.getProperty(TriggerEvent.NODE_NAMES);
    assertTrue(nodeNames.contains(nodeName));
  }

  /**
   * End-to-end check of a 'nodeLost' trigger: register it, stop a non-Overseer
   * node, verify a single firing naming the lost node, then verify that
   * re-posting the identical trigger config is a no-op (no re-initialization).
   */
  @Test
  public void testNodeLostTrigger() throws Exception {
    CloudSolrClient solrClient = cluster.getSolrClient();
    String setTriggerCommand = "{" +
        "'set-trigger' : {" +
        "'name' : 'node_lost_trigger'," +
        "'event' : 'nodeLost'," +
        "'waitFor' : '" + waitForSeconds + "s'," +
        "'enabled' : true," +
        "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
        "}}";
    // Pick a node that is NOT the Overseer leader, so stopping it does not
    // take down the trigger machinery itself.
    NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
    String overseerLeader = (String) overSeerStatus.get("leader");
    int nonOverseerLeaderIndex = 0;
    for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
      JettySolrRunner jetty = cluster.getJettySolrRunner(i);
      if (!jetty.getNodeName().equals(overseerLeader)) {
        nonOverseerLeaderIndex = i;
      }
    }
    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
    NamedList<Object> response = solrClient.request(req);
    assertEquals(response.get("result").toString(), "success");

    if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
      fail("The TriggerAction should have been created by now");
    }

    triggerFired.set(false);
    triggerFiredLatch = new CountDownLatch(1);
    String lostNodeName = cluster.getJettySolrRunner(nonOverseerLeaderIndex).getNodeName();
    cluster.stopJettySolrRunner(nonOverseerLeaderIndex);
    boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
    assertTrue("The trigger did not fire at all", await);
    assertTrue(triggerFired.get());
    NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) events.iterator().next();
    assertNotNull(nodeLostEvent);
    List<String> nodeNames = (List<String>) nodeLostEvent.getProperty(TriggerEvent.NODE_NAMES);
    assertTrue(nodeNames.contains(lostNodeName));

    // reset
    actionConstructorCalled = new CountDownLatch(1);
    actionInitCalled = new CountDownLatch(1);

    // update the trigger with exactly the same data
    setTriggerCommand = "{" +
        "'set-trigger' : {" +
        "'name' : 'node_lost_trigger'," +
        "'event' : 'nodeLost'," +
        "'waitFor' : '" + waitForSeconds + "s'," +
        "'enabled' : true," +
        "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
        "}}";
    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
    response = solrClient.request(req);
    assertEquals(response.get("result").toString(), "success");

    // this should be a no-op so the action should have been created but init should not be called
    if (!actionConstructorCalled.await(3, TimeUnit.SECONDS)) {
      fail("The TriggerAction should have been created by now");
    }

    assertFalse(actionInitCalled.await(2, TimeUnit.SECONDS));
  }

  /**
   * Trigger action used by the tests above: records each event, enforces
   * single-fire semantics, and checks the waitFor period was respected.
   */
  public static class TestTriggerAction extends TriggerActionBase {

    public TestTriggerAction() {
      // Lets the test verify that the action object was actually constructed.
      actionConstructorCalled.countDown();
    }

    @Override
    public void process(TriggerEvent event, ActionContext actionContext) {
      try {
        if (triggerFired.compareAndSet(false, true)) {
          events.add(event);
          long currentTimeNanos = TriggerIntegrationTest.timeSource.getTimeNs();
          long eventTimeNanos = event.getEventTime();
          // Minimum delay the trigger must have respected, minus a small tolerance.
          long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
          if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
            fail(event.source + " was fired before the configured waitFor period");
          }
          getTriggerFiredLatch().countDown();
        } else {
          fail(event.source + " was fired more than once!");
        }
      } catch (Throwable t) {
        // Log assertion failures (they surface on the trigger thread) before rethrowing.
        log.debug("--throwable", t);
        throw t;
      }
    }

    @Override
    public void init(Map<String, String> args) {
      log.info("TestTriggerAction init");
      // Lets the test distinguish "constructed" from "constructed and initialized".
      actionInitCalled.countDown();
      super.init(args);
    }
  }
}
|
|
@ -0,0 +1,269 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.solr.cloud.autoscaling;
|
||||||
|
|
||||||
|
import java.lang.invoke.MethodHandles;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.SortedSet;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
|
import org.apache.lucene.util.LuceneTestCase;
|
||||||
|
import org.apache.solr.client.solrj.SolrClient;
|
||||||
|
import org.apache.solr.client.solrj.SolrRequest;
|
||||||
|
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
|
||||||
|
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||||
|
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||||
|
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||||
|
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||||
|
import org.apache.solr.common.cloud.LiveNodesListener;
|
||||||
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
|
import org.apache.solr.common.util.NamedList;
|
||||||
|
import org.apache.solr.util.LogLevel;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
|
||||||
|
|
||||||
|
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
@LuceneTestCase.BadApple(bugUrl = "https://issues.apache.org/jira/browse/SOLR-12028")
public class NodeMarkersRegistrationTest extends SolrCloudTestCase {
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  // Latches/collections shared between the test method and the static
  // TestEventMarkerAction; (re-)initialized inside the test method.
  private static CountDownLatch actionInitCalled;
  private static CountDownLatch triggerFiredLatch;
  private static CountDownLatch actionConstructorCalled;
  private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
  private static ZkStateReader zkStateReader;
  // Guards against concurrent action execution, which the framework promises not to do.
  private static ReentrantLock lock = new ReentrantLock();

  @BeforeClass
  public static void setupCluster() throws Exception {
    configureCluster(2)
        .addConfig("conf", configset("cloud-minimal"))
        .configure();
    zkStateReader = cluster.getSolrClient().getZkStateReader();
    // disable .scheduled_maintenance so it cannot interfere with the triggers under test
    String suspendTriggerCommand = "{" +
        "'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
        "}";
    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
    SolrClient solrClient = cluster.getSolrClient();
    NamedList<Object> response = solrClient.request(req);
    assertEquals(response.get("result").toString(), "success");
  }

  // Accessor exists so TestEventMarkerAction always counts down the CURRENT latch,
  // even after the test replaces the static field.
  private static CountDownLatch getTriggerFiredLatch() {
    return triggerFiredLatch;
  }

  /**
   * Verifies the lifecycle of nodeAdded/nodeLost marker znodes: without any
   * triggers registered, markers must not exist (or be cleaned up by a new
   * Overseer); with triggers registered, markers are created and then consumed
   * by the matching trigger, even across an Overseer restart.
   */
  @Test
  public void testNodeMarkersRegistration() throws Exception {
    // for this test we want to create two triggers so we must assert that the actions were created twice
    actionInitCalled = new CountDownLatch(2);
    // similarly we want both triggers to fire
    triggerFiredLatch = new CountDownLatch(2);
    actionConstructorCalled = new CountDownLatch(1);
    TestLiveNodesListener listener = registerLiveNodesListener();

    // Locate the current Overseer leader so it can be killed later.
    NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
    String overseerLeader = (String) overSeerStatus.get("leader");
    int overseerLeaderIndex = 0;
    for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
      JettySolrRunner jetty = cluster.getJettySolrRunner(i);
      if (jetty.getNodeName().equals(overseerLeader)) {
        overseerLeaderIndex = i;
        break;
      }
    }
    // add a node
    JettySolrRunner node = cluster.startJettySolrRunner();
    if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
      fail("onChange listener didn't execute on cluster change");
    }
    assertEquals(1, listener.addedNodes.size());
    assertEquals(node.getNodeName(), listener.addedNodes.iterator().next());
    // verify that a znode doesn't exist (no trigger)
    String pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node.getNodeName();
    assertFalse("Path " + pathAdded + " was created but there are no nodeAdded triggers", zkClient().exists(pathAdded, true));
    listener.reset();
    // stop overseer
    log.info("====== KILL OVERSEER 1");
    cluster.stopJettySolrRunner(overseerLeaderIndex);
    if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
      fail("onChange listener didn't execute on cluster change");
    }
    assertEquals(1, listener.lostNodes.size());
    assertEquals(overseerLeader, listener.lostNodes.iterator().next());
    assertEquals(0, listener.addedNodes.size());
    // wait until the new overseer is up
    Thread.sleep(5000);
    // verify that a znode does NOT exist - there's no nodeLost trigger,
    // so the new overseer cleaned up existing nodeLost markers
    String pathLost = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + overseerLeader;
    assertFalse("Path " + pathLost + " exists", zkClient().exists(pathLost, true));

    listener.reset();

    // set up triggers
    CloudSolrClient solrClient = cluster.getSolrClient();

    log.info("====== ADD TRIGGERS");
    String setTriggerCommand = "{" +
        "'set-trigger' : {" +
        "'name' : 'node_added_triggerMR'," +
        "'event' : 'nodeAdded'," +
        "'waitFor' : '1s'," +
        "'enabled' : true," +
        "'actions' : [{'name':'test','class':'" + TestEventMarkerAction.class.getName() + "'}]" +
        "}}";
    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
    NamedList<Object> response = solrClient.request(req);
    assertEquals(response.get("result").toString(), "success");

    setTriggerCommand = "{" +
        "'set-trigger' : {" +
        "'name' : 'node_lost_triggerMR'," +
        "'event' : 'nodeLost'," +
        "'waitFor' : '1s'," +
        "'enabled' : true," +
        "'actions' : [{'name':'test','class':'" + TestEventMarkerAction.class.getName() + "'}]" +
        "}}";
    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
    response = solrClient.request(req);
    assertEquals(response.get("result").toString(), "success");

    // Re-locate the Overseer leader — it may have moved after the earlier kill.
    overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
    overseerLeader = (String) overSeerStatus.get("leader");
    overseerLeaderIndex = 0;
    for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
      JettySolrRunner jetty = cluster.getJettySolrRunner(i);
      if (jetty.getNodeName().equals(overseerLeader)) {
        overseerLeaderIndex = i;
        break;
      }
    }

    // create another node
    log.info("====== ADD NODE 1");
    JettySolrRunner node1 = cluster.startJettySolrRunner();
    if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
      fail("onChange listener didn't execute on cluster change");
    }
    assertEquals(1, listener.addedNodes.size());
    assertEquals(node1.getNodeName(), listener.addedNodes.iterator().next());
    // verify that a znode exists
    pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node1.getNodeName();
    assertTrue("Path " + pathAdded + " wasn't created", zkClient().exists(pathAdded, true));

    Thread.sleep(5000);
    // nodeAdded marker should be consumed now by nodeAdded trigger
    assertFalse("Path " + pathAdded + " should have been deleted", zkClient().exists(pathAdded, true));

    listener.reset();
    events.clear();
    triggerFiredLatch = new CountDownLatch(1);
    // kill overseer again
    log.info("====== KILL OVERSEER 2");
    cluster.stopJettySolrRunner(overseerLeaderIndex);
    if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
      fail("onChange listener didn't execute on cluster change");
    }


    // The new Overseer should process the nodeLost marker left by the killed leader.
    if (!triggerFiredLatch.await(20, TimeUnit.SECONDS)) {
      fail("Trigger should have fired by now");
    }
    assertEquals(1, events.size());
    TriggerEvent ev = events.iterator().next();
    List<String> nodeNames = (List<String>) ev.getProperty(TriggerEvent.NODE_NAMES);
    assertTrue(nodeNames.contains(overseerLeader));
    assertEquals(TriggerEventType.NODELOST, ev.getEventType());
  }

  // Creates and registers a listener that records live-node additions/removals.
  private TestLiveNodesListener registerLiveNodesListener() {
    TestLiveNodesListener listener = new TestLiveNodesListener();
    zkStateReader.registerLiveNodesListener(listener);
    return listener;
  }

  /**
   * Records which nodes joined/left the live-nodes set and latches on each change.
   */
  private static class TestLiveNodesListener implements LiveNodesListener {
    Set<String> lostNodes = new HashSet<>();
    Set<String> addedNodes = new HashSet<>();
    CountDownLatch onChangeLatch = new CountDownLatch(1);

    public void reset() {
      lostNodes.clear();
      addedNodes.clear();
      onChangeLatch = new CountDownLatch(1);
    }

    @Override
    public void onChange(SortedSet<String> oldLiveNodes, SortedSet<String> newLiveNodes) {
      onChangeLatch.countDown();
      Set<String> old = new HashSet<>(oldLiveNodes);
      old.removeAll(newLiveNodes);
      if (!old.isEmpty()) {
        lostNodes.addAll(old);
      }
      // NOTE(review): this mutates the caller-supplied newLiveNodes set in place
      // (unlike oldLiveNodes, which is defensively copied) — confirm the caller
      // passes a private copy, otherwise this should copy first.
      newLiveNodes.removeAll(oldLiveNodes);
      if (!newLiveNodes.isEmpty()) {
        addedNodes.addAll(newLiveNodes);
      }
    }
  }

  /**
   * Trigger action that records each delivered event and counts down the
   * fired latch; the lock asserts that actions are never run concurrently.
   */
  public static class TestEventMarkerAction extends TriggerActionBase {

    public TestEventMarkerAction() {
      // Lets the test verify that the action object was actually constructed.
      actionConstructorCalled.countDown();
    }

    @Override
    public void process(TriggerEvent event, ActionContext actionContext) {
      boolean locked = lock.tryLock();
      if (!locked) {
        log.info("We should never have a tryLock fail because actions are never supposed to be executed concurrently");
        return;
      }
      try {
        events.add(event);
        getTriggerFiredLatch().countDown();
      } catch (Throwable t) {
        // Log assertion failures (they surface on the trigger thread) before rethrowing.
        log.debug("--throwable", t);
        throw t;
      } finally {
        lock.unlock();
      }
    }

    @Override
    public void init(Map<String, String> args) {
      log.info("TestEventMarkerAction init");
      // Lets the test distinguish "constructed" from "constructed and initialized".
      actionInitCalled.countDown();
      super.init(args);
    }
  }
}
|
|
@ -0,0 +1,169 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.solr.cloud.autoscaling;
|
||||||
|
|
||||||
|
import java.lang.invoke.MethodHandles;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import org.apache.solr.client.solrj.SolrClient;
|
||||||
|
import org.apache.solr.client.solrj.SolrRequest;
|
||||||
|
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||||
|
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||||
|
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||||
|
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||||
|
import org.apache.solr.common.util.NamedList;
|
||||||
|
import org.apache.solr.util.LogLevel;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
|
||||||
|
import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.WAIT_FOR_DELTA_NANOS;
|
||||||
|
import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.timeSource;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Integration test to ensure that triggers can restore state from ZooKeeper after overseer restart
|
||||||
|
* so that events detected before restart are not lost.
|
||||||
|
*
|
||||||
|
* Added in SOLR-10515
|
||||||
|
*/
|
||||||
|
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
|
||||||
|
public class RestoreTriggerStateTest extends SolrCloudTestCase {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
|
||||||
|
private static CountDownLatch actionInitCalled;
|
||||||
|
private static CountDownLatch triggerFiredLatch;
|
||||||
|
private static AtomicBoolean triggerFired;
|
||||||
|
private static CountDownLatch actionConstructorCalled;
|
||||||
|
private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
|
||||||
|
private static int waitForSeconds = 1;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setupCluster() throws Exception {
|
||||||
|
configureCluster(2)
|
||||||
|
.addConfig("conf", configset("cloud-minimal"))
|
||||||
|
.configure();
|
||||||
|
// disable .scheduled_maintenance
|
||||||
|
String suspendTriggerCommand = "{" +
|
||||||
|
"'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
|
||||||
|
"}";
|
||||||
|
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
|
||||||
|
SolrClient solrClient = cluster.getSolrClient();
|
||||||
|
NamedList<Object> response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
actionInitCalled = new CountDownLatch(1);
|
||||||
|
triggerFiredLatch = new CountDownLatch(1);
|
||||||
|
actionConstructorCalled = new CountDownLatch(1);
|
||||||
|
triggerFired = new AtomicBoolean();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEventFromRestoredState() throws Exception {
|
||||||
|
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||||
|
String setTriggerCommand = "{" +
|
||||||
|
"'set-trigger' : {" +
|
||||||
|
"'name' : 'node_added_triggerEFRS'," +
|
||||||
|
"'event' : 'nodeAdded'," +
|
||||||
|
"'waitFor' : '10s'," +
|
||||||
|
"'enabled' : true," +
|
||||||
|
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
||||||
|
"}}";
|
||||||
|
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||||
|
NamedList<Object> response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
|
||||||
|
if (!actionInitCalled.await(10, TimeUnit.SECONDS)) {
|
||||||
|
fail("The TriggerAction should have been created by now");
|
||||||
|
}
|
||||||
|
|
||||||
|
NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
|
||||||
|
String overseerLeader = (String) overSeerStatus.get("leader");
|
||||||
|
int overseerLeaderIndex = 0;
|
||||||
|
for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
|
||||||
|
JettySolrRunner jetty = cluster.getJettySolrRunner(i);
|
||||||
|
if (jetty.getNodeName().equals(overseerLeader)) {
|
||||||
|
overseerLeaderIndex = i;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
events.clear();
|
||||||
|
|
||||||
|
JettySolrRunner newNode = cluster.startJettySolrRunner();
|
||||||
|
boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
|
||||||
|
assertTrue("The trigger did not fire at all", await);
|
||||||
|
assertTrue(triggerFired.get());
|
||||||
|
// reset
|
||||||
|
triggerFired.set(false);
|
||||||
|
triggerFiredLatch = new CountDownLatch(1);
|
||||||
|
NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
|
||||||
|
assertNotNull(nodeAddedEvent);
|
||||||
|
List<String> nodeNames = (List<String>) nodeAddedEvent.getProperty(TriggerEvent.NODE_NAMES);
|
||||||
|
assertTrue(nodeNames.contains(newNode.getNodeName()));
|
||||||
|
// add a second node - state of the trigger will change but it won't fire for waitFor sec.
|
||||||
|
JettySolrRunner newNode2 = cluster.startJettySolrRunner();
|
||||||
|
Thread.sleep(10000);
|
||||||
|
// kill overseer leader
|
||||||
|
cluster.stopJettySolrRunner(overseerLeaderIndex);
|
||||||
|
await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
|
||||||
|
assertTrue("The trigger did not fire at all", await);
|
||||||
|
assertTrue(triggerFired.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class TestTriggerAction extends TriggerActionBase {
|
||||||
|
|
||||||
|
public TestTriggerAction() {
|
||||||
|
actionConstructorCalled.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void process(TriggerEvent event, ActionContext actionContext) {
|
||||||
|
try {
|
||||||
|
if (triggerFired.compareAndSet(false, true)) {
|
||||||
|
events.add(event);
|
||||||
|
long currentTimeNanos = timeSource.getTimeNs();
|
||||||
|
long eventTimeNanos = event.getEventTime();
|
||||||
|
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
|
||||||
|
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
|
||||||
|
fail(event.source + " was fired before the configured waitFor period");
|
||||||
|
}
|
||||||
|
triggerFiredLatch.countDown();
|
||||||
|
} else {
|
||||||
|
fail(event.source + " was fired more than once!");
|
||||||
|
}
|
||||||
|
} catch (Throwable t) {
|
||||||
|
log.debug("--throwable", t);
|
||||||
|
throw t;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(Map<String, String> args) {
|
||||||
|
log.info("TestTriggerAction init");
|
||||||
|
actionInitCalled.countDown();
|
||||||
|
super.init(args);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,142 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.solr.cloud.autoscaling;
|
||||||
|
|
||||||
|
import java.lang.invoke.MethodHandles;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
import org.apache.lucene.util.LuceneTestCase;
|
||||||
|
import org.apache.solr.client.solrj.SolrClient;
|
||||||
|
import org.apache.solr.client.solrj.SolrRequest;
|
||||||
|
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||||
|
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||||
|
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||||
|
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||||
|
import org.apache.solr.common.params.SolrParams;
|
||||||
|
import org.apache.solr.common.util.NamedList;
|
||||||
|
import org.apache.solr.util.LogLevel;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Integration test for {@link ScheduledTrigger}
|
||||||
|
*/
|
||||||
|
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
|
||||||
|
@LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 26-Mar-2018
|
||||||
|
public class ScheduledTriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
|
||||||
|
private static CountDownLatch triggerFiredLatch;
|
||||||
|
private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
|
||||||
|
private static AtomicReference<Map<String, Object>> actionContextPropertiesRef = new AtomicReference<>();
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setupCluster() throws Exception {
|
||||||
|
configureCluster(2)
|
||||||
|
.addConfig("conf", configset("cloud-minimal"))
|
||||||
|
.configure();
|
||||||
|
// disable .scheduled_maintenance
|
||||||
|
String suspendTriggerCommand = "{" +
|
||||||
|
"'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
|
||||||
|
"}";
|
||||||
|
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
|
||||||
|
SolrClient solrClient = cluster.getSolrClient();
|
||||||
|
NamedList<Object> response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
triggerFiredLatch = new CountDownLatch(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testScheduledTrigger() throws Exception {
|
||||||
|
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||||
|
|
||||||
|
// this collection will place 2 cores on 1st node and 1 core on 2nd node
|
||||||
|
String collectionName = "testScheduledTrigger";
|
||||||
|
CollectionAdminRequest.createCollection(collectionName, 1, 3)
|
||||||
|
.setMaxShardsPerNode(5).process(solrClient);
|
||||||
|
waitForState("", collectionName, clusterShape(1, 3));
|
||||||
|
|
||||||
|
// create a policy which allows only 1 core per node thereby creating a violation for the above collection
|
||||||
|
String setClusterPolicy = "{\n" +
|
||||||
|
" \"set-cluster-policy\" : [\n" +
|
||||||
|
" {\"cores\" : \"<2\", \"node\" : \"#EACH\"}\n" +
|
||||||
|
" ]\n" +
|
||||||
|
"}";
|
||||||
|
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicy);
|
||||||
|
NamedList<Object> response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
|
||||||
|
// start a new node which can be used to balance the cluster as per policy
|
||||||
|
JettySolrRunner newNode = cluster.startJettySolrRunner();
|
||||||
|
cluster.waitForAllNodes(10);
|
||||||
|
|
||||||
|
String setTriggerCommand = "{" +
|
||||||
|
"'set-trigger' : {" +
|
||||||
|
"'name' : 'sched_trigger_integration1'," +
|
||||||
|
"'event' : 'scheduled'," +
|
||||||
|
"'startTime' : '" + new Date().toInstant().toString() + "'" +
|
||||||
|
"'every' : '+3SECONDS'" +
|
||||||
|
"'actions' : [" +
|
||||||
|
"{'name' : 'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
|
||||||
|
"{'name' : 'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
|
||||||
|
"{'name' : 'recorder', 'class': '" + ContextPropertiesRecorderAction.class.getName() + "'}" +
|
||||||
|
"]}}";
|
||||||
|
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||||
|
response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
|
||||||
|
assertTrue("ScheduledTrigger did not fire within 20 seconds", triggerFiredLatch.await(20, TimeUnit.SECONDS));
|
||||||
|
assertEquals(1, events.size());
|
||||||
|
Map<String, Object> actionContextProps = actionContextPropertiesRef.get();
|
||||||
|
assertNotNull(actionContextProps);
|
||||||
|
TriggerEvent event = events.iterator().next();
|
||||||
|
List<SolrRequest> operations = (List<SolrRequest>) actionContextProps.get("operations");
|
||||||
|
assertNotNull(operations);
|
||||||
|
assertEquals(1, operations.size());
|
||||||
|
for (SolrRequest operation : operations) {
|
||||||
|
SolrParams params = operation.getParams();
|
||||||
|
assertEquals(newNode.getNodeName(), params.get("targetNode"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class ContextPropertiesRecorderAction extends TriggerActionBase {
|
||||||
|
@Override
|
||||||
|
public void process(TriggerEvent event, ActionContext actionContext) {
|
||||||
|
actionContextPropertiesRef.set(actionContext.getProperties());
|
||||||
|
try {
|
||||||
|
events.add(event);
|
||||||
|
triggerFiredLatch.countDown();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
log.debug("--throwable", t);
|
||||||
|
throw t;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,217 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.solr.cloud.autoscaling;
|
||||||
|
|
||||||
|
import java.lang.invoke.MethodHandles;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.AtomicDouble;
|
||||||
|
import org.apache.lucene.util.LuceneTestCase;
|
||||||
|
import org.apache.solr.client.solrj.SolrClient;
|
||||||
|
import org.apache.solr.client.solrj.SolrRequest;
|
||||||
|
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
||||||
|
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
|
||||||
|
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
|
||||||
|
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
|
||||||
|
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||||
|
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||||
|
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||||
|
import org.apache.solr.common.params.CommonParams;
|
||||||
|
import org.apache.solr.common.params.SolrParams;
|
||||||
|
import org.apache.solr.common.util.NamedList;
|
||||||
|
import org.apache.solr.util.LogLevel;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
|
||||||
|
import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.WAIT_FOR_DELTA_NANOS;
|
||||||
|
import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.timeSource;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Integration test for {@link SearchRateTrigger}
|
||||||
|
*/
|
||||||
|
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
|
||||||
|
@LuceneTestCase.BadApple(bugUrl = "https://issues.apache.org/jira/browse/SOLR-12028")
|
||||||
|
public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
|
||||||
|
private static CountDownLatch triggerFiredLatch = new CountDownLatch(1);
|
||||||
|
private static CountDownLatch listenerCreated = new CountDownLatch(1);
|
||||||
|
private static int waitForSeconds = 1;
|
||||||
|
private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
|
||||||
|
private static Map<String, List<CapturedEvent>> listenerEvents = new HashMap<>();
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setupCluster() throws Exception {
|
||||||
|
configureCluster(5)
|
||||||
|
.addConfig("conf", configset("cloud-minimal"))
|
||||||
|
.configure();
|
||||||
|
// disable .scheduled_maintenance
|
||||||
|
String suspendTriggerCommand = "{" +
|
||||||
|
"'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
|
||||||
|
"}";
|
||||||
|
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
|
||||||
|
SolrClient solrClient = cluster.getSolrClient();
|
||||||
|
NamedList<Object> response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSearchRate() throws Exception {
|
||||||
|
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||||
|
String COLL1 = "collection1";
|
||||||
|
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1,
|
||||||
|
"conf", 1, 2);
|
||||||
|
create.process(solrClient);
|
||||||
|
String setTriggerCommand = "{" +
|
||||||
|
"'set-trigger' : {" +
|
||||||
|
"'name' : 'search_rate_trigger'," +
|
||||||
|
"'event' : 'searchRate'," +
|
||||||
|
"'waitFor' : '" + waitForSeconds + "s'," +
|
||||||
|
"'enabled' : true," +
|
||||||
|
"'rate' : 1.0," +
|
||||||
|
"'actions' : [" +
|
||||||
|
"{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
|
||||||
|
"{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
|
||||||
|
"{'name':'test','class':'" + TestSearchRateAction.class.getName() + "'}" +
|
||||||
|
"]" +
|
||||||
|
"}}";
|
||||||
|
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||||
|
NamedList<Object> response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
|
||||||
|
String setListenerCommand1 = "{" +
|
||||||
|
"'set-listener' : " +
|
||||||
|
"{" +
|
||||||
|
"'name' : 'srt'," +
|
||||||
|
"'trigger' : 'search_rate_trigger'," +
|
||||||
|
"'stage' : ['FAILED','SUCCEEDED']," +
|
||||||
|
"'afterAction': ['compute', 'execute', 'test']," +
|
||||||
|
"'class' : '" + TestTriggerListener.class.getName() + "'" +
|
||||||
|
"}" +
|
||||||
|
"}";
|
||||||
|
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand1);
|
||||||
|
response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
SolrParams query = params(CommonParams.Q, "*:*");
|
||||||
|
for (int i = 0; i < 500; i++) {
|
||||||
|
solrClient.query(COLL1, query);
|
||||||
|
}
|
||||||
|
boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
|
||||||
|
assertTrue("The trigger did not fire at all", await);
|
||||||
|
// wait for listener to capture the SUCCEEDED stage
|
||||||
|
Thread.sleep(5000);
|
||||||
|
List<CapturedEvent> events = listenerEvents.get("srt");
|
||||||
|
assertEquals(listenerEvents.toString(), 4, events.size());
|
||||||
|
assertEquals("AFTER_ACTION", events.get(0).stage.toString());
|
||||||
|
assertEquals("compute", events.get(0).actionName);
|
||||||
|
assertEquals("AFTER_ACTION", events.get(1).stage.toString());
|
||||||
|
assertEquals("execute", events.get(1).actionName);
|
||||||
|
assertEquals("AFTER_ACTION", events.get(2).stage.toString());
|
||||||
|
assertEquals("test", events.get(2).actionName);
|
||||||
|
assertEquals("SUCCEEDED", events.get(3).stage.toString());
|
||||||
|
assertNull(events.get(3).actionName);
|
||||||
|
|
||||||
|
CapturedEvent ev = events.get(0);
|
||||||
|
long now = timeSource.getTimeNs();
|
||||||
|
// verify waitFor
|
||||||
|
assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
|
||||||
|
Map<String, Double> nodeRates = (Map<String, Double>) ev.event.getProperties().get("node");
|
||||||
|
assertNotNull("nodeRates", nodeRates);
|
||||||
|
assertTrue(nodeRates.toString(), nodeRates.size() > 0);
|
||||||
|
AtomicDouble totalNodeRate = new AtomicDouble();
|
||||||
|
nodeRates.forEach((n, r) -> totalNodeRate.addAndGet(r));
|
||||||
|
List<ReplicaInfo> replicaRates = (List<ReplicaInfo>) ev.event.getProperties().get("replica");
|
||||||
|
assertNotNull("replicaRates", replicaRates);
|
||||||
|
assertTrue(replicaRates.toString(), replicaRates.size() > 0);
|
||||||
|
AtomicDouble totalReplicaRate = new AtomicDouble();
|
||||||
|
replicaRates.forEach(r -> {
|
||||||
|
assertTrue(r.toString(), r.getVariable("rate") != null);
|
||||||
|
totalReplicaRate.addAndGet((Double) r.getVariable("rate"));
|
||||||
|
});
|
||||||
|
Map<String, Object> shardRates = (Map<String, Object>) ev.event.getProperties().get("shard");
|
||||||
|
assertNotNull("shardRates", shardRates);
|
||||||
|
assertEquals(shardRates.toString(), 1, shardRates.size());
|
||||||
|
shardRates = (Map<String, Object>) shardRates.get(COLL1);
|
||||||
|
assertNotNull("shardRates", shardRates);
|
||||||
|
assertEquals(shardRates.toString(), 1, shardRates.size());
|
||||||
|
AtomicDouble totalShardRate = new AtomicDouble();
|
||||||
|
shardRates.forEach((s, r) -> totalShardRate.addAndGet((Double) r));
|
||||||
|
Map<String, Double> collectionRates = (Map<String, Double>) ev.event.getProperties().get("collection");
|
||||||
|
assertNotNull("collectionRates", collectionRates);
|
||||||
|
assertEquals(collectionRates.toString(), 1, collectionRates.size());
|
||||||
|
Double collectionRate = collectionRates.get(COLL1);
|
||||||
|
assertNotNull(collectionRate);
|
||||||
|
assertTrue(collectionRate > 5.0);
|
||||||
|
assertEquals(collectionRate, totalNodeRate.get(), 5.0);
|
||||||
|
assertEquals(collectionRate, totalShardRate.get(), 5.0);
|
||||||
|
assertEquals(collectionRate, totalReplicaRate.get(), 5.0);
|
||||||
|
|
||||||
|
// check operations
|
||||||
|
List<Map<String, Object>> ops = (List<Map<String, Object>>) ev.context.get("properties.operations");
|
||||||
|
assertNotNull(ops);
|
||||||
|
assertTrue(ops.size() > 1);
|
||||||
|
for (Map<String, Object> m : ops) {
|
||||||
|
assertEquals("ADDREPLICA", m.get("params.action"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class TestSearchRateAction extends TriggerActionBase {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void process(TriggerEvent event, ActionContext context) throws Exception {
|
||||||
|
try {
|
||||||
|
events.add(event);
|
||||||
|
long currentTimeNanos = timeSource.getTimeNs();
|
||||||
|
long eventTimeNanos = event.getEventTime();
|
||||||
|
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
|
||||||
|
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
|
||||||
|
fail(event.source + " was fired before the configured waitFor period");
|
||||||
|
}
|
||||||
|
triggerFiredLatch.countDown();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
log.debug("--throwable", t);
|
||||||
|
throw t;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class TestTriggerListener extends TriggerListenerBase {
|
||||||
|
@Override
|
||||||
|
public void init(SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) {
|
||||||
|
super.init(cloudManager, config);
|
||||||
|
listenerCreated.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
|
||||||
|
ActionContext context, Throwable error, String message) {
|
||||||
|
List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
|
||||||
|
lst.add(new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,238 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.solr.cloud.autoscaling;
|
||||||
|
|
||||||
|
import java.lang.invoke.MethodHandles;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import org.apache.solr.client.solrj.SolrClient;
|
||||||
|
import org.apache.solr.client.solrj.SolrRequest;
|
||||||
|
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
||||||
|
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
|
||||||
|
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
|
||||||
|
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||||
|
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||||
|
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||||
|
import org.apache.solr.common.params.AutoScalingParams;
|
||||||
|
import org.apache.solr.common.util.NamedList;
|
||||||
|
import org.apache.solr.util.LogLevel;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
|
||||||
|
import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.WAIT_FOR_DELTA_NANOS;
|
||||||
|
import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.timeSource;
|
||||||
|
|
||||||
|
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
|
||||||
|
public class TriggerCooldownIntegrationTest extends SolrCloudTestCase {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
static Map<String, List<CapturedEvent>> listenerEvents = new HashMap<>();
|
||||||
|
static CountDownLatch listenerCreated = new CountDownLatch(1);
|
||||||
|
static boolean failDummyAction = false;
|
||||||
|
private static CountDownLatch actionConstructorCalled = new CountDownLatch(1);
|
||||||
|
private static CountDownLatch actionInitCalled = new CountDownLatch(1);
|
||||||
|
private static CountDownLatch triggerFiredLatch = new CountDownLatch(1);
|
||||||
|
private static int waitForSeconds = 1;
|
||||||
|
private static AtomicBoolean triggerFired = new AtomicBoolean();
|
||||||
|
private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setupCluster() throws Exception {
|
||||||
|
configureCluster(2)
|
||||||
|
.addConfig("conf", configset("cloud-minimal"))
|
||||||
|
.configure();
|
||||||
|
// disable .scheduled_maintenance
|
||||||
|
String suspendTriggerCommand = "{" +
|
||||||
|
"'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
|
||||||
|
"}";
|
||||||
|
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
|
||||||
|
SolrClient solrClient = cluster.getSolrClient();
|
||||||
|
NamedList<Object> response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCooldown() throws Exception {
|
||||||
|
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||||
|
failDummyAction = false;
|
||||||
|
waitForSeconds = 1;
|
||||||
|
String setTriggerCommand = "{" +
|
||||||
|
"'set-trigger' : {" +
|
||||||
|
"'name' : 'node_added_cooldown_trigger'," +
|
||||||
|
"'event' : 'nodeAdded'," +
|
||||||
|
"'waitFor' : '" + waitForSeconds + "s'," +
|
||||||
|
"'enabled' : true," +
|
||||||
|
"'actions' : [" +
|
||||||
|
"{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}" +
|
||||||
|
"]" +
|
||||||
|
"}}";
|
||||||
|
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||||
|
NamedList<Object> response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
|
||||||
|
String setListenerCommand1 = "{" +
|
||||||
|
"'set-listener' : " +
|
||||||
|
"{" +
|
||||||
|
"'name' : 'bar'," +
|
||||||
|
"'trigger' : 'node_added_cooldown_trigger'," +
|
||||||
|
"'stage' : ['FAILED','SUCCEEDED', 'IGNORED']," +
|
||||||
|
"'class' : '" + TestTriggerListener.class.getName() + "'" +
|
||||||
|
"}" +
|
||||||
|
"}";
|
||||||
|
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand1);
|
||||||
|
response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
|
||||||
|
listenerCreated = new CountDownLatch(1);
|
||||||
|
listenerEvents.clear();
|
||||||
|
|
||||||
|
JettySolrRunner newNode = cluster.startJettySolrRunner();
|
||||||
|
boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
|
||||||
|
assertTrue("The trigger did not fire at all", await);
|
||||||
|
assertTrue(triggerFired.get());
|
||||||
|
// wait for listener to capture the SUCCEEDED stage
|
||||||
|
Thread.sleep(1000);
|
||||||
|
|
||||||
|
List<CapturedEvent> capturedEvents = listenerEvents.get("bar");
|
||||||
|
// we may get a few IGNORED events if other tests caused events within cooldown period
|
||||||
|
assertTrue(capturedEvents.toString(), capturedEvents.size() > 0);
|
||||||
|
long prevTimestamp = capturedEvents.get(capturedEvents.size() - 1).timestamp;
|
||||||
|
|
||||||
|
// reset the trigger and captured events
|
||||||
|
listenerEvents.clear();
|
||||||
|
triggerFiredLatch = new CountDownLatch(1);
|
||||||
|
triggerFired.compareAndSet(true, false);
|
||||||
|
|
||||||
|
JettySolrRunner newNode2 = cluster.startJettySolrRunner();
|
||||||
|
await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
|
||||||
|
assertTrue("The trigger did not fire at all", await);
|
||||||
|
// wait for listener to capture the SUCCEEDED stage
|
||||||
|
Thread.sleep(2000);
|
||||||
|
|
||||||
|
// there must be at least one IGNORED event due to cooldown, and one SUCCEEDED event
|
||||||
|
capturedEvents = listenerEvents.get("bar");
|
||||||
|
assertEquals(capturedEvents.toString(), 1, capturedEvents.size());
|
||||||
|
CapturedEvent ev = capturedEvents.get(0);
|
||||||
|
assertEquals(ev.toString(), TriggerEventProcessorStage.SUCCEEDED, ev.stage);
|
||||||
|
// the difference between timestamps of the first SUCCEEDED and the last SUCCEEDED
|
||||||
|
// must be larger than cooldown period
|
||||||
|
assertTrue("timestamp delta is less than default cooldown period", ev.timestamp - prevTimestamp > TimeUnit.SECONDS.toNanos(ScheduledTriggers.DEFAULT_COOLDOWN_PERIOD_SECONDS));
|
||||||
|
prevTimestamp = ev.timestamp;
|
||||||
|
|
||||||
|
// this also resets the cooldown period
|
||||||
|
long modifiedCooldownPeriodSeconds = 7;
|
||||||
|
String setPropertiesCommand = "{\n" +
|
||||||
|
"\t\"set-properties\" : {\n" +
|
||||||
|
"\t\t\"" + AutoScalingParams.TRIGGER_COOLDOWN_PERIOD_SECONDS + "\" : " + modifiedCooldownPeriodSeconds + "\n" +
|
||||||
|
"\t}\n" +
|
||||||
|
"}";
|
||||||
|
solrClient.request(createAutoScalingRequest(SolrRequest.METHOD.POST, setPropertiesCommand));
|
||||||
|
req = createAutoScalingRequest(SolrRequest.METHOD.GET, null);
|
||||||
|
response = solrClient.request(req);
|
||||||
|
|
||||||
|
// reset the trigger and captured events
|
||||||
|
listenerEvents.clear();
|
||||||
|
triggerFiredLatch = new CountDownLatch(1);
|
||||||
|
triggerFired.compareAndSet(true, false);
|
||||||
|
|
||||||
|
JettySolrRunner newNode3 = cluster.startJettySolrRunner();
|
||||||
|
await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
|
||||||
|
assertTrue("The trigger did not fire at all", await);
|
||||||
|
triggerFiredLatch = new CountDownLatch(1);
|
||||||
|
triggerFired.compareAndSet(true, false);
|
||||||
|
// add another node
|
||||||
|
JettySolrRunner newNode4 = cluster.startJettySolrRunner();
|
||||||
|
await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
|
||||||
|
assertTrue("The trigger did not fire at all", await);
|
||||||
|
// wait for listener to capture the SUCCEEDED stage
|
||||||
|
Thread.sleep(2000);
|
||||||
|
|
||||||
|
// there must be two SUCCEEDED (due to newNode3 and newNode4) and maybe some ignored events
|
||||||
|
capturedEvents = listenerEvents.get("bar");
|
||||||
|
assertTrue(capturedEvents.toString(), capturedEvents.size() >= 2);
|
||||||
|
// first event should be SUCCEEDED
|
||||||
|
ev = capturedEvents.get(0);
|
||||||
|
assertEquals(ev.toString(), TriggerEventProcessorStage.SUCCEEDED, ev.stage);
|
||||||
|
|
||||||
|
ev = capturedEvents.get(capturedEvents.size() - 1);
|
||||||
|
assertEquals(ev.toString(), TriggerEventProcessorStage.SUCCEEDED, ev.stage);
|
||||||
|
// the difference between timestamps of the first SUCCEEDED and the last SUCCEEDED
|
||||||
|
// must be larger than the modified cooldown period
|
||||||
|
assertTrue("timestamp delta is less than default cooldown period", ev.timestamp - prevTimestamp > TimeUnit.SECONDS.toNanos(modifiedCooldownPeriodSeconds));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class TestTriggerAction extends TriggerActionBase {
|
||||||
|
|
||||||
|
public TestTriggerAction() {
|
||||||
|
actionConstructorCalled.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void process(TriggerEvent event, ActionContext actionContext) {
|
||||||
|
try {
|
||||||
|
if (triggerFired.compareAndSet(false, true)) {
|
||||||
|
events.add(event);
|
||||||
|
long currentTimeNanos = timeSource.getTimeNs();
|
||||||
|
long eventTimeNanos = event.getEventTime();
|
||||||
|
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
|
||||||
|
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
|
||||||
|
fail(event.source + " was fired before the configured waitFor period");
|
||||||
|
}
|
||||||
|
triggerFiredLatch.countDown();
|
||||||
|
} else {
|
||||||
|
fail(event.source + " was fired more than once!");
|
||||||
|
}
|
||||||
|
} catch (Throwable t) {
|
||||||
|
log.debug("--throwable", t);
|
||||||
|
throw t;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(Map<String, String> args) {
|
||||||
|
log.info("TestTriggerAction init");
|
||||||
|
actionInitCalled.countDown();
|
||||||
|
super.init(args);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class TestTriggerListener extends TriggerListenerBase {
|
||||||
|
@Override
|
||||||
|
public void init(SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) {
|
||||||
|
super.init(cloudManager, config);
|
||||||
|
listenerCreated.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
|
||||||
|
ActionContext context, Throwable error, String message) {
|
||||||
|
List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
|
||||||
|
lst.add(new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,195 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.solr.cloud.autoscaling;
|
||||||
|
|
||||||
|
import java.lang.invoke.MethodHandles;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
import org.apache.solr.client.solrj.SolrClient;
|
||||||
|
import org.apache.solr.client.solrj.SolrRequest;
|
||||||
|
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
||||||
|
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
|
||||||
|
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
|
||||||
|
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||||
|
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||||
|
import org.apache.solr.common.params.AutoScalingParams;
|
||||||
|
import org.apache.solr.common.util.NamedList;
|
||||||
|
import org.apache.solr.core.SolrResourceLoader;
|
||||||
|
import org.apache.solr.util.LogLevel;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
|
||||||
|
import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.timeSource;
|
||||||
|
|
||||||
|
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
|
||||||
|
public class TriggerSetPropertiesIntegrationTest extends SolrCloudTestCase {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
|
||||||
|
private static CountDownLatch triggerFiredLatch = new CountDownLatch(1);
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setupCluster() throws Exception {
|
||||||
|
configureCluster(2)
|
||||||
|
.addConfig("conf", configset("cloud-minimal"))
|
||||||
|
.configure();
|
||||||
|
// disable .scheduled_maintenance
|
||||||
|
String suspendTriggerCommand = "{" +
|
||||||
|
"'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
|
||||||
|
"}";
|
||||||
|
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
|
||||||
|
SolrClient solrClient = cluster.getSolrClient();
|
||||||
|
NamedList<Object> response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
}
|
||||||
|
|
||||||
|
private static CountDownLatch getTriggerFiredLatch() {
|
||||||
|
return triggerFiredLatch;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testSetProperties() throws Exception {
|
||||||
|
JettySolrRunner runner = cluster.getJettySolrRunner(0);
|
||||||
|
SolrResourceLoader resourceLoader = runner.getCoreContainer().getResourceLoader();
|
||||||
|
SolrCloudManager solrCloudManager = runner.getCoreContainer().getZkController().getSolrCloudManager();
|
||||||
|
AtomicLong diff = new AtomicLong(0);
|
||||||
|
triggerFiredLatch = new CountDownLatch(2); // have the trigger run twice to capture time difference
|
||||||
|
try (ScheduledTriggers scheduledTriggers = new ScheduledTriggers(resourceLoader, solrCloudManager)) {
|
||||||
|
AutoScalingConfig config = new AutoScalingConfig(Collections.emptyMap());
|
||||||
|
scheduledTriggers.setAutoScalingConfig(config);
|
||||||
|
scheduledTriggers.add(new TriggerBase(TriggerEventType.NODELOST, "x", Collections.emptyMap(), resourceLoader, solrCloudManager) {
|
||||||
|
@Override
|
||||||
|
protected Map<String, Object> getState() {
|
||||||
|
return Collections.singletonMap("x", "y");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void setState(Map<String, Object> state) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void restoreState(AutoScaling.Trigger old) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
if (getTriggerFiredLatch().getCount() == 0) return;
|
||||||
|
long l = diff.get();
|
||||||
|
diff.set(timeSource.getTimeNs() - l);
|
||||||
|
getTriggerFiredLatch().countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assertTrue(getTriggerFiredLatch().await(4, TimeUnit.SECONDS));
|
||||||
|
assertTrue(diff.get() - TimeUnit.SECONDS.toNanos(ScheduledTriggers.DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS) >= 0);
|
||||||
|
|
||||||
|
// change schedule delay
|
||||||
|
config = config.withProperties(Collections.singletonMap(AutoScalingParams.TRIGGER_SCHEDULE_DELAY_SECONDS, 4));
|
||||||
|
scheduledTriggers.setAutoScalingConfig(config);
|
||||||
|
triggerFiredLatch = new CountDownLatch(2);
|
||||||
|
assertTrue("Timed out waiting for latch to fire", getTriggerFiredLatch().await(10, TimeUnit.SECONDS));
|
||||||
|
assertTrue(diff.get() - TimeUnit.SECONDS.toNanos(4) >= 0);
|
||||||
|
|
||||||
|
// reset with default properties
|
||||||
|
scheduledTriggers.remove("x"); // remove the old trigger
|
||||||
|
config = config.withProperties(ScheduledTriggers.DEFAULT_PROPERTIES);
|
||||||
|
scheduledTriggers.setAutoScalingConfig(config);
|
||||||
|
|
||||||
|
// test core thread count
|
||||||
|
List<AutoScaling.Trigger> triggerList = new ArrayList<>();
|
||||||
|
final Set<String> threadNames = Collections.synchronizedSet(new HashSet<>());
|
||||||
|
final Set<String> triggerNames = Collections.synchronizedSet(new HashSet<>());
|
||||||
|
triggerFiredLatch = new CountDownLatch(8);
|
||||||
|
for (int i = 0; i < 8; i++) {
|
||||||
|
triggerList.add(new MockTrigger(TriggerEventType.NODELOST, "x" + i, Collections.emptyMap(), resourceLoader, solrCloudManager) {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
// If core pool size is increased then new threads won't be started if existing threads
|
||||||
|
// aren't busy with tasks. So we make this thread wait longer than necessary
|
||||||
|
// so that the pool is forced to start threads for other triggers
|
||||||
|
Thread.sleep(5000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
}
|
||||||
|
if (triggerNames.add(getName())) {
|
||||||
|
getTriggerFiredLatch().countDown();
|
||||||
|
threadNames.add(Thread.currentThread().getName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
scheduledTriggers.add(triggerList.get(i));
|
||||||
|
}
|
||||||
|
assertTrue("Timed out waiting for latch to fire", getTriggerFiredLatch().await(20, TimeUnit.SECONDS));
|
||||||
|
assertEquals("Expected 8 triggers but found: " + triggerNames, 8, triggerNames.size());
|
||||||
|
assertEquals("Expected " + ScheduledTriggers.DEFAULT_TRIGGER_CORE_POOL_SIZE
|
||||||
|
+ " threads but found: " + threadNames,
|
||||||
|
ScheduledTriggers.DEFAULT_TRIGGER_CORE_POOL_SIZE, threadNames.size());
|
||||||
|
|
||||||
|
// change core pool size
|
||||||
|
config = config.withProperties(Collections.singletonMap(AutoScalingParams.TRIGGER_CORE_POOL_SIZE, 6));
|
||||||
|
scheduledTriggers.setAutoScalingConfig(config);
|
||||||
|
triggerFiredLatch = new CountDownLatch(8);
|
||||||
|
threadNames.clear();
|
||||||
|
triggerNames.clear();
|
||||||
|
assertTrue(getTriggerFiredLatch().await(20, TimeUnit.SECONDS));
|
||||||
|
assertEquals("Expected 8 triggers but found: " + triggerNames, 8, triggerNames.size());
|
||||||
|
assertEquals("Expected 6 threads but found: " + threadNames, 6, threadNames.size());
|
||||||
|
|
||||||
|
// reset
|
||||||
|
for (int i = 0; i < 8; i++) {
|
||||||
|
scheduledTriggers.remove(triggerList.get(i).getName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class MockTrigger extends TriggerBase {
|
||||||
|
|
||||||
|
public MockTrigger(TriggerEventType eventType, String name, Map<String, Object> properties, SolrResourceLoader loader, SolrCloudManager cloudManager) {
|
||||||
|
super(eventType, name, properties, loader, cloudManager);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Map<String, Object> getState() {
|
||||||
|
return Collections.emptyMap();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void setState(Map<String, Object> state) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void restoreState(AutoScaling.Trigger old) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -436,6 +436,7 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 26-Mar-2018
|
||||||
public void testNodeLostTrigger() throws Exception {
|
public void testNodeLostTrigger() throws Exception {
|
||||||
SolrClient solrClient = cluster.simGetSolrClient();
|
SolrClient solrClient = cluster.simGetSolrClient();
|
||||||
String setTriggerCommand = "{" +
|
String setTriggerCommand = "{" +
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.solr.cloud.hdfs;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.lucene.util.LuceneTestCase;
|
||||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||||
import org.apache.solr.cloud.BasicDistributedZk2Test;
|
import org.apache.solr.cloud.BasicDistributedZk2Test;
|
||||||
import org.apache.solr.util.BadHdfsThreadsFilter;
|
import org.apache.solr.util.BadHdfsThreadsFilter;
|
||||||
|
@ -33,6 +34,7 @@ import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
|
||||||
@ThreadLeakFilters(defaultFilters = true, filters = {
|
@ThreadLeakFilters(defaultFilters = true, filters = {
|
||||||
BadHdfsThreadsFilter.class // hdfs currently leaks thread(s)
|
BadHdfsThreadsFilter.class // hdfs currently leaks thread(s)
|
||||||
})
|
})
|
||||||
|
@LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 26-Mar-2018
|
||||||
public class HdfsBasicDistributedZk2Test extends BasicDistributedZk2Test {
|
public class HdfsBasicDistributedZk2Test extends BasicDistributedZk2Test {
|
||||||
private static MiniDFSCluster dfsCluster;
|
private static MiniDFSCluster dfsCluster;
|
||||||
|
|
||||||
|
|
|
@ -242,7 +242,8 @@ public class AutoscalingHistoryHandlerTest extends SolrCloudTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028")
|
// commented out 26-Mar-2018
|
||||||
|
//@BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028")
|
||||||
public void testHistory() throws Exception {
|
public void testHistory() throws Exception {
|
||||||
waitForState("Timed out wait for collection be active", COLL_NAME,
|
waitForState("Timed out wait for collection be active", COLL_NAME,
|
||||||
clusterShape(1, 3));
|
clusterShape(1, 3));
|
||||||
|
|
|
@ -36,23 +36,28 @@ The highlighting implementation to use. Acceptable values are: `unified`, `origi
|
||||||
See the <<Choosing a Highlighter>> section below for more details on the differences between the available highlighters.
|
See the <<Choosing a Highlighter>> section below for more details on the differences between the available highlighters.
|
||||||
|
|
||||||
`hl.fl`::
|
`hl.fl`::
|
||||||
Specifies a list of fields to highlight. Accepts a comma- or space-delimited list of fields for which Solr should generate highlighted snippets.
|
Specifies a list of fields to highlight, either comma- or space-delimited.
|
||||||
|
A wildcard of `\*` (asterisk) can be used to match field globs, such as `text_*` or even `\*` to highlight on all fields where highlighting is possible.
|
||||||
|
When using `*`, consider adding `hl.requireFieldMatch=true`.
|
||||||
+
|
+
|
||||||
A wildcard of `\*` (asterisk) can be used to match field globs, such as `text_*` or even `\*` to highlight on all fields where highlighting is possible. When using `*`, consider adding `hl.requireFieldMatch=true`.
|
Note that the field(s) listed here ought to have compatible text-analysis (defined in the schema) with field(s) referenced in the query to be highlighted.
|
||||||
|
It may be necessary to modify `hl.q` and `hl.qparser` and/or modify the text analysis.
|
||||||
|
The following example uses the <<local-parameters-in-queries.adoc#local-parameters-in-queries,local-params>> syntax and <<the-extended-dismax-query-parser.adoc#the-extended-dismax-query-parser,the edismax parser>> to highlight fields in `hl.fl`:
|
||||||
|
`&hl.fl=field1 field2&hl.q={!edismax qf=$hl.fl v=$q}&hl.qparser=lucene&hl.requireFieldMatch=true` (along with other applicable parameters, of course).
|
||||||
+
|
+
|
||||||
When not defined, the defaults defined for the `df` query parameter will be used.
|
The default is the value of the `df` parameter which in turn has no default.
|
||||||
|
|
||||||
`hl.q`::
|
`hl.q`::
|
||||||
A query to use for highlighting. This parameter allows you to highlight different terms than those being used to retrieve documents.
|
A query to use for highlighting.
|
||||||
|
This parameter allows you to highlight different terms or fields than those being used to search for documents.
|
||||||
|
When setting this, you might also need to set `hl.qparser`.
|
||||||
+
|
+
|
||||||
When not defined, the query defined with the `q` parameter will the used.
|
The default is the value of the `q` parameter (already parsed).
|
||||||
+
|
|
||||||
When `hl.qparser` is not defined, the query parser defined with the `defType` query parameter will be used and terms will be analyzed using those rules. This behavior can be overridden by specifying a field, for example: `hl.q=field:term`.
|
|
||||||
|
|
||||||
`hl.qparser`::
|
`hl.qparser`::
|
||||||
The query parser to use for the `hl.q` query.
|
The query parser to use for the `hl.q` query. It only applies when `hl.q` is set.
|
||||||
+
|
+
|
||||||
When not defined, the query parser defined with the `defType` query parameter will be used.
|
The default is the value of the `defType` parameter which in turn defaults to `lucene`.
|
||||||
|
|
||||||
`hl.requireFieldMatch`::
|
`hl.requireFieldMatch`::
|
||||||
By default, `false`, all query terms will be highlighted for each field to be highlighted (`hl.fl`) no matter what fields the parsed query refer to. If set to `true`, only query terms aligning with the field being highlighted will in turn be highlighted.
|
By default, `false`, all query terms will be highlighted for each field to be highlighted (`hl.fl`) no matter what fields the parsed query refer to. If set to `true`, only query terms aligning with the field being highlighted will in turn be highlighted.
|
||||||
|
|
Loading…
Reference in New Issue