mirror of https://github.com/apache/lucene.git
SOLR-12815: Implement maxOps limit for IndexSizeTrigger.
This commit is contained in:
parent
152fd966a7
commit
452c2dabf0
|
@ -127,6 +127,8 @@ New Features
|
|||
|
||||
* SOLR-12822: /autoscaling/suggestions to include suggestion to add-replica for lost replicas (noble)
|
||||
|
||||
* SOLR-12815: Implement maxOps limit for IndexSizeTrigger. (ab)
|
||||
|
||||
Other Changes
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -62,6 +62,7 @@ public class IndexSizeTrigger extends TriggerBase {
|
|||
public static final String BELOW_DOCS_PROP = "belowDocs";
|
||||
public static final String BELOW_OP_PROP = "belowOp";
|
||||
public static final String COLLECTIONS_PROP = "collections";
|
||||
public static final String MAX_OPS_PROP = "maxOps";
|
||||
|
||||
public static final String BYTES_SIZE_PROP = "__bytes__";
|
||||
public static final String DOCS_SIZE_PROP = "__docs__";
|
||||
|
@ -69,9 +70,12 @@ public class IndexSizeTrigger extends TriggerBase {
|
|||
public static final String BELOW_SIZE_PROP = "belowSize";
|
||||
public static final String VIOLATION_PROP = "violationType";
|
||||
|
||||
public static final int DEFAULT_MAX_OPS = 10;
|
||||
|
||||
public enum Unit { bytes, docs }
|
||||
|
||||
private long aboveBytes, aboveDocs, belowBytes, belowDocs;
|
||||
private int maxOps;
|
||||
private CollectionParams.CollectionAction aboveOp, belowOp;
|
||||
private final Set<String> collections = new HashSet<>();
|
||||
private final Map<String, Long> lastAboveEventMap = new ConcurrentHashMap<>();
|
||||
|
@ -80,7 +84,8 @@ public class IndexSizeTrigger extends TriggerBase {
|
|||
/**
 * Creates the trigger and registers the configuration properties it accepts.
 * SOLR-12815 adds {@code maxOps} (MAX_OPS_PROP) to the valid-property set so the
 * per-event operation limit can be configured.
 */
public IndexSizeTrigger(String name) {
  super(TriggerEventType.INDEXSIZE, name);
  // NOTE: the scraped diff fused the old and new argument lists; this is the
  // post-change form, which includes MAX_OPS_PROP.
  TriggerUtils.validProperties(validProperties,
      ABOVE_BYTES_PROP, ABOVE_DOCS_PROP, BELOW_BYTES_PROP, BELOW_DOCS_PROP,
      COLLECTIONS_PROP, MAX_OPS_PROP);
}
|
||||
|
||||
@Override
|
||||
|
@ -151,6 +156,15 @@ public class IndexSizeTrigger extends TriggerBase {
|
|||
if (belowOp == null) {
|
||||
throw new TriggerValidationException(getName(), BELOW_OP_PROP, "unrecognized value of: '" + belowOpStr + "'");
|
||||
}
|
||||
String maxOpsStr = String.valueOf(properties.getOrDefault(MAX_OPS_PROP, DEFAULT_MAX_OPS));
|
||||
try {
|
||||
maxOps = Integer.parseInt(maxOpsStr);
|
||||
if (maxOps < 1) {
|
||||
throw new Exception("must be > 1");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new TriggerValidationException(getName(), MAX_OPS_PROP, "invalid value: '" + maxOpsStr + "': " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -351,7 +365,22 @@ public class IndexSizeTrigger extends TriggerBase {
|
|||
// calculate ops
|
||||
final List<TriggerEvent.Op> ops = new ArrayList<>();
|
||||
aboveSize.forEach((coll, replicas) -> {
|
||||
// sort by decreasing size to first split the largest ones
|
||||
// XXX see the comment below about using DOCS_SIZE_PROP in lieu of BYTES_SIZE_PROP
|
||||
replicas.sort((r1, r2) -> {
|
||||
long delta = (Long) r1.getVariable(DOCS_SIZE_PROP) - (Long) r2.getVariable(DOCS_SIZE_PROP);
|
||||
if (delta > 0) {
|
||||
return -1;
|
||||
} else if (delta < 0) {
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
});
|
||||
replicas.forEach(r -> {
|
||||
if (ops.size() >= maxOps) {
|
||||
return;
|
||||
}
|
||||
TriggerEvent.Op op = new TriggerEvent.Op(aboveOp);
|
||||
op.addHint(Suggester.Hint.COLL_SHARD, new Pair<>(coll, r.getShard()));
|
||||
ops.add(op);
|
||||
|
@ -365,6 +394,9 @@ public class IndexSizeTrigger extends TriggerBase {
|
|||
if (replicas.size() < 2) {
|
||||
return;
|
||||
}
|
||||
if (ops.size() >= maxOps) {
|
||||
return;
|
||||
}
|
||||
// sort by increasing size
|
||||
replicas.sort((r1, r2) -> {
|
||||
// XXX this is not quite correct - if BYTES_SIZE_PROP decided that replica got here
|
||||
|
|
|
@ -681,6 +681,147 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
|
|||
unsupportedOps.forEach(op -> assertEquals(CollectionParams.CollectionAction.MERGESHARDS, op.getAction()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxOps() throws Exception {
|
||||
String collectionName = "testMaxOps_collection";
|
||||
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
|
||||
"conf", 5, 2).setMaxShardsPerNode(10);
|
||||
create.process(solrClient);
|
||||
CloudTestUtils.waitForState(cloudManager, "failed to create " + collectionName, collectionName,
|
||||
CloudTestUtils.clusterShape(5, 2, false, true));
|
||||
|
||||
long waitForSeconds = 3 + random().nextInt(5);
|
||||
// add disabled trigger
|
||||
String setTriggerCommand = "{" +
|
||||
"'set-trigger' : {" +
|
||||
"'name' : 'index_size_trigger5'," +
|
||||
"'event' : 'indexSize'," +
|
||||
"'waitFor' : '" + waitForSeconds + "s'," +
|
||||
"'aboveDocs' : 10," +
|
||||
"'enabled' : false," +
|
||||
"'actions' : [{'name' : 'compute_plan', 'class' : 'solr.ComputePlanAction'}]" +
|
||||
"}}";
|
||||
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||
NamedList<Object> response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
String setListenerCommand = "{" +
|
||||
"'set-listener' : " +
|
||||
"{" +
|
||||
"'name' : 'capturing5'," +
|
||||
"'trigger' : 'index_size_trigger5'," +
|
||||
"'stage' : ['STARTED','ABORTED','SUCCEEDED','FAILED']," +
|
||||
"'beforeAction' : ['compute_plan']," +
|
||||
"'afterAction' : ['compute_plan']," +
|
||||
"'class' : '" + CapturingTriggerListener.class.getName() + "'" +
|
||||
"}" +
|
||||
"}";
|
||||
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
setListenerCommand = "{" +
|
||||
"'set-listener' : " +
|
||||
"{" +
|
||||
"'name' : 'finished'," +
|
||||
"'trigger' : 'index_size_trigger5'," +
|
||||
"'stage' : ['SUCCEEDED']," +
|
||||
"'class' : '" + FinishedProcessingListener.class.getName() + "'" +
|
||||
"}" +
|
||||
"}";
|
||||
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
|
||||
for (int i = 0; i < 200; i++) {
|
||||
SolrInputDocument doc = new SolrInputDocument("id", "id-" + i);
|
||||
solrClient.add(collectionName, doc);
|
||||
}
|
||||
solrClient.commit(collectionName);
|
||||
|
||||
// enable the trigger
|
||||
String resumeTriggerCommand = "{" +
|
||||
"'resume-trigger' : {" +
|
||||
"'name' : 'index_size_trigger5'" +
|
||||
"}" +
|
||||
"}";
|
||||
req = createAutoScalingRequest(SolrRequest.METHOD.POST, resumeTriggerCommand);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));
|
||||
|
||||
boolean await = finished.await(60000 / SPEED, TimeUnit.MILLISECONDS);
|
||||
assertTrue("did not finish processing in time", await);
|
||||
|
||||
// suspend the trigger
|
||||
String suspendTriggerCommand = "{" +
|
||||
"'suspend-trigger' : {" +
|
||||
"'name' : 'index_size_trigger5'" +
|
||||
"}" +
|
||||
"}";
|
||||
req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
assertEquals(1, listenerEvents.size());
|
||||
List<CapturedEvent> events = listenerEvents.get("capturing5");
|
||||
assertNotNull("'capturing5' events not found", events);
|
||||
assertEquals("events: " + events, 4, events.size());
|
||||
assertEquals(TriggerEventProcessorStage.STARTED, events.get(0).stage);
|
||||
assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(1).stage);
|
||||
assertEquals(TriggerEventProcessorStage.AFTER_ACTION, events.get(2).stage);
|
||||
assertEquals(TriggerEventProcessorStage.SUCCEEDED, events.get(3).stage);
|
||||
// check ops
|
||||
List<TriggerEvent.Op> ops = (List<TriggerEvent.Op>) events.get(2).event.getProperty(TriggerEvent.REQUESTED_OPS);
|
||||
assertNotNull("should contain requestedOps", ops);
|
||||
assertEquals("number of ops: " + ops, 5, ops.size());
|
||||
|
||||
listenerEvents.clear();
|
||||
finished = new CountDownLatch(1);
|
||||
|
||||
setTriggerCommand = "{" +
|
||||
"'set-trigger' : {" +
|
||||
"'name' : 'index_size_trigger5'," +
|
||||
"'event' : 'indexSize'," +
|
||||
"'waitFor' : '" + waitForSeconds + "s'," +
|
||||
"'aboveDocs' : 10," +
|
||||
"'maxOps' : 3," +
|
||||
"'enabled' : true," +
|
||||
"'actions' : [{'name' : 'compute_plan', 'class' : 'solr.ComputePlanAction'}]" +
|
||||
"}}";
|
||||
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
await = finished.await(60000 / SPEED, TimeUnit.MILLISECONDS);
|
||||
assertTrue("did not finish processing in time", await);
|
||||
|
||||
// suspend the trigger
|
||||
suspendTriggerCommand = "{" +
|
||||
"'suspend-trigger' : {" +
|
||||
"'name' : 'index_size_trigger5'" +
|
||||
"}" +
|
||||
"}";
|
||||
req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
assertEquals(1, listenerEvents.size());
|
||||
events = listenerEvents.get("capturing5");
|
||||
assertNotNull("'capturing5' events not found", events);
|
||||
assertEquals("events: " + events, 4, events.size());
|
||||
assertEquals(TriggerEventProcessorStage.STARTED, events.get(0).stage);
|
||||
assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(1).stage);
|
||||
assertEquals(TriggerEventProcessorStage.AFTER_ACTION, events.get(2).stage);
|
||||
assertEquals(TriggerEventProcessorStage.SUCCEEDED, events.get(3).stage);
|
||||
// check ops
|
||||
ops = (List<TriggerEvent.Op>) events.get(2).event.getProperty(TriggerEvent.REQUESTED_OPS);
|
||||
assertNotNull("should contain requestedOps", ops);
|
||||
assertEquals("number of ops: " + ops, 3, ops.size());
|
||||
}
|
||||
|
||||
private Map<String, Object> createTriggerProps(long waitForSeconds) {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put("event", "indexSize");
|
||||
|
|
|
@ -298,6 +298,12 @@ the default value is `MERGESHARDS` (but see the note above).
|
|||
A comma-separated list of collection names that this trigger should monitor. If not
|
||||
specified or empty all collections are monitored.
|
||||
|
||||
`maxOps`::
Maximum number of operations requested in a single event. This property limits the rate of
change in a highly dynamic situation: throttling the operations may allow threshold
violations to grow more serious before they are addressed, but it also caps the load that a
large number of simultaneously requested operations would place on the cluster.
The default value is 10.
|
||||
|
||||
Events generated by this trigger contain additional details about the shards
|
||||
that exceeded thresholds and the types of violations (upper / lower bounds, bytes / docs metrics).
|
||||
|
||||
|
|
Loading…
Reference in New Issue