From b52bed0a4732bddd9563c9e994551534a856afcf Mon Sep 17 00:00:00 2001 From: Ishan Chattopadhyaya Date: Thu, 12 Dec 2019 10:12:17 +0530 Subject: [PATCH] SOLR-14062: Split IndexSizeTriggerTest into three --- .../IndexSizeTriggerMixedBoundsTest.java | 391 +++++++++++++++ .../IndexSizeTriggerSizeEstimationTest.java | 320 ++++++++++++ .../autoscaling/IndexSizeTriggerTest.java | 473 +----------------- 3 files changed, 713 insertions(+), 471 deletions(-) create mode 100644 solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerMixedBoundsTest.java create mode 100644 solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerSizeEstimationTest.java diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerMixedBoundsTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerMixedBoundsTest.java new file mode 100644 index 00000000000..1bfa37917fb --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerMixedBoundsTest.java @@ -0,0 +1,391 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.solr.cloud.autoscaling; + +import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH; + +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.lucene.util.LuceneTestCase; +import org.apache.lucene.util.TestUtil; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.cloud.SolrCloudManager; +import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig; +import org.apache.solr.client.solrj.cloud.autoscaling.Suggester; +import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.cloud.CloudTestUtils.AutoScalingRequest; +import org.apache.solr.cloud.CloudUtil; +import org.apache.solr.cloud.SolrCloudTestCase; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.params.CollectionParams; +import org.apache.solr.common.params.CommonParams; +import org.apache.solr.common.params.UpdateParams; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.Pair; +import org.apache.solr.common.util.TimeSource; +import org.apache.solr.common.util.Utils; +import org.apache.solr.core.SolrResourceLoader; +import org.apache.solr.util.LogLevel; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG") +@LuceneTestCase.Slow +public class IndexSizeTriggerMixedBoundsTest extends SolrCloudTestCase { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static SolrCloudManager cloudManager; + private static SolrClient solrClient; + private static TimeSource timeSource; + + private static int SPEED = 1; + + static Map> listenerEvents = new ConcurrentHashMap<>(); + static CountDownLatch listenerCreated = new CountDownLatch(1); + static CountDownLatch finished = new CountDownLatch(1); + + @BeforeClass + public static void setupCluster() throws Exception { + configureCluster(2) + .addConfig("conf", configset("cloud-minimal")) + .configure(); + cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager(); + solrClient = cluster.getSolrClient(); + timeSource = cloudManager.getTimeSource(); + } + + @After + public void restoreDefaults() throws Exception { + cluster.deleteAllCollections(); + cloudManager.getDistribStateManager().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), -1); + cloudManager.getTimeSource().sleep(5000); + listenerEvents.clear(); + listenerCreated = new CountDownLatch(1); + finished = new CountDownLatch(1); + } + + @AfterClass + public static void teardown() throws Exception { + solrClient = null; + cloudManager = null; + } + + public static class CapturingTriggerListener extends TriggerListenerBase { + @Override + public void configure(SolrResourceLoader loader, 
SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) throws TriggerValidationException { + super.configure(loader, cloudManager, config); + listenerCreated.countDown(); + } + + @Override + public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName, + ActionContext context, Throwable error, String message) { + List lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>()); + CapturedEvent ev = new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message); + log.info("=======> " + ev); + lst.add(ev); + } + } + + public static class FinishedProcessingListener extends TriggerListenerBase { + + @Override + public void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName, ActionContext context, Throwable error, String message) throws Exception { + finished.countDown(); + } + } + + @Test + public void testMixedBounds() throws Exception { + String collectionName = "testMixedBounds_collection"; + CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName, + "conf", 2, 2).setMaxShardsPerNode(2); + create.process(solrClient); + CloudUtil.waitForState(cloudManager, "failed to create " + collectionName, collectionName, + CloudUtil.clusterShape(2, 2, false, true)); + + for (int j = 0; j < 10; j++) { + UpdateRequest ureq = new UpdateRequest(); + ureq.setParam("collection", collectionName); + for (int i = 0; i < 100; i++) { + SolrInputDocument doc = new SolrInputDocument("id", "id-" + (i * 100) + "-" + j); + doc.addField("foo", TestUtil.randomSimpleString(random(), 130, 130)); + ureq.add(doc); + } + solrClient.request(ureq); + } + solrClient.commit(collectionName); + + // check the actual size of shard to set the threshold + QueryResponse rsp = solrClient.query(params(CommonParams.QT, "/admin/metrics", "group", "core")); + NamedList nl = rsp.getResponse(); + nl = (NamedList)nl.get("metrics"); + int maxSize = 0; + for (Iterator> it = nl.iterator(); it.hasNext(); ) { + Map.Entry e = it.next(); + NamedList metrics = (NamedList)e.getValue(); + Object o = metrics.get("INDEX.sizeInBytes"); + assertNotNull("INDEX.sizeInBytes missing: " + metrics, o); + assertTrue("not a number", o instanceof Number); + if (maxSize < ((Number)o).intValue()) { + maxSize = ((Number)o).intValue(); + } + } + assertTrue("maxSize should be non-zero", maxSize > 0); + + int aboveBytes = maxSize * 2 / 3; + + // need to wait for recovery after splitting + long waitForSeconds = 10 + random().nextInt(5); + + // the trigger is initially disabled so that we have time to add listeners + // and have them capture all events once the trigger is enabled + String setTriggerCommand = "{" + + "'set-trigger' : {" + + "'name' : 'index_size_trigger4'," + + "'event' : 'indexSize'," + + "'waitFor' : '" + waitForSeconds + "s'," + + // don't hit this limit when indexing + "'aboveDocs' : 10000," + + // hit this limit when deleting + "'belowDocs' : 100," + + // hit this limit when indexing + "'aboveBytes' : " + aboveBytes + "," + + // don't hit this limit when deleting + "'belowBytes' : 10," + + "'enabled' : false," + + "'actions' : [{'name' : 'compute_plan', 'class' : 'solr.ComputePlanAction'}," + + "{'name' : 'execute_plan', 'class' : '" + ExecutePlanAction.class.getName() + "'}]" + + "}}"; + SolrRequest req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand); + NamedList response = solrClient.request(req); + assertEquals(response.get("result").toString(), 
"success"); + + String setListenerCommand = "{" + + "'set-listener' : " + + "{" + + "'name' : 'capturing4'," + + "'trigger' : 'index_size_trigger4'," + + "'stage' : ['STARTED','ABORTED','SUCCEEDED','FAILED']," + + "'beforeAction' : ['compute_plan','execute_plan']," + + "'afterAction' : ['compute_plan','execute_plan']," + + "'class' : '" + CapturingTriggerListener.class.getName() + "'" + + "}" + + "}"; + req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setListenerCommand); + response = solrClient.request(req); + assertEquals(response.get("result").toString(), "success"); + + setListenerCommand = "{" + + "'set-listener' : " + + "{" + + "'name' : 'finished'," + + "'trigger' : 'index_size_trigger4'," + + "'stage' : ['SUCCEEDED']," + + "'class' : '" + FinishedProcessingListener.class.getName() + "'" + + "}" + + "}"; + req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setListenerCommand); + response = solrClient.request(req); + assertEquals(response.get("result").toString(), "success"); + + // now enable the trigger + String resumeTriggerCommand = "{" + + "'resume-trigger' : {" + + "'name' : 'index_size_trigger4'" + + "}" + + "}"; + log.info("-- resuming trigger"); + req = AutoScalingRequest.create(SolrRequest.METHOD.POST, resumeTriggerCommand); + response = solrClient.request(req); + assertEquals(response.get("result").toString(), "success"); + + timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS)); + + boolean await = finished.await(90000 / SPEED, TimeUnit.MILLISECONDS); + assertTrue("did not finish processing in time", await); + log.info("-- suspending trigger"); + // suspend the trigger to avoid generating more events + String suspendTriggerCommand = "{" + + "'suspend-trigger' : {" + + "'name' : 'index_size_trigger4'" + + "}" + + "}"; + req = AutoScalingRequest.create(SolrRequest.METHOD.POST, suspendTriggerCommand); + response = solrClient.request(req); + assertEquals(response.get("result").toString(), "success"); + + assertEquals(1, listenerEvents.size()); + List events = listenerEvents.get("capturing4"); + assertNotNull("'capturing4' events not found", events); + assertEquals("events: " + events, 6, events.size()); + assertEquals(TriggerEventProcessorStage.STARTED, events.get(0).stage); + assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(1).stage); + assertEquals(TriggerEventProcessorStage.AFTER_ACTION, events.get(2).stage); + assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(3).stage); + assertEquals(TriggerEventProcessorStage.AFTER_ACTION, events.get(4).stage); + assertEquals(TriggerEventProcessorStage.SUCCEEDED, events.get(5).stage); + + // collection should have 2 inactive and 4 active shards + CloudUtil.waitForState(cloudManager, "failed to create " + collectionName, collectionName, + CloudUtil.clusterShape(6, 2, true, true)); + + // check ops + List ops = (List) events.get(4).event.getProperty(TriggerEvent.REQUESTED_OPS); + assertNotNull("should contain requestedOps", ops); + assertEquals("number of ops", 2, ops.size()); + boolean shard1 = false; + boolean shard2 = false; + for (TriggerEvent.Op op : ops) { + assertEquals(CollectionParams.CollectionAction.SPLITSHARD, op.getAction()); + Set> hints = (Set>)op.getHints().get(Suggester.Hint.COLL_SHARD); + assertNotNull("hints", hints); + assertEquals("hints", 1, hints.size()); + Pair p = hints.iterator().next(); + assertEquals(collectionName, p.first()); + if (p.second().equals("shard1")) { + shard1 = true; + } else if (p.second().equals("shard2")) { + shard2 
= true; + } else { + fail("unexpected shard name " + p.second()); + } + } + assertTrue("shard1 should be split", shard1); + assertTrue("shard2 should be split", shard2); + + // now delete most of docs to trigger belowDocs condition + listenerEvents.clear(); + finished = new CountDownLatch(1); + + // suspend the trigger first so that we can safely delete all docs + suspendTriggerCommand = "{" + + "'suspend-trigger' : {" + + "'name' : 'index_size_trigger4'" + + "}" + + "}"; + req = AutoScalingRequest.create(SolrRequest.METHOD.POST, suspendTriggerCommand); + response = solrClient.request(req); + assertEquals(response.get("result").toString(), "success"); + + log.info("-- deleting documents"); + for (int j = 0; j < 10; j++) { + UpdateRequest ureq = new UpdateRequest(); + ureq.setParam("collection", collectionName); + for (int i = 0; i < 98; i++) { + ureq.deleteById("id-" + (i * 100) + "-" + j); + } + solrClient.request(ureq); + } + cloudManager.getTimeSource().sleep(5000); + // make sure the actual index size is reduced by deletions, otherwise we may still violate aboveBytes + UpdateRequest ur = new UpdateRequest(); + ur.setParam(UpdateParams.COMMIT, "true"); + ur.setParam(UpdateParams.EXPUNGE_DELETES, "true"); + ur.setParam(UpdateParams.OPTIMIZE, "true"); + ur.setParam(UpdateParams.MAX_OPTIMIZE_SEGMENTS, "1"); + ur.setParam(UpdateParams.WAIT_SEARCHER, "true"); + ur.setParam(UpdateParams.OPEN_SEARCHER, "true"); + log.info("-- requesting optimize / expungeDeletes / commit"); + solrClient.request(ur, collectionName); + + // wait for the segments to merge to reduce the index size + cloudManager.getTimeSource().sleep(50000); + + // add some docs so that every shard gets an update + // we can reduce the number of docs here but this also works + for (int j = 0; j < 1; j++) { + UpdateRequest ureq = new UpdateRequest(); + ureq.setParam("collection", collectionName); + for (int i = 0; i < 98; i++) { + ureq.add("id", "id-" + (i * 100) + "-" + j); + } + solrClient.request(ureq); + } + + log.info("-- requesting commit"); + solrClient.commit(collectionName, true, true); + + // resume the trigger + log.info("-- resuming trigger"); + // resume trigger + req = AutoScalingRequest.create(SolrRequest.METHOD.POST, resumeTriggerCommand); + response = solrClient.request(req); + assertEquals(response.get("result").toString(), "success"); + + timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS)); + + await = finished.await(90000 / SPEED, TimeUnit.MILLISECONDS); + assertTrue("did not finish processing in time", await); + log.info("-- suspending trigger"); + req = AutoScalingRequest.create(SolrRequest.METHOD.POST, suspendTriggerCommand); + response = solrClient.request(req); + assertEquals(response.get("result").toString(), "success"); + + assertEquals(1, listenerEvents.size()); + events = listenerEvents.get("capturing4"); + assertNotNull("'capturing4' events not found", events); + assertEquals("events: " + events, 6, events.size()); + assertEquals(TriggerEventProcessorStage.STARTED, events.get(0).stage); + assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(1).stage); + assertEquals(TriggerEventProcessorStage.AFTER_ACTION, events.get(2).stage); + assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(3).stage); + assertEquals(TriggerEventProcessorStage.AFTER_ACTION, events.get(4).stage); + assertEquals(TriggerEventProcessorStage.SUCCEEDED, events.get(5).stage); + + // check ops + ops = (List) events.get(4).event.getProperty(TriggerEvent.REQUESTED_OPS); + 
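+    // After the deletions only the belowDocs bound (100) is violated (belowBytes is a
+    // deliberately unreachable 10), so the computed plan should request MERGESHARDS for
+    // the sibling sub-shards; since MERGESHARDS is not yet supported, the same ops are
+    // also expected under 'unsupportedOps' below.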
assertNotNull("should contain requestedOps", ops); + assertTrue("number of ops: " + ops, ops.size() > 0); + for (TriggerEvent.Op op : ops) { + assertEquals(CollectionParams.CollectionAction.MERGESHARDS, op.getAction()); + Set> hints = (Set>)op.getHints().get(Suggester.Hint.COLL_SHARD); + assertNotNull("hints", hints); + assertEquals("hints", 2, hints.size()); + Pair p = hints.iterator().next(); + assertEquals(collectionName, p.first()); + } + + // TODO: fix this once MERGESHARDS is supported + List unsupportedOps = (List)events.get(2).context.get("properties.unsupportedOps"); + assertNotNull("should have unsupportedOps", unsupportedOps); + assertEquals(unsupportedOps.toString() + "\n" + ops, ops.size(), unsupportedOps.size()); + unsupportedOps.forEach(op -> assertEquals(CollectionParams.CollectionAction.MERGESHARDS, op.getAction())); + } + +} diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerSizeEstimationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerSizeEstimationTest.java new file mode 100644 index 00000000000..f88767c2ccf --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerSizeEstimationTest.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.solr.cloud.autoscaling; + +import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH; + +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.apache.lucene.util.LuceneTestCase; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.cloud.SolrCloudManager; +import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig; +import org.apache.solr.client.solrj.cloud.autoscaling.Suggester; +import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.cloud.CloudTestUtils.AutoScalingRequest; +import org.apache.solr.cloud.CloudUtil; +import org.apache.solr.cloud.SolrCloudTestCase; +import org.apache.solr.cloud.autoscaling.sim.SimUtils; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.params.CollectionParams; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.Pair; +import org.apache.solr.common.util.TimeSource; +import org.apache.solr.common.util.Utils; +import org.apache.solr.core.SolrResourceLoader; +import org.apache.solr.metrics.SolrCoreMetricManager; +import org.apache.solr.util.LogLevel; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG") +@LuceneTestCase.Slow +public class IndexSizeTriggerSizeEstimationTest extends SolrCloudTestCase { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static SolrCloudManager cloudManager; + private static SolrClient solrClient; + private static TimeSource timeSource; + + private static int SPEED = 1; + + static Map> listenerEvents = new ConcurrentHashMap<>(); + static CountDownLatch listenerCreated = new CountDownLatch(1); + static CountDownLatch finished = new CountDownLatch(1); + + @BeforeClass + public static void setupCluster() throws Exception { + configureCluster(2) + .addConfig("conf", configset("cloud-minimal")) + .configure(); + cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager(); + solrClient = cluster.getSolrClient(); + timeSource = cloudManager.getTimeSource(); + } + + @After + public void restoreDefaults() throws Exception { + cluster.deleteAllCollections(); + cloudManager.getDistribStateManager().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), -1); + cloudManager.getTimeSource().sleep(5000); + listenerEvents.clear(); + listenerCreated = new CountDownLatch(1); + finished = new CountDownLatch(1); + } + + @AfterClass + public static void teardown() throws Exception { + solrClient = null; + cloudManager = null; + } + + public static class CapturingTriggerListener extends TriggerListenerBase { + @Override + public void 
configure(SolrResourceLoader loader, SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) throws TriggerValidationException { + super.configure(loader, cloudManager, config); + listenerCreated.countDown(); + } + + @Override + public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName, + ActionContext context, Throwable error, String message) { + List lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>()); + CapturedEvent ev = new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message); + log.info("=======> " + ev); + lst.add(ev); + } + } + + public static class FinishedProcessingListener extends TriggerListenerBase { + + @Override + public void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName, ActionContext context, Throwable error, String message) throws Exception { + finished.countDown(); + } + } + + @Test + public void testEstimatedIndexSize() throws Exception { + String collectionName = "testEstimatedIndexSize_collection"; + CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName, + "conf", 2, 2).setMaxShardsPerNode(2); + create.process(solrClient); + + CloudUtil.waitForState(cloudManager, "failed to create " + collectionName, collectionName, + CloudUtil.clusterShape(2, 2, false, true)); + + int NUM_DOCS = 20; + for (int i = 0; i < NUM_DOCS; i++) { + SolrInputDocument doc = new SolrInputDocument("id", "id-" + (i * 100)); + solrClient.add(collectionName, doc); + } + solrClient.commit(collectionName); + + // get the size of the leader's index + DocCollection coll = cloudManager.getClusterStateProvider().getCollection(collectionName); + Replica leader = coll.getSlice("shard1").getLeader(); + String replicaName = Utils.parseMetricsReplicaName(collectionName, leader.getCoreName()); + assertNotNull("replicaName could not be constructed from " + leader, replicaName); + final String registry = SolrCoreMetricManager.createRegistryName(true, collectionName, "shard1", replicaName, null); + Set tags = SimUtils.COMMON_REPLICA_TAGS.stream() + .map(s -> "metrics:" + registry + ":" + s).collect(Collectors.toSet()); + Map sizes = cloudManager.getNodeStateProvider().getNodeValues(leader.getNodeName(), tags); + String commitSizeTag = "metrics:" + registry + ":SEARCHER.searcher.indexCommitSize"; + String numDocsTag = "metrics:" + registry + ":SEARCHER.searcher.numDocs"; + String maxDocTag = "metrics:" + registry + ":SEARCHER.searcher.maxDoc"; + assertNotNull(sizes.toString(), sizes.get(commitSizeTag)); + assertNotNull(sizes.toString(), sizes.get(numDocsTag)); + assertNotNull(sizes.toString(), sizes.get(maxDocTag)); + long commitSize = ((Number)sizes.get(commitSizeTag)).longValue(); + long maxDoc = ((Number)sizes.get(maxDocTag)).longValue(); + long numDocs = ((Number)sizes.get(numDocsTag)).longValue(); + + assertEquals("maxDoc != numDocs", maxDoc, numDocs); + assertTrue("unexpected numDocs=" + numDocs, numDocs > NUM_DOCS / 3); + + long aboveBytes = commitSize * 9 / 10; + long waitForSeconds = 3 + random().nextInt(5); + String setTriggerCommand = "{" + + "'set-trigger' : {" + + "'name' : 'index_size_trigger7'," + + "'event' : 'indexSize'," + + "'waitFor' : '" + waitForSeconds + "s'," + + "'splitMethod' : 'link'," + + "'aboveBytes' : " + aboveBytes + "," + + "'enabled' : false," + + "'actions' : [{'name' : 'compute_plan', 'class' : 'solr.ComputePlanAction'}," + + "{'name' : 'execute_plan', 'class' : '" + 
ExecutePlanAction.class.getName() + "'}]" + + "}}"; + SolrRequest req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand); + NamedList response = solrClient.request(req); + assertEquals(response.get("result").toString(), "success"); + + String setListenerCommand = "{" + + "'set-listener' : " + + "{" + + "'name' : 'capturing7'," + + "'trigger' : 'index_size_trigger7'," + + "'stage' : ['STARTED','ABORTED','SUCCEEDED','FAILED']," + + "'beforeAction' : ['compute_plan','execute_plan']," + + "'afterAction' : ['compute_plan','execute_plan']," + + "'class' : '" + CapturingTriggerListener.class.getName() + "'" + + "}" + + "}"; + req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setListenerCommand); + response = solrClient.request(req); + assertEquals(response.get("result").toString(), "success"); + + setListenerCommand = "{" + + "'set-listener' : " + + "{" + + "'name' : 'finished'," + + "'trigger' : 'index_size_trigger7'," + + "'stage' : ['SUCCEEDED']," + + "'class' : '" + FinishedProcessingListener.class.getName() + "'" + + "}" + + "}"; + req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setListenerCommand); + response = solrClient.request(req); + assertEquals(response.get("result").toString(), "success"); + + // enable the trigger + String resumeTriggerCommand = "{" + + "'resume-trigger' : {" + + "'name' : 'index_size_trigger7'" + + "}" + + "}"; + req = AutoScalingRequest.create(SolrRequest.METHOD.POST, resumeTriggerCommand); + response = solrClient.request(req); + assertEquals("success", response.get("result").toString()); + + // aboveBytes was set to be slightly lower than the actual size of at least one shard, so + // we're expecting a SPLITSHARD - but with 'link' method the actual size of the resulting shards + // will likely not go down. However, the estimated size of the latest commit point will go down + // (see SOLR-12941). 
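+    //
+    // Rough arithmetic behind the assertions that follow (assuming the trigger's estimate
+    // is roughly commitSize * numDocs / maxDoc): aboveBytes is 9/10 of the parent's
+    // commitSize, while a 'link' split leaves each sub-shard with the parent's on-disk
+    // commitSize and maxDoc but only about half of the live numDocs, so the estimated
+    // size should fall to about half of commitSize, i.e. below aboveBytes.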
+
+    timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));
+
+    boolean await = finished.await(90000 / SPEED, TimeUnit.MILLISECONDS);
+    assertTrue("did not finish processing in time", await);
+    // suspend the trigger to avoid generating more events
+    String suspendTriggerCommand = "{" +
+        "'suspend-trigger' : {" +
+        "'name' : 'index_size_trigger7'" +
+        "}" +
+        "}";
+    req = AutoScalingRequest.create(SolrRequest.METHOD.POST, suspendTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals("success", response.get("result").toString());
+
+    assertEquals(1, listenerEvents.size());
+    List<CapturedEvent> events = listenerEvents.get("capturing7");
+    assertNotNull(listenerEvents.toString(), events);
+    assertFalse("empty events?", events.isEmpty());
+    CapturedEvent ev = events.get(0);
+    List<TriggerEvent.Op> ops = (List<TriggerEvent.Op>) ev.event.properties.get(TriggerEvent.REQUESTED_OPS);
+    assertNotNull("no requested ops in " + ev, ops);
+    assertFalse("empty list of ops in " + ev, ops.isEmpty());
+    Set<String> parentShards = new HashSet<>();
+    ops.forEach(op -> {
+      assertTrue(op.toString(), op.getAction() == CollectionParams.CollectionAction.SPLITSHARD);
+      Collection<Pair<String, String>> hints = (Collection<Pair<String, String>>) op.getHints().get(Suggester.Hint.COLL_SHARD);
+      assertNotNull("no hints in op " + op, hints);
+      hints.forEach(h -> parentShards.add(h.second()));
+    });
+
+    // allow for recovery of at least some sub-shards
+    timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));
+
+    coll = cloudManager.getClusterStateProvider().getCollection(collectionName);
+
+    int checkedSubShards = 0;
+
+    for (String parentShard : parentShards) {
+      for (String subShard : Arrays.asList(parentShard + "_0", parentShard + "_1")) {
+        leader = coll.getSlice(subShard).getLeader();
+        if (leader == null) {
+          continue; // no leader yet - skip this sub-shard
+        }
+        checkedSubShards++;
+        replicaName = Utils.parseMetricsReplicaName(collectionName, leader.getCoreName());
+        assertNotNull("replicaName could not be constructed from " + leader, replicaName);
+        final String subregistry = SolrCoreMetricManager.createRegistryName(true, collectionName, subShard, replicaName, null);
+        Set<String> subtags = SimUtils.COMMON_REPLICA_TAGS.stream()
+            .map(s -> "metrics:" + subregistry + ":" + s).collect(Collectors.toSet());
+        sizes = cloudManager.getNodeStateProvider().getNodeValues(leader.getNodeName(), subtags);
+        commitSizeTag = "metrics:" + subregistry + ":SEARCHER.searcher.indexCommitSize";
+        numDocsTag = "metrics:" + subregistry + ":SEARCHER.searcher.numDocs";
+        maxDocTag = "metrics:" + subregistry + ":SEARCHER.searcher.maxDoc";
+        assertNotNull(sizes.toString(), sizes.get(commitSizeTag));
+        assertNotNull(sizes.toString(), sizes.get(numDocsTag));
+        assertNotNull(sizes.toString(), sizes.get(maxDocTag));
+        long subCommitSize = ((Number) sizes.get(commitSizeTag)).longValue();
+        long subMaxDoc = ((Number) sizes.get(maxDocTag)).longValue();
+        long subNumDocs = ((Number) sizes.get(numDocsTag)).longValue();
+        assertTrue("subNumDocs=" + subNumDocs + " should be less than subMaxDoc=" + subMaxDoc +
+            " due to link split", subNumDocs < subMaxDoc);
+        assertTrue("subCommitSize=" + subCommitSize + " should still be greater than aboveBytes=" + aboveBytes +
+            " due to link split", subCommitSize > aboveBytes);
+        // calculate estimated size using the same formula as the trigger
+        long estimatedSize = IndexSizeTrigger.estimatedSize(subMaxDoc, subNumDocs, subCommitSize);
+        assertTrue("estimatedSize=" + estimatedSize + " should be lower than aboveBytes=" + aboveBytes,
+            estimatedSize < aboveBytes);
+      }
+    }
+
+    assertTrue("didn't
find any leaders in new sub-shards", checkedSubShards > 0); + + // reset & resume + listenerEvents.clear(); + finished = new CountDownLatch(1); + req = AutoScalingRequest.create(SolrRequest.METHOD.POST, resumeTriggerCommand); + response = solrClient.request(req); + assertEquals("success", response.get("result").toString()); + timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS)); + + // estimated shard size should fall well below the aboveBytes, even though the real commitSize + // still remains larger due to the splitMethod=link side-effects + await = finished.await(10000 / SPEED, TimeUnit.MILLISECONDS); + assertFalse("should not fire the trigger again! " + listenerEvents, await); + + } +} diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java index ee0b68828a8..7ec3142c3a7 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java @@ -17,13 +17,11 @@ package org.apache.solr.cloud.autoscaling; +import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH; + import java.lang.invoke.MethodHandles; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -32,10 +30,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; import org.apache.lucene.util.LuceneTestCase; -import org.apache.lucene.util.TestUtil; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.cloud.SolrCloudManager; @@ -43,27 +39,19 @@ import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig; import org.apache.solr.client.solrj.cloud.autoscaling.Suggester; import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage; import org.apache.solr.client.solrj.request.CollectionAdminRequest; -import org.apache.solr.client.solrj.request.UpdateRequest; -import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.cloud.CloudTestUtils.AutoScalingRequest; import org.apache.solr.cloud.CloudUtil; import org.apache.solr.cloud.SolrCloudTestCase; import org.apache.solr.cloud.autoscaling.sim.SimCloudManager; -import org.apache.solr.cloud.autoscaling.sim.SimUtils; import org.apache.solr.common.SolrInputDocument; -import org.apache.solr.common.cloud.DocCollection; -import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.params.CollectionParams; import org.apache.solr.common.params.CommonAdminParams; -import org.apache.solr.common.params.CommonParams; -import org.apache.solr.common.params.UpdateParams; import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.Pair; import org.apache.solr.common.util.TimeSource; import org.apache.solr.common.util.Utils; import org.apache.solr.core.SolrResourceLoader; -import org.apache.solr.metrics.SolrCoreMetricManager; import org.apache.solr.update.SolrIndexSplitter; import org.apache.solr.util.LogLevel; import org.junit.After; @@ -73,8 +61,6 @@ import 
org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH; - /** * */ @@ -107,7 +93,6 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase { .addConfig("conf", configset("cloud-minimal")) .configure(); realCluster = random().nextBoolean(); - realCluster = true; if (realCluster) { cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager(); solrClient = cluster.getSolrClient(); @@ -502,269 +487,6 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase { unsupportedOps.forEach(op -> assertEquals(CollectionParams.CollectionAction.MERGESHARDS, op.getAction())); } - @Test - public void testMixedBounds() throws Exception { - if (!realCluster) { - log.info("This test doesn't work with a simulated cluster"); - return; - } - - String collectionName = "testMixedBounds_collection"; - CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName, - "conf", 2, 2).setMaxShardsPerNode(2); - create.process(solrClient); - CloudUtil.waitForState(cloudManager, "failed to create " + collectionName, collectionName, - CloudUtil.clusterShape(2, 2, false, true)); - - for (int j = 0; j < 10; j++) { - UpdateRequest ureq = new UpdateRequest(); - ureq.setParam("collection", collectionName); - for (int i = 0; i < 100; i++) { - SolrInputDocument doc = new SolrInputDocument("id", "id-" + (i * 100) + "-" + j); - doc.addField("foo", TestUtil.randomSimpleString(random(), 130, 130)); - ureq.add(doc); - } - solrClient.request(ureq); - } - solrClient.commit(collectionName); - - // check the actual size of shard to set the threshold - QueryResponse rsp = solrClient.query(params(CommonParams.QT, "/admin/metrics", "group", "core")); - NamedList nl = rsp.getResponse(); - nl = (NamedList)nl.get("metrics"); - int maxSize = 0; - for (Iterator> it = nl.iterator(); it.hasNext(); ) { - Map.Entry e = it.next(); - NamedList metrics = (NamedList)e.getValue(); - Object o = metrics.get("INDEX.sizeInBytes"); - assertNotNull("INDEX.sizeInBytes missing: " + metrics, o); - assertTrue("not a number", o instanceof Number); - if (maxSize < ((Number)o).intValue()) { - maxSize = ((Number)o).intValue(); - } - } - assertTrue("maxSize should be non-zero", maxSize > 0); - - int aboveBytes = maxSize * 2 / 3; - - // need to wait for recovery after splitting - long waitForSeconds = 10 + random().nextInt(5); - - // the trigger is initially disabled so that we have time to add listeners - // and have them capture all events once the trigger is enabled - String setTriggerCommand = "{" + - "'set-trigger' : {" + - "'name' : 'index_size_trigger4'," + - "'event' : 'indexSize'," + - "'waitFor' : '" + waitForSeconds + "s'," + - // don't hit this limit when indexing - "'aboveDocs' : 10000," + - // hit this limit when deleting - "'belowDocs' : 100," + - // hit this limit when indexing - "'aboveBytes' : " + aboveBytes + "," + - // don't hit this limit when deleting - "'belowBytes' : 10," + - "'enabled' : false," + - "'actions' : [{'name' : 'compute_plan', 'class' : 'solr.ComputePlanAction'}," + - "{'name' : 'execute_plan', 'class' : '" + ExecutePlanAction.class.getName() + "'}]" + - "}}"; - SolrRequest req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand); - NamedList response = solrClient.request(req); - assertEquals(response.get("result").toString(), "success"); - - String setListenerCommand = "{" + - "'set-listener' : " + - "{" + - "'name' : 
'capturing4'," + - "'trigger' : 'index_size_trigger4'," + - "'stage' : ['STARTED','ABORTED','SUCCEEDED','FAILED']," + - "'beforeAction' : ['compute_plan','execute_plan']," + - "'afterAction' : ['compute_plan','execute_plan']," + - "'class' : '" + CapturingTriggerListener.class.getName() + "'" + - "}" + - "}"; - req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setListenerCommand); - response = solrClient.request(req); - assertEquals(response.get("result").toString(), "success"); - - setListenerCommand = "{" + - "'set-listener' : " + - "{" + - "'name' : 'finished'," + - "'trigger' : 'index_size_trigger4'," + - "'stage' : ['SUCCEEDED']," + - "'class' : '" + FinishedProcessingListener.class.getName() + "'" + - "}" + - "}"; - req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setListenerCommand); - response = solrClient.request(req); - assertEquals(response.get("result").toString(), "success"); - - // now enable the trigger - String resumeTriggerCommand = "{" + - "'resume-trigger' : {" + - "'name' : 'index_size_trigger4'" + - "}" + - "}"; - log.info("-- resuming trigger"); - req = AutoScalingRequest.create(SolrRequest.METHOD.POST, resumeTriggerCommand); - response = solrClient.request(req); - assertEquals(response.get("result").toString(), "success"); - - timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS)); - - boolean await = finished.await(90000 / SPEED, TimeUnit.MILLISECONDS); - assertTrue("did not finish processing in time", await); - log.info("-- suspending trigger"); - // suspend the trigger to avoid generating more events - String suspendTriggerCommand = "{" + - "'suspend-trigger' : {" + - "'name' : 'index_size_trigger4'" + - "}" + - "}"; - req = AutoScalingRequest.create(SolrRequest.METHOD.POST, suspendTriggerCommand); - response = solrClient.request(req); - assertEquals(response.get("result").toString(), "success"); - - assertEquals(1, listenerEvents.size()); - List events = listenerEvents.get("capturing4"); - assertNotNull("'capturing4' events not found", events); - assertEquals("events: " + events, 6, events.size()); - assertEquals(TriggerEventProcessorStage.STARTED, events.get(0).stage); - assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(1).stage); - assertEquals(TriggerEventProcessorStage.AFTER_ACTION, events.get(2).stage); - assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(3).stage); - assertEquals(TriggerEventProcessorStage.AFTER_ACTION, events.get(4).stage); - assertEquals(TriggerEventProcessorStage.SUCCEEDED, events.get(5).stage); - - // collection should have 2 inactive and 4 active shards - CloudUtil.waitForState(cloudManager, "failed to create " + collectionName, collectionName, - CloudUtil.clusterShape(6, 2, true, true)); - - // check ops - List ops = (List) events.get(4).event.getProperty(TriggerEvent.REQUESTED_OPS); - assertNotNull("should contain requestedOps", ops); - assertEquals("number of ops", 2, ops.size()); - boolean shard1 = false; - boolean shard2 = false; - for (TriggerEvent.Op op : ops) { - assertEquals(CollectionParams.CollectionAction.SPLITSHARD, op.getAction()); - Set> hints = (Set>)op.getHints().get(Suggester.Hint.COLL_SHARD); - assertNotNull("hints", hints); - assertEquals("hints", 1, hints.size()); - Pair p = hints.iterator().next(); - assertEquals(collectionName, p.first()); - if (p.second().equals("shard1")) { - shard1 = true; - } else if (p.second().equals("shard2")) { - shard2 = true; - } else { - fail("unexpected shard name " + p.second()); - } - } - assertTrue("shard1 
should be split", shard1); - assertTrue("shard2 should be split", shard2); - - // now delete most of docs to trigger belowDocs condition - listenerEvents.clear(); - finished = new CountDownLatch(1); - - // suspend the trigger first so that we can safely delete all docs - suspendTriggerCommand = "{" + - "'suspend-trigger' : {" + - "'name' : 'index_size_trigger4'" + - "}" + - "}"; - req = AutoScalingRequest.create(SolrRequest.METHOD.POST, suspendTriggerCommand); - response = solrClient.request(req); - assertEquals(response.get("result").toString(), "success"); - - log.info("-- deleting documents"); - for (int j = 0; j < 10; j++) { - UpdateRequest ureq = new UpdateRequest(); - ureq.setParam("collection", collectionName); - for (int i = 0; i < 98; i++) { - ureq.deleteById("id-" + (i * 100) + "-" + j); - } - solrClient.request(ureq); - } - cloudManager.getTimeSource().sleep(5000); - // make sure the actual index size is reduced by deletions, otherwise we may still violate aboveBytes - UpdateRequest ur = new UpdateRequest(); - ur.setParam(UpdateParams.COMMIT, "true"); - ur.setParam(UpdateParams.EXPUNGE_DELETES, "true"); - ur.setParam(UpdateParams.OPTIMIZE, "true"); - ur.setParam(UpdateParams.MAX_OPTIMIZE_SEGMENTS, "1"); - ur.setParam(UpdateParams.WAIT_SEARCHER, "true"); - ur.setParam(UpdateParams.OPEN_SEARCHER, "true"); - log.info("-- requesting optimize / expungeDeletes / commit"); - solrClient.request(ur, collectionName); - - // wait for the segments to merge to reduce the index size - cloudManager.getTimeSource().sleep(50000); - - // add some docs so that every shard gets an update - // we can reduce the number of docs here but this also works - for (int j = 0; j < 1; j++) { - UpdateRequest ureq = new UpdateRequest(); - ureq.setParam("collection", collectionName); - for (int i = 0; i < 98; i++) { - ureq.add("id", "id-" + (i * 100) + "-" + j); - } - solrClient.request(ureq); - } - - log.info("-- requesting commit"); - solrClient.commit(collectionName, true, true); - - // resume the trigger - log.info("-- resuming trigger"); - // resume trigger - req = AutoScalingRequest.create(SolrRequest.METHOD.POST, resumeTriggerCommand); - response = solrClient.request(req); - assertEquals(response.get("result").toString(), "success"); - - timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS)); - - await = finished.await(90000 / SPEED, TimeUnit.MILLISECONDS); - assertTrue("did not finish processing in time", await); - log.info("-- suspending trigger"); - req = AutoScalingRequest.create(SolrRequest.METHOD.POST, suspendTriggerCommand); - response = solrClient.request(req); - assertEquals(response.get("result").toString(), "success"); - - assertEquals(1, listenerEvents.size()); - events = listenerEvents.get("capturing4"); - assertNotNull("'capturing4' events not found", events); - assertEquals("events: " + events, 6, events.size()); - assertEquals(TriggerEventProcessorStage.STARTED, events.get(0).stage); - assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(1).stage); - assertEquals(TriggerEventProcessorStage.AFTER_ACTION, events.get(2).stage); - assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(3).stage); - assertEquals(TriggerEventProcessorStage.AFTER_ACTION, events.get(4).stage); - assertEquals(TriggerEventProcessorStage.SUCCEEDED, events.get(5).stage); - - // check ops - ops = (List) events.get(4).event.getProperty(TriggerEvent.REQUESTED_OPS); - assertNotNull("should contain requestedOps", ops); - assertTrue("number of ops: " + ops, ops.size() > 
0); - for (TriggerEvent.Op op : ops) { - assertEquals(CollectionParams.CollectionAction.MERGESHARDS, op.getAction()); - Set> hints = (Set>)op.getHints().get(Suggester.Hint.COLL_SHARD); - assertNotNull("hints", hints); - assertEquals("hints", 2, hints.size()); - Pair p = hints.iterator().next(); - assertEquals(collectionName, p.first()); - } - - // TODO: fix this once MERGESHARDS is supported - List unsupportedOps = (List)events.get(2).context.get("properties.unsupportedOps"); - assertNotNull("should have unsupportedOps", unsupportedOps); - assertEquals(unsupportedOps.toString() + "\n" + ops, ops.size(), unsupportedOps.size()); - unsupportedOps.forEach(op -> assertEquals(CollectionParams.CollectionAction.MERGESHARDS, op.getAction())); - } - @Test public void testMaxOps() throws Exception { String collectionName = "testMaxOps_collection"; @@ -1015,197 +737,6 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase { } } - - @Test - public void testEstimatedIndexSize() throws Exception { - if (!realCluster) { - log.info("This test doesn't work with a simulated cluster"); - return; - } - String collectionName = "testEstimatedIndexSize_collection"; - CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName, - "conf", 2, 2).setMaxShardsPerNode(2); - create.process(solrClient); - - CloudUtil.waitForState(cloudManager, "failed to create " + collectionName, collectionName, - CloudUtil.clusterShape(2, 2, false, true)); - - int NUM_DOCS = 20; - for (int i = 0; i < NUM_DOCS; i++) { - SolrInputDocument doc = new SolrInputDocument("id", "id-" + (i * 100)); - solrClient.add(collectionName, doc); - } - solrClient.commit(collectionName); - - // get the size of the leader's index - DocCollection coll = cloudManager.getClusterStateProvider().getCollection(collectionName); - Replica leader = coll.getSlice("shard1").getLeader(); - String replicaName = Utils.parseMetricsReplicaName(collectionName, leader.getCoreName()); - assertNotNull("replicaName could not be constructed from " + leader, replicaName); - final String registry = SolrCoreMetricManager.createRegistryName(true, collectionName, "shard1", replicaName, null); - Set tags = SimUtils.COMMON_REPLICA_TAGS.stream() - .map(s -> "metrics:" + registry + ":" + s).collect(Collectors.toSet()); - Map sizes = cloudManager.getNodeStateProvider().getNodeValues(leader.getNodeName(), tags); - String commitSizeTag = "metrics:" + registry + ":SEARCHER.searcher.indexCommitSize"; - String numDocsTag = "metrics:" + registry + ":SEARCHER.searcher.numDocs"; - String maxDocTag = "metrics:" + registry + ":SEARCHER.searcher.maxDoc"; - assertNotNull(sizes.toString(), sizes.get(commitSizeTag)); - assertNotNull(sizes.toString(), sizes.get(numDocsTag)); - assertNotNull(sizes.toString(), sizes.get(maxDocTag)); - long commitSize = ((Number)sizes.get(commitSizeTag)).longValue(); - long maxDoc = ((Number)sizes.get(maxDocTag)).longValue(); - long numDocs = ((Number)sizes.get(numDocsTag)).longValue(); - - assertEquals("maxDoc != numDocs", maxDoc, numDocs); - assertTrue("unexpected numDocs=" + numDocs, numDocs > NUM_DOCS / 3); - - long aboveBytes = commitSize * 9 / 10; - long waitForSeconds = 3 + random().nextInt(5); - String setTriggerCommand = "{" + - "'set-trigger' : {" + - "'name' : 'index_size_trigger7'," + - "'event' : 'indexSize'," + - "'waitFor' : '" + waitForSeconds + "s'," + - "'splitMethod' : 'link'," + - "'aboveBytes' : " + aboveBytes + "," + - "'enabled' : false," + - "'actions' : [{'name' : 'compute_plan', 'class' : 
'solr.ComputePlanAction'}," + - "{'name' : 'execute_plan', 'class' : '" + ExecutePlanAction.class.getName() + "'}]" + - "}}"; - SolrRequest req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand); - NamedList response = solrClient.request(req); - assertEquals(response.get("result").toString(), "success"); - - String setListenerCommand = "{" + - "'set-listener' : " + - "{" + - "'name' : 'capturing7'," + - "'trigger' : 'index_size_trigger7'," + - "'stage' : ['STARTED','ABORTED','SUCCEEDED','FAILED']," + - "'beforeAction' : ['compute_plan','execute_plan']," + - "'afterAction' : ['compute_plan','execute_plan']," + - "'class' : '" + CapturingTriggerListener.class.getName() + "'" + - "}" + - "}"; - req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setListenerCommand); - response = solrClient.request(req); - assertEquals(response.get("result").toString(), "success"); - - setListenerCommand = "{" + - "'set-listener' : " + - "{" + - "'name' : 'finished'," + - "'trigger' : 'index_size_trigger7'," + - "'stage' : ['SUCCEEDED']," + - "'class' : '" + FinishedProcessingListener.class.getName() + "'" + - "}" + - "}"; - req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setListenerCommand); - response = solrClient.request(req); - assertEquals(response.get("result").toString(), "success"); - - // enable the trigger - String resumeTriggerCommand = "{" + - "'resume-trigger' : {" + - "'name' : 'index_size_trigger7'" + - "}" + - "}"; - req = AutoScalingRequest.create(SolrRequest.METHOD.POST, resumeTriggerCommand); - response = solrClient.request(req); - assertEquals("success", response.get("result").toString()); - - // aboveBytes was set to be slightly lower than the actual size of at least one shard, so - // we're expecting a SPLITSHARD - but with 'link' method the actual size of the resulting shards - // will likely not go down. However, the estimated size of the latest commit point will go down - // (see SOLR-12941). 
- - timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS)); - - boolean await = finished.await(90000 / SPEED, TimeUnit.MILLISECONDS); - assertTrue("did not finish processing in time", await); - // suspend the trigger - String suspendTriggerCommand = "{" + - "'suspend-trigger' : {" + - "'name' : 'index_size_trigger7'" + - "}" + - "}"; - req = AutoScalingRequest.create(SolrRequest.METHOD.POST, resumeTriggerCommand); - response = solrClient.request(req); - assertEquals("success", response.get("result").toString()); - - assertEquals(1, listenerEvents.size()); - List events = listenerEvents.get("capturing7"); - assertNotNull(listenerEvents.toString(), events); - assertFalse("empty events?", events.isEmpty()); - CapturedEvent ev = events.get(0); - List ops = (List< TriggerEvent.Op>)ev.event.properties.get(TriggerEvent.REQUESTED_OPS); - assertNotNull("no requested ops in " + ev, ops); - assertFalse("empty list of ops in " + ev, ops.isEmpty()); - Set parentShards = new HashSet<>(); - ops.forEach(op -> { - assertTrue(op.toString(), op.getAction() == CollectionParams.CollectionAction.SPLITSHARD); - Collection> hints = (Collection>)op.getHints().get(Suggester.Hint.COLL_SHARD); - assertNotNull("no hints in op " + op, hints); - hints.forEach(h -> parentShards.add(h.second())); - }); - - // allow for recovery of at least some sub-shards - timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS)); - - coll = cloudManager.getClusterStateProvider().getCollection(collectionName); - - int checkedSubShards = 0; - - for (String parentShard : parentShards) { - for (String subShard : Arrays.asList(parentShard + "_0", parentShard + "_1")) { - leader = coll.getSlice(subShard).getLeader(); - if (leader == null) { - // no leader yet - skip it - } - checkedSubShards++; - replicaName = Utils.parseMetricsReplicaName(collectionName, leader.getCoreName()); - assertNotNull("replicaName could not be constructed from " + leader, replicaName); - final String subregistry = SolrCoreMetricManager.createRegistryName(true, collectionName, subShard, replicaName, null); - Set subtags = SimUtils.COMMON_REPLICA_TAGS.stream() - .map(s -> "metrics:" + subregistry + ":" + s).collect(Collectors.toSet()); - sizes = cloudManager.getNodeStateProvider().getNodeValues(leader.getNodeName(), subtags); - commitSizeTag = "metrics:" + subregistry + ":SEARCHER.searcher.indexCommitSize"; - numDocsTag = "metrics:" + subregistry + ":SEARCHER.searcher.numDocs"; - maxDocTag = "metrics:" + subregistry + ":SEARCHER.searcher.maxDoc"; - assertNotNull(sizes.toString(), sizes.get(commitSizeTag)); - assertNotNull(sizes.toString(), sizes.get(numDocsTag)); - assertNotNull(sizes.toString(), sizes.get(maxDocTag)); - long subCommitSize = ((Number)sizes.get(commitSizeTag)).longValue(); - long subMaxDoc = ((Number)sizes.get(maxDocTag)).longValue(); - long subNumDocs = ((Number)sizes.get(numDocsTag)).longValue(); - assertTrue("subNumDocs=" + subNumDocs + " should be less than subMaxDoc=" + subMaxDoc + - " due to link split", subNumDocs < subMaxDoc); - assertTrue("subCommitSize=" + subCommitSize + " should be still greater than aboveBytes=" + aboveBytes + - " due to link split", subCommitSize > aboveBytes); - // calculate estimated size using the same formula - long estimatedSize = IndexSizeTrigger.estimatedSize(subMaxDoc, subNumDocs, subCommitSize); - assertTrue("estimatedSize=" + estimatedSize + " should be lower than aboveBytes=" + aboveBytes, - estimatedSize < aboveBytes); - } - } - - assertTrue("didn't 
find any leaders in new sub-shards", checkedSubShards > 0); - - // reset & resume - listenerEvents.clear(); - finished = new CountDownLatch(1); - req = AutoScalingRequest.create(SolrRequest.METHOD.POST, resumeTriggerCommand); - response = solrClient.request(req); - assertEquals("success", response.get("result").toString()); - timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS)); - - // estimated shard size should fall well below the aboveBytes, even though the real commitSize - // still remains larger due to the splitMethod=link side-effects - await = finished.await(10000 / SPEED, TimeUnit.MILLISECONDS); - assertFalse("should not fire the trigger again! " + listenerEvents, await); - - } - private Map createTriggerProps(long waitForSeconds) { Map props = new HashMap<>(); props.put("event", "indexSize");